summaryrefslogtreecommitdiffstats
path: root/module/zfs/dmu_send.c
diff options
context:
space:
mode:
authorPaul Dagnelie <[email protected]>2019-06-19 09:48:13 -0700
committerBrian Behlendorf <[email protected]>2019-06-19 09:48:12 -0700
commit30af21b02569ac192f52ce6e6511015f8a8d5729 (patch)
treee5f1091c2d3a6e511bbd2414782e490c18e0f59c /module/zfs/dmu_send.c
parentc1b5801bb5af0055e5f3d263beaa07026103e212 (diff)
Implement Redacted Send/Receive
Redacted send/receive allows users to send subsets of their data to a target system. One possible use case for this feature is to not transmit sensitive information to a data warehousing, test/dev, or analytics environment. Another is to save space by not replicating unimportant data within a given dataset, for example in backup tools like zrepl. Redacted send/receive is a three-stage process. First, a clone (or clones) is made of the snapshot to be sent to the target. In this clone (or clones), all unnecessary or unwanted data is removed or modified. This clone is then snapshotted to create the "redaction snapshot" (or snapshots). Second, the new zfs redact command is used to create a redaction bookmark. The redaction bookmark stores the list of blocks in a snapshot that were modified by the redaction snapshot(s). Finally, the redaction bookmark is passed as a parameter to zfs send. When sending to the snapshot that was redacted, the redaction bookmark is used to filter out blocks that contain sensitive or unwanted information, and those blocks are not included in the send stream. When sending from the redaction bookmark, the blocks it contains are considered as candidate blocks in addition to those blocks in the destination snapshot that were modified since the creation_txg of the redaction bookmark. This step is necessary to allow the target to rehydrate data in the case where some blocks are accidentally or unnecessarily modified in the redaction snapshot. The changes to bookmarks to enable fast space estimation involve adding deadlists to bookmarks. There is also logic to manage the life cycles of these deadlists. The new size estimation process operates in cases where previously an accurate estimate could not be provided. In those cases, a send is performed where no data blocks are read, reducing the runtime significantly and providing a byte-accurate size estimate. Reviewed-by: Dan Kimmel <[email protected]> Reviewed-by: Matt Ahrens <[email protected]> Reviewed-by: Prashanth Sreenivasa <[email protected]> Reviewed-by: John Kennedy <[email protected]> Reviewed-by: George Wilson <[email protected]> Reviewed-by: Chris Williamson <[email protected]> Reviewed-by: Pavel Zhakarov <[email protected]> Reviewed-by: Sebastien Roy <[email protected]> Reviewed-by: Prakash Surya <[email protected]> Reviewed-by: Brian Behlendorf <[email protected]> Signed-off-by: Paul Dagnelie <[email protected]> Closes #7958
Diffstat (limited to 'module/zfs/dmu_send.c')
-rw-r--r--module/zfs/dmu_send.c2826
1 files changed, 2086 insertions, 740 deletions
diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c
index a6ff5ce3e..884be31bd 100644
--- a/module/zfs/dmu_send.c
+++ b/module/zfs/dmu_send.c
@@ -21,7 +21,7 @@
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
- * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
* Copyright (c) 2014, Joyent, Inc. All rights reserved.
* Copyright 2014 HybridCluster. All rights reserved.
* Copyright 2016 RackTop Systems.
@@ -58,104 +58,213 @@
#include <sys/bqueue.h>
#include <sys/zvol.h>
#include <sys/policy.h>
+#include <sys/objlist.h>
+#ifdef _KERNEL
+#include <sys/zfs_vfsops.h>
+#endif
/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
+/*
+ * This tunable controls the amount of data (measured in bytes) that will be
+ * prefetched by zfs send. If the main thread is blocking on reads that haven't
+ * completed, this variable might need to be increased. If instead the main
+ * thread is issuing new reads because the prefetches have fallen out of the
+ * cache, this may need to be decreased.
+ */
int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
+/*
+ * This tunable controls the length of the queues that zfs send worker threads
+ * use to communicate. If the send_main_thread is blocking on these queues,
+ * this variable may need to be increased. If there is a significant slowdown
+ * at the start of a send as these threads consume all the available IO
+ * resources, this variable may need to be decreased.
+ */
+int zfs_send_no_prefetch_queue_length = 1024 * 1024;
+/*
+ * These tunables control the fill fraction of the queues by zfs send. The fill
+ * fraction controls the frequency with which threads have to be cv_signaled.
+ * If a lot of cpu time is being spent on cv_signal, then these should be tuned
+ * down. If the queues empty before the signalled thread can catch up, then
+ * these should be tuned up.
+ */
+int zfs_send_queue_ff = 20;
+int zfs_send_no_prefetch_queue_ff = 20;
+
+/*
+ * Use this to override the recordsize calculation for fast zfs send estimates.
+ */
+int zfs_override_estimate_recordsize = 0;
+
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;
+
/* Set this tunable to FALSE is disable sending unmodified spill blocks. */
int zfs_send_unmodified_spill_blocks = B_TRUE;
+static inline boolean_t
+overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
+{
+ uint64_t temp = a * b;
+ if (b != 0 && temp / b != a)
+ return (B_FALSE);
+ *c = temp;
+ return (B_TRUE);
+}
+
/*
- * Use this to override the recordsize calculation for fast zfs send estimates.
+ * Return B_TRUE and modifies *out to the span if the span is less than 2^64,
+ * returns B_FALSE otherwise.
*/
-unsigned long zfs_override_estimate_recordsize = 0;
-
-#define BP_SPAN(datablkszsec, indblkshift, level) \
- (((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
- (level) * (indblkshift - SPA_BLKPTRSHIFT)))
+static inline boolean_t
+bp_span(uint32_t datablksz, uint8_t indblkshift, uint64_t level, uint64_t *out)
+{
+ uint64_t spanb = bp_span_in_blocks(indblkshift, level);
+ return (overflow_multiply(spanb, datablksz, out));
+}
struct send_thread_arg {
bqueue_t q;
dsl_dataset_t *ds; /* Dataset to traverse */
+ redaction_list_t *redaction_list;
+ struct send_redact_record *current_record;
uint64_t fromtxg; /* Traverse from this txg */
int flags; /* flags to pass to traverse_dataset */
int error_code;
boolean_t cancel;
zbookmark_phys_t resume;
+ objlist_t *deleted_objs;
+ uint64_t *num_blocks_visited;
};
-struct send_block_record {
- boolean_t eos_marker; /* Marks the end of the stream */
- blkptr_t bp;
- zbookmark_phys_t zb;
- uint8_t indblkshift;
- uint16_t datablkszsec;
- bqueue_node_t ln;
+struct redact_list_thread_arg {
+ boolean_t cancel;
+ bqueue_t q;
+ zbookmark_phys_t resume;
+ redaction_list_t *rl;
+ boolean_t mark_redact;
+ int error_code;
+ uint64_t *num_blocks_visited;
};
-typedef struct dump_bytes_io {
- dmu_sendarg_t *dbi_dsp;
- void *dbi_buf;
- int dbi_len;
-} dump_bytes_io_t;
-
-static int do_dump(dmu_sendarg_t *dsa, struct send_block_record *data);
+/*
+ * A wrapper around struct redact_block so it can be stored in a list_t.
+ */
+struct redact_block_list_node {
+ redact_block_phys_t block;
+ list_node_t node;
+};
-static void
-dump_bytes_cb(void *arg)
-{
- dump_bytes_io_t *dbi = (dump_bytes_io_t *)arg;
- dmu_sendarg_t *dsp = dbi->dbi_dsp;
- dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
- ssize_t resid; /* have to get resid to get detailed errno */
+struct redact_bookmark_info {
+ redact_block_phys_t rbi_furthest[TXG_SIZE];
+ /* Lists of struct redact_block_list_node. */
+ list_t rbi_blocks[TXG_SIZE];
+ boolean_t rbi_synctasc_txg[TXG_SIZE];
+ uint64_t rbi_latest_synctask_txg;
+ redaction_list_t *rbi_redaction_list;
+};
+struct send_merge_thread_arg {
+ bqueue_t q;
+ objset_t *os;
+ struct redact_list_thread_arg *from_arg;
+ struct send_thread_arg *to_arg;
+ struct redact_list_thread_arg *redact_arg;
+ int error;
+ boolean_t cancel;
+ struct redact_bookmark_info rbi;
/*
- * The code does not rely on len being a multiple of 8. We keep
- * this assertion because of the corresponding assertion in
- * receive_read(). Keeping this assertion ensures that we do not
- * inadvertently break backwards compatibility (causing the assertion
- * in receive_read() to trigger on old software). Newer feature flags
- * (such as raw send) may break this assertion since they were
- * introduced after the requirement was made obsolete.
+ * If we're resuming a redacted send, then the object/offset from the
+ * resume token may be different from the object/offset that we have
+ * updated the bookmark to. resume_redact_zb will store the earlier of
+ * the two object/offset pairs, and bookmark_before will be B_TRUE if
+ * resume_redact_zb has the object/offset for resuming the redaction
+ * bookmark, and B_FALSE if resume_redact_zb is storing the
+ * object/offset from the resume token.
*/
+ zbookmark_phys_t resume_redact_zb;
+ boolean_t bookmark_before;
+};
- ASSERT(dbi->dbi_len % 8 == 0 ||
- (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
-
- dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
- (caddr_t)dbi->dbi_buf, dbi->dbi_len,
- 0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
+struct send_range {
+ boolean_t eos_marker; /* Marks the end of the stream */
+ uint64_t object;
+ uint64_t start_blkid;
+ uint64_t end_blkid;
+ bqueue_node_t ln;
+ enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
+ PREVIOUSLY_REDACTED} type;
+ union {
+ struct srd {
+ dmu_object_type_t obj_type;
+ uint32_t datablksz;
+ blkptr_t bp;
+ } data;
+ struct srh {
+ uint32_t datablksz;
+ } hole;
+ struct sro {
+ /*
+ * This is a pointer because embedding it in the
+ * struct causes these structures to be massively larger
+ * for all range types; this makes the code much less
+ * memory efficient.
+ */
+ dnode_phys_t *dnp;
+ blkptr_t bp;
+ } object;
+ struct srr {
+ uint32_t datablksz;
+ } redact;
+ struct sror {
+ blkptr_t bp;
+ } object_range;
+ } sru;
+};
- mutex_enter(&ds->ds_sendstream_lock);
- *dsp->dsa_off += dbi->dbi_len;
- mutex_exit(&ds->ds_sendstream_lock);
-}
+/*
+ * The list of data whose inclusion in a send stream can be pending from
+ * one call to backup_cb to another. Multiple calls to dump_free(),
+ * dump_freeobjects(), and dump_redact() can be aggregated into a single
+ * DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.
+ */
+typedef enum {
+ PENDING_NONE,
+ PENDING_FREE,
+ PENDING_FREEOBJECTS,
+ PENDING_REDACT
+} dmu_pendop_t;
+
+typedef struct dmu_send_cookie {
+ dmu_replay_record_t *dsc_drr;
+ dmu_send_outparams_t *dsc_dso;
+ offset_t *dsc_off;
+ objset_t *dsc_os;
+ zio_cksum_t dsc_zc;
+ uint64_t dsc_toguid;
+ uint64_t dsc_fromtxg;
+ int dsc_err;
+ dmu_pendop_t dsc_pending_op;
+ uint64_t dsc_featureflags;
+ uint64_t dsc_last_data_object;
+ uint64_t dsc_last_data_offset;
+ uint64_t dsc_resume_object;
+ uint64_t dsc_resume_offset;
+ boolean_t dsc_sent_begin;
+ boolean_t dsc_sent_end;
+} dmu_send_cookie_t;
+
+static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range);
-static int
-dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
+static void
+range_free(struct send_range *range)
{
- dump_bytes_io_t dbi;
-
- dbi.dbi_dsp = dsp;
- dbi.dbi_buf = buf;
- dbi.dbi_len = len;
-
-#if defined(HAVE_LARGE_STACKS)
- dump_bytes_cb(&dbi);
-#else
- /*
- * The vn_rdwr() call is performed in a taskq to ensure that there is
- * always enough stack space to write safely to the target filesystem.
- * The ZIO_TYPE_FREE threads are used because there can be a lot of
- * them and they are used in vdev_file.c for a similar purpose.
- */
- spa_taskq_dispatch_sync(dmu_objset_spa(dsp->dsa_os), ZIO_TYPE_FREE,
- ZIO_TASKQ_ISSUE, dump_bytes_cb, &dbi, TQ_SLEEP);
-#endif /* HAVE_LARGE_STACKS */
-
- return (dsp->dsa_err);
+ if (range->type == OBJECT) {
+ size_t size = sizeof (dnode_phys_t) *
+ (range->sru.object.dnp->dn_extra_slots + 1);
+ kmem_free(range->sru.object.dnp, size);
+ }
+ kmem_free(range, sizeof (*range));
}
/*
@@ -164,32 +273,60 @@ dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
* up to the start of the checksum itself.
*/
static int
-dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
+dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)
{
+ dmu_send_outparams_t *dso = dscp->dsc_dso;
ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
- (void) fletcher_4_incremental_native(dsp->dsa_drr,
+ (void) fletcher_4_incremental_native(dscp->dsc_drr,
offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
- &dsp->dsa_zc);
- if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
- dsp->dsa_sent_begin = B_TRUE;
+ &dscp->dsc_zc);
+ if (dscp->dsc_drr->drr_type == DRR_BEGIN) {
+ dscp->dsc_sent_begin = B_TRUE;
} else {
- ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
+ ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u.
drr_checksum.drr_checksum));
- dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
+ dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc;
}
- if (dsp->dsa_drr->drr_type == DRR_END) {
- dsp->dsa_sent_end = B_TRUE;
+ if (dscp->dsc_drr->drr_type == DRR_END) {
+ dscp->dsc_sent_end = B_TRUE;
}
- (void) fletcher_4_incremental_native(&dsp->dsa_drr->
+ (void) fletcher_4_incremental_native(&dscp->dsc_drr->
drr_u.drr_checksum.drr_checksum,
- sizeof (zio_cksum_t), &dsp->dsa_zc);
- if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
+ sizeof (zio_cksum_t), &dscp->dsc_zc);
+ *dscp->dsc_off += sizeof (dmu_replay_record_t);
+ dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr,
+ sizeof (dmu_replay_record_t), dso->dso_arg);
+ if (dscp->dsc_err != 0)
return (SET_ERROR(EINTR));
if (payload_len != 0) {
- (void) fletcher_4_incremental_native(payload, payload_len,
- &dsp->dsa_zc);
- if (dump_bytes(dsp, payload, payload_len) != 0)
+ *dscp->dsc_off += payload_len;
+ /*
+ * payload is null when dso->ryrun == B_TRUE (i.e. when we're
+ * doing a send size calculation)
+ */
+ if (payload != NULL) {
+ (void) fletcher_4_incremental_native(
+ payload, payload_len, &dscp->dsc_zc);
+ }
+
+ /*
+ * The code does not rely on this (len being a multiple of 8).
+ * We keep this assertion because of the corresponding assertion
+ * in receive_read(). Keeping this assertion ensures that we do
+ * not inadvertently break backwards compatibility (causing the
+ * assertion in receive_read() to trigger on old software).
+ *
+ * Raw sends cannot be received on old software, and so can
+ * bypass this assertion.
+ */
+
+ ASSERT((payload_len % 8 == 0) ||
+ (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW));
+
+ dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload,
+ payload_len, dso->dso_arg);
+ if (dscp->dsc_err != 0)
return (SET_ERROR(EINTR));
}
return (0);
@@ -204,10 +341,10 @@ dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
* and freeobject records that were generated on the source.
*/
static int
-dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
+dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
uint64_t length)
{
- struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
+ struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free);
/*
* When we receive a free record, dbuf_free_range() assumes
@@ -222,87 +359,131 @@ dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
* another way to assert that the one-record constraint is still
* satisfied.
*/
- ASSERT(object > dsp->dsa_last_data_object ||
- (object == dsp->dsa_last_data_object &&
- offset > dsp->dsa_last_data_offset));
+ ASSERT(object > dscp->dsc_last_data_object ||
+ (object == dscp->dsc_last_data_object &&
+ offset > dscp->dsc_last_data_offset));
/*
* If there is a pending op, but it's not PENDING_FREE, push it out,
* since free block aggregation can only be done for blocks of the
* same type (i.e., DRR_FREE records can only be aggregated with
* other DRR_FREE records. DRR_FREEOBJECTS records can only be
- * aggregated with other DRR_FREEOBJECTS records.
+ * aggregated with other DRR_FREEOBJECTS records).
*/
- if (dsp->dsa_pending_op != PENDING_NONE &&
- dsp->dsa_pending_op != PENDING_FREE) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dscp->dsc_pending_op != PENDING_NONE &&
+ dscp->dsc_pending_op != PENDING_FREE) {
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
- if (dsp->dsa_pending_op == PENDING_FREE) {
- /*
- * There should never be a PENDING_FREE if length is
- * DMU_OBJECT_END (because dump_dnode is the only place where
- * this function is called with a DMU_OBJECT_END, and only after
- * flushing any pending record).
- */
- ASSERT(length != DMU_OBJECT_END);
+ if (dscp->dsc_pending_op == PENDING_FREE) {
/*
* Check to see whether this free block can be aggregated
* with pending one.
*/
if (drrf->drr_object == object && drrf->drr_offset +
drrf->drr_length == offset) {
- if (offset + length < offset)
- drrf->drr_length = DMU_OBJECT_END;
+ if (offset + length < offset || length == UINT64_MAX)
+ drrf->drr_length = UINT64_MAX;
else
drrf->drr_length += length;
return (0);
} else {
/* not a continuation. Push out pending record */
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
}
/* create a FREE record and make it pending */
- bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
- dsp->dsa_drr->drr_type = DRR_FREE;
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_FREE;
drrf->drr_object = object;
drrf->drr_offset = offset;
if (offset + length < offset)
drrf->drr_length = DMU_OBJECT_END;
else
drrf->drr_length = length;
- drrf->drr_toguid = dsp->dsa_toguid;
+ drrf->drr_toguid = dscp->dsc_toguid;
if (length == DMU_OBJECT_END) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
} else {
- dsp->dsa_pending_op = PENDING_FREE;
+ dscp->dsc_pending_op = PENDING_FREE;
}
return (0);
}
+/*
+ * Fill in the drr_redact struct, or perform aggregation if the previous record
+ * is also a redaction record, and the two are adjacent.
+ */
static int
-dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
+dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
+ uint64_t length)
+{
+ struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact;
+
+ /*
+ * If there is a pending op, but it's not PENDING_REDACT, push it out,
+ * since free block aggregation can only be done for blocks of the
+ * same type (i.e., DRR_REDACT records can only be aggregated with
+ * other DRR_REDACT records).
+ */
+ if (dscp->dsc_pending_op != PENDING_NONE &&
+ dscp->dsc_pending_op != PENDING_REDACT) {
+ if (dump_record(dscp, NULL, 0) != 0)
+ return (SET_ERROR(EINTR));
+ dscp->dsc_pending_op = PENDING_NONE;
+ }
+
+ if (dscp->dsc_pending_op == PENDING_REDACT) {
+ /*
+ * Check to see whether this redacted block can be aggregated
+ * with pending one.
+ */
+ if (drrr->drr_object == object && drrr->drr_offset +
+ drrr->drr_length == offset) {
+ drrr->drr_length += length;
+ return (0);
+ } else {
+ /* not a continuation. Push out pending record */
+ if (dump_record(dscp, NULL, 0) != 0)
+ return (SET_ERROR(EINTR));
+ dscp->dsc_pending_op = PENDING_NONE;
+ }
+ }
+ /* create a REDACT record and make it pending */
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_REDACT;
+ drrr->drr_object = object;
+ drrr->drr_offset = offset;
+ drrr->drr_length = length;
+ drrr->drr_toguid = dscp->dsc_toguid;
+ dscp->dsc_pending_op = PENDING_REDACT;
+
+ return (0);
+}
+
+static int
+dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
{
uint64_t payload_size;
- boolean_t raw = (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
- struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
+ boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
+ struct drr_write *drrw = &(dscp->dsc_drr->drr_u.drr_write);
/*
* We send data in increasing object, offset order.
* See comment in dump_free() for details.
*/
- ASSERT(object > dsp->dsa_last_data_object ||
- (object == dsp->dsa_last_data_object &&
- offset > dsp->dsa_last_data_offset));
- dsp->dsa_last_data_object = object;
- dsp->dsa_last_data_offset = offset + lsize - 1;
+ ASSERT(object > dscp->dsc_last_data_object ||
+ (object == dscp->dsc_last_data_object &&
+ offset > dscp->dsc_last_data_offset));
+ dscp->dsc_last_data_object = object;
+ dscp->dsc_last_data_offset = offset + lsize - 1;
/*
* If there is any kind of pending aggregation (currently either
@@ -310,22 +491,24 @@ dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
* the stream, since aggregation can't be done across operations
* of different types.
*/
- if (dsp->dsa_pending_op != PENDING_NONE) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dscp->dsc_pending_op != PENDING_NONE) {
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
/* write a WRITE record */
- bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
- dsp->dsa_drr->drr_type = DRR_WRITE;
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_WRITE;
drrw->drr_object = object;
drrw->drr_type = type;
drrw->drr_offset = offset;
- drrw->drr_toguid = dsp->dsa_toguid;
+ drrw->drr_toguid = dscp->dsc_toguid;
drrw->drr_logical_size = lsize;
/* only set the compression fields if the buf is compressed or raw */
if (raw || lsize != psize) {
+ ASSERT(raw || dscp->dsc_featureflags &
+ DMU_BACKUP_FEATURE_COMPRESSED);
ASSERT(!BP_IS_EMBEDDED(bp));
ASSERT3S(psize, >, 0);
@@ -345,7 +528,7 @@ dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
} else {
/* this is a compressed block */
- ASSERT(dsp->dsa_featureflags &
+ ASSERT(dscp->dsc_featureflags &
DMU_BACKUP_FEATURE_COMPRESSED);
ASSERT(!BP_SHOULD_BYTESWAP(bp));
ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
@@ -381,33 +564,33 @@ dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
drrw->drr_key.ddk_cksum = bp->blk_cksum;
}
- if (dump_record(dsp, data, payload_size) != 0)
+ if (dump_record(dscp, data, payload_size) != 0)
return (SET_ERROR(EINTR));
return (0);
}
static int
-dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
+dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
int blksz, const blkptr_t *bp)
{
char buf[BPE_PAYLOAD_SIZE];
struct drr_write_embedded *drrw =
- &(dsp->dsa_drr->drr_u.drr_write_embedded);
+ &(dscp->dsc_drr->drr_u.drr_write_embedded);
- if (dsp->dsa_pending_op != PENDING_NONE) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dscp->dsc_pending_op != PENDING_NONE) {
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
ASSERT(BP_IS_EMBEDDED(bp));
- bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
- dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;
drrw->drr_object = object;
drrw->drr_offset = offset;
drrw->drr_length = blksz;
- drrw->drr_toguid = dsp->dsa_toguid;
+ drrw->drr_toguid = dscp->dsc_toguid;
drrw->drr_compression = BP_GET_COMPRESS(bp);
drrw->drr_etype = BPE_GET_ETYPE(bp);
drrw->drr_lsize = BPE_GET_LSIZE(bp);
@@ -415,39 +598,40 @@ dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
decode_embedded_bp_compressed(bp, buf);
- if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
+ if (dump_record(dscp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
return (SET_ERROR(EINTR));
return (0);
}
static int
-dump_spill(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object, void *data)
+dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
+ void *data)
{
- struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
+ struct drr_spill *drrs = &(dscp->dsc_drr->drr_u.drr_spill);
uint64_t blksz = BP_GET_LSIZE(bp);
uint64_t payload_size = blksz;
- if (dsp->dsa_pending_op != PENDING_NONE) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dscp->dsc_pending_op != PENDING_NONE) {
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
/* write a SPILL record */
- bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
- dsp->dsa_drr->drr_type = DRR_SPILL;
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_SPILL;
drrs->drr_object = object;
drrs->drr_length = blksz;
- drrs->drr_toguid = dsp->dsa_toguid;
+ drrs->drr_toguid = dscp->dsc_toguid;
/* See comment in dump_dnode() for full details */
if (zfs_send_unmodified_spill_blocks &&
- (bp->blk_birth <= dsp->dsa_fromtxg)) {
+ (bp->blk_birth <= dscp->dsc_fromtxg)) {
drrs->drr_flags |= DRR_SPILL_UNMODIFIED;
}
/* handle raw send fields */
- if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
+ if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
ASSERT(BP_IS_PROTECTED(bp));
if (BP_SHOULD_BYTESWAP(bp))
@@ -459,17 +643,17 @@ dump_spill(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object, void *data)
payload_size = drrs->drr_compressed_size;
}
- if (dump_record(dsp, data, payload_size) != 0)
+ if (dump_record(dscp, data, payload_size) != 0)
return (SET_ERROR(EINTR));
return (0);
}
static int
-dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
+dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
{
- struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
+ struct drr_freeobjects *drrfo = &(dscp->dsc_drr->drr_u.drr_freeobjects);
uint64_t maxobj = DNODES_PER_BLOCK *
- (DMU_META_DNODE(dsp->dsa_os)->dn_maxblkid + 1);
+ (DMU_META_DNODE(dscp->dsc_os)->dn_maxblkid + 1);
/*
* ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
@@ -490,15 +674,18 @@ dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
* push it out, since free block aggregation can only be done for
* blocks of the same type (i.e., DRR_FREE records can only be
* aggregated with other DRR_FREE records. DRR_FREEOBJECTS records
- * can only be aggregated with other DRR_FREEOBJECTS records.
+ * can only be aggregated with other DRR_FREEOBJECTS records).
*/
- if (dsp->dsa_pending_op != PENDING_NONE &&
- dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dscp->dsc_pending_op != PENDING_NONE &&
+ dscp->dsc_pending_op != PENDING_FREEOBJECTS) {
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
- if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
+ if (numobjs == 0)
+ numobjs = UINT64_MAX - firstobj;
+
+ if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
/*
* See whether this free object array can be aggregated
* with pending one
@@ -508,32 +695,32 @@ dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
return (0);
} else {
/* can't be aggregated. Push out pending record */
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
}
/* write a FREEOBJECTS record */
- bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
- dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
drrfo->drr_firstobj = firstobj;
drrfo->drr_numobjs = numobjs;
- drrfo->drr_toguid = dsp->dsa_toguid;
+ drrfo->drr_toguid = dscp->dsc_toguid;
- dsp->dsa_pending_op = PENDING_FREEOBJECTS;
+ dscp->dsc_pending_op = PENDING_FREEOBJECTS;
return (0);
}
static int
-dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
+dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
dnode_phys_t *dnp)
{
- struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
+ struct drr_object *drro = &(dscp->dsc_drr->drr_u.drr_object);
int bonuslen;
- if (object < dsp->dsa_resume_object) {
+ if (object < dscp->dsc_resume_object) {
/*
* Note: when resuming, we will visit all the dnodes in
* the block of dnodes that we are resuming from. In
@@ -541,23 +728,23 @@ dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
* the one we are resuming from. We should be at most one
* block's worth of dnodes behind the resume point.
*/
- ASSERT3U(dsp->dsa_resume_object - object, <,
+ ASSERT3U(dscp->dsc_resume_object - object, <,
1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
return (0);
}
if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
- return (dump_freeobjects(dsp, object, 1));
+ return (dump_freeobjects(dscp, object, 1));
- if (dsp->dsa_pending_op != PENDING_NONE) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dscp->dsc_pending_op != PENDING_NONE) {
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
/* write an OBJECT record */
- bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
- dsp->dsa_drr->drr_type = DRR_OBJECT;
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_OBJECT;
drro->drr_object = object;
drro->drr_type = dnp->dn_type;
drro->drr_bonustype = dnp->dn_bonustype;
@@ -566,15 +753,15 @@ dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
drro->drr_dn_slots = dnp->dn_extra_slots + 1;
drro->drr_checksumtype = dnp->dn_checksum;
drro->drr_compress = dnp->dn_compress;
- drro->drr_toguid = dsp->dsa_toguid;
+ drro->drr_toguid = dscp->dsc_toguid;
- if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
+ if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);
- if ((dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW)) {
+ if ((dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
ASSERT(BP_IS_ENCRYPTED(bp));
if (BP_SHOULD_BYTESWAP(bp))
@@ -599,22 +786,22 @@ dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
/*
* DRR_OBJECT_SPILL is set for every dnode which references a
- * spill block. This allows the receiving pool to definitively
+ * spill block. This allows the receiving pool to definitively
* determine when a spill block should be kept or freed.
*/
if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)
drro->drr_flags |= DRR_OBJECT_SPILL;
- if (dump_record(dsp, DN_BONUS(dnp), bonuslen) != 0)
+ if (dump_record(dscp, DN_BONUS(dnp), bonuslen) != 0)
return (SET_ERROR(EINTR));
/* Free anything past the end of the file. */
- if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
+ if (dump_free(dscp, object, (dnp->dn_maxblkid + 1) *
(dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
return (SET_ERROR(EINTR));
/*
- * Send DRR_SPILL records for unmodified spill blocks. This is useful
+ * Send DRR_SPILL records for unmodified spill blocks. This is useful
* because changing certain attributes of the object (e.g. blocksize)
* can cause old versions of ZFS to incorrectly remove a spill block.
* Including these records in the stream forces an up to date version
@@ -624,62 +811,67 @@ dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
*/
if (zfs_send_unmodified_spill_blocks &&
(dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) &&
- (DN_SPILL_BLKPTR(dnp)->blk_birth <= dsp->dsa_fromtxg)) {
- struct send_block_record record;
+ (DN_SPILL_BLKPTR(dnp)->blk_birth <= dscp->dsc_fromtxg)) {
+ struct send_range record;
+ blkptr_t *bp = DN_SPILL_BLKPTR(dnp);
- bzero(&record, sizeof (struct send_block_record));
+ bzero(&record, sizeof (struct send_range));
+ record.type = DATA;
+ record.object = object;
record.eos_marker = B_FALSE;
- record.bp = *DN_SPILL_BLKPTR(dnp);
- SET_BOOKMARK(&(record.zb), dmu_objset_id(dsp->dsa_os),
- object, 0, DMU_SPILL_BLKID);
+ record.start_blkid = DMU_SPILL_BLKID;
+ record.end_blkid = record.start_blkid + 1;
+ record.sru.data.bp = *bp;
+ record.sru.data.obj_type = dnp->dn_type;
+ record.sru.data.datablksz = BP_GET_LSIZE(bp);
- if (do_dump(dsp, &record) != 0)
+ if (do_dump(dscp, &record) != 0)
return (SET_ERROR(EINTR));
}
- if (dsp->dsa_err != 0)
+ if (dscp->dsc_err != 0)
return (SET_ERROR(EINTR));
return (0);
}
static int
-dump_object_range(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t firstobj,
- uint64_t numslots)
+dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
+ uint64_t firstobj, uint64_t numslots)
{
struct drr_object_range *drror =
- &(dsp->dsa_drr->drr_u.drr_object_range);
+ &(dscp->dsc_drr->drr_u.drr_object_range);
/* we only use this record type for raw sends */
ASSERT(BP_IS_PROTECTED(bp));
- ASSERT(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
+ ASSERT(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
ASSERT0(BP_GET_LEVEL(bp));
- if (dsp->dsa_pending_op != PENDING_NONE) {
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dscp->dsc_pending_op != PENDING_NONE) {
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
- dsp->dsa_pending_op = PENDING_NONE;
+ dscp->dsc_pending_op = PENDING_NONE;
}
- bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
- dsp->dsa_drr->drr_type = DRR_OBJECT_RANGE;
+ bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;
drror->drr_firstobj = firstobj;
drror->drr_numslots = numslots;
- drror->drr_toguid = dsp->dsa_toguid;
+ drror->drr_toguid = dscp->dsc_toguid;
if (BP_SHOULD_BYTESWAP(bp))
drror->drr_flags |= DRR_RAW_BYTESWAP;
zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
zio_crypt_decode_mac_bp(bp, drror->drr_mac);
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dump_record(dscp, NULL, 0) != 0)
return (SET_ERROR(EINTR));
return (0);
}
static boolean_t
-backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
+send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
{
if (!BP_IS_EMBEDDED(bp))
return (B_FALSE);
@@ -688,7 +880,7 @@ backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
* Compression function must be legacy, or explicitly enabled.
*/
if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
- !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
+ !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4)))
return (B_FALSE);
/*
@@ -696,7 +888,7 @@ backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
*/
switch (BPE_GET_ETYPE(bp)) {
case BP_EMBEDDED_TYPE_DATA:
- if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
+ if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
return (B_TRUE);
break;
default:
@@ -706,7 +898,239 @@ backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
}
/*
- * This is the callback function to traverse_dataset that acts as the worker
+ * This function actually handles figuring out what kind of record needs to be
+ * dumped, reading the data (which has hopefully been prefetched), and calling
+ * the appropriate helper function.
+ */
+static int
+do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
+{
+ int err = 0;
+ switch (range->type) {
+ case OBJECT:
+ err = dump_dnode(dscp, &range->sru.object.bp, range->object,
+ range->sru.object.dnp);
+ return (err);
+ case OBJECT_RANGE: {
+ ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+ if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
+ return (0);
+ }
+ uint64_t epb = BP_GET_LSIZE(&range->sru.object_range.bp) >>
+ DNODE_SHIFT;
+ uint64_t firstobj = range->start_blkid * epb;
+ err = dump_object_range(dscp, &range->sru.object_range.bp,
+ firstobj, epb);
+ break;
+ }
+ case REDACT: {
+ struct srr *srrp = &range->sru.redact;
+ err = dump_redact(dscp, range->object, range->start_blkid *
+ srrp->datablksz, (range->end_blkid - range->start_blkid) *
+ srrp->datablksz);
+ return (err);
+ }
+ case DATA: {
+ struct srd *srdp = &range->sru.data;
+ blkptr_t *bp = &srdp->bp;
+ spa_t *spa =
+ dmu_objset_spa(dscp->dsc_os);
+
+ ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
+ ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+ if (BP_GET_TYPE(bp) == DMU_OT_SA) {
+ arc_flags_t aflags = ARC_FLAG_WAIT;
+ enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+
+ if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
+ ASSERT(BP_IS_PROTECTED(bp));
+ zioflags |= ZIO_FLAG_RAW;
+ }
+
+ arc_buf_t *abuf;
+ zbookmark_phys_t zb;
+ ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);
+ zb.zb_objset = dmu_objset_id(dscp->dsc_os);
+ zb.zb_object = range->object;
+ zb.zb_level = 0;
+ zb.zb_blkid = range->start_blkid;
+
+ if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
+ bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
+ zioflags, &aflags, &zb) != 0)
+ return (SET_ERROR(EIO));
+
+ err = dump_spill(dscp, bp, zb.zb_object, abuf->b_data);
+ arc_buf_destroy(abuf, &abuf);
+ return (err);
+ }
+ if (send_do_embed(dscp, bp)) {
+ err = dump_write_embedded(dscp, range->object,
+ range->start_blkid * srdp->datablksz,
+ srdp->datablksz, bp);
+ return (err);
+ }
+ ASSERT(range->object > dscp->dsc_resume_object ||
+ (range->object == dscp->dsc_resume_object &&
+ range->start_blkid * srdp->datablksz >=
+ dscp->dsc_resume_offset));
+ /* it's a level-0 block of a regular object */
+ arc_flags_t aflags = ARC_FLAG_WAIT;
+ arc_buf_t *abuf = NULL;
+ uint64_t offset;
+
+ /*
+ * If we have large blocks stored on disk but the send flags
+ * don't allow us to send large blocks, we split the data from
+ * the arc buf into chunks.
+ */
+ boolean_t split_large_blocks =
+ srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+ !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
+
+ /*
+ * Raw sends require that we always get raw data as it exists
+ * on disk, so we assert that we are not splitting blocks here.
+ */
+ boolean_t request_raw =
+ (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
+
+ /*
+ * We should only request compressed data from the ARC if all
+ * the following are true:
+ * - stream compression was requested
+ * - we aren't splitting large blocks into smaller chunks
+ * - the data won't need to be byteswapped before sending
+ * - this isn't an embedded block
+ * - this isn't metadata (if receiving on a different endian
+ * system it can be byteswapped more easily)
+ */
+ boolean_t request_compressed =
+ (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
+ !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
+ !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+
+ IMPLY(request_raw, !split_large_blocks);
+ IMPLY(request_raw, BP_IS_PROTECTED(bp));
+ if (!dscp->dsc_dso->dso_dryrun) {
+ enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+
+ ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
+
+ if (request_raw)
+ zioflags |= ZIO_FLAG_RAW;
+ else if (request_compressed)
+ zioflags |= ZIO_FLAG_RAW_COMPRESS;
+ zbookmark_phys_t zb;
+ zb.zb_objset = dmu_objset_id(dscp->dsc_os);
+ zb.zb_object = range->object;
+ zb.zb_level = 0;
+ zb.zb_blkid = range->start_blkid;
+
+ err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
+ ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb);
+ }
+
+ if (err != 0) {
+ if (zfs_send_corrupt_data &&
+ !dscp->dsc_dso->dso_dryrun) {
+ /* Send a block filled with 0x"zfs badd bloc" */
+ abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
+ srdp->datablksz);
+ uint64_t *ptr;
+ for (ptr = abuf->b_data;
+ (char *)ptr < (char *)abuf->b_data +
+ srdp->datablksz; ptr++)
+ *ptr = 0x2f5baddb10cULL;
+ } else {
+ return (SET_ERROR(EIO));
+ }
+ }
+
+ offset = range->start_blkid * srdp->datablksz;
+
+ if (split_large_blocks) {
+ ASSERT0(arc_is_encrypted(abuf));
+ ASSERT3U(arc_get_compression(abuf), ==,
+ ZIO_COMPRESS_OFF);
+ char *buf = abuf->b_data;
+ while (srdp->datablksz > 0 && err == 0) {
+ int n = MIN(srdp->datablksz,
+ SPA_OLD_MAXBLOCKSIZE);
+ err = dump_write(dscp, srdp->obj_type,
+ range->object, offset, n, n, NULL, buf);
+ offset += n;
+ buf += n;
+ srdp->datablksz -= n;
+ }
+ } else {
+ int psize;
+ if (abuf != NULL) {
+ psize = arc_buf_size(abuf);
+ if (arc_get_compression(abuf) !=
+ ZIO_COMPRESS_OFF) {
+ ASSERT3S(psize, ==, BP_GET_PSIZE(bp));
+ }
+ } else if (!request_compressed) {
+ psize = srdp->datablksz;
+ } else {
+ psize = BP_GET_PSIZE(bp);
+ }
+ err = dump_write(dscp, srdp->obj_type, range->object,
+ offset, srdp->datablksz, psize, bp,
+ (abuf == NULL ? NULL : abuf->b_data));
+ }
+ if (abuf != NULL)
+ arc_buf_destroy(abuf, &abuf);
+ return (err);
+ }
+ case HOLE: {
+ struct srh *srhp = &range->sru.hole;
+ if (range->object == DMU_META_DNODE_OBJECT) {
+ uint32_t span = srhp->datablksz >> DNODE_SHIFT;
+ uint64_t first_obj = range->start_blkid * span;
+ uint64_t numobj = range->end_blkid * span - first_obj;
+ return (dump_freeobjects(dscp, first_obj, numobj));
+ }
+ uint64_t offset = 0;
+
+ /*
+ * If this multiply overflows, we don't need to send this block.
+ * Even if it has a birth time, it can never not be a hole, so
+ * we don't need to send records for it.
+ */
+ if (!overflow_multiply(range->start_blkid, srhp->datablksz,
+ &offset)) {
+ return (0);
+ }
+ uint64_t len = 0;
+
+ if (!overflow_multiply(range->end_blkid, srhp->datablksz, &len))
+ len = UINT64_MAX;
+ len = len - offset;
+ return (dump_free(dscp, range->object, offset, len));
+ }
+ default:
+ panic("Invalid range type in do_dump: %d", range->type);
+ }
+ return (err);
+}
+
+struct send_range *
+range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
+ uint64_t end_blkid, boolean_t eos)
+{
+ struct send_range *range = kmem_alloc(sizeof (*range), KM_SLEEP);
+ range->type = type;
+ range->object = object;
+ range->start_blkid = start_blkid;
+ range->end_blkid = end_blkid;
+ range->eos_marker = eos;
+ return (range);
+}
+
+/*
+ * This is the callback function to traverse_dataset that acts as a worker
* thread for dmu_send_impl.
*/
/*ARGSUSED*/
@@ -715,318 +1139,1189 @@ send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
struct send_thread_arg *sta = arg;
- struct send_block_record *record;
- uint64_t record_size;
- int err = 0;
+ struct send_range *record;
ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
zb->zb_object >= sta->resume.zb_object);
ASSERT3P(sta->ds, !=, NULL);
+ /*
+ * All bps of an encrypted os should have the encryption bit set.
+ * If this is not true it indicates tampering and we report an error.
+ */
+ objset_t *os;
+ VERIFY0(dmu_objset_from_ds(sta->ds, &os));
+ if (os->os_encrypted &&
+ !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
+ spa_log_error(spa, zb);
+ zfs_panic_recover("unencrypted block in encrypted "
+ "object set %llu", sta->ds->ds_object);
+ return (SET_ERROR(EIO));
+ }
+
if (sta->cancel)
return (SET_ERROR(EINTR));
+ if (zb->zb_object != DMU_META_DNODE_OBJECT &&
+ DMU_OBJECT_IS_SPECIAL(zb->zb_object))
+ return (0);
+ atomic_inc_64(sta->num_blocks_visited);
- if (bp == NULL) {
- ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
+ if (zb->zb_level == ZB_DNODE_LEVEL) {
+ if (zb->zb_object == DMU_META_DNODE_OBJECT)
+ return (0);
+ record = range_alloc(OBJECT, zb->zb_object, 0, 0, B_FALSE);
+ record->sru.object.bp = *bp;
+ size_t size = sizeof (*dnp) * (dnp->dn_extra_slots + 1);
+ record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);
+ bcopy(dnp, record->sru.object.dnp, size);
+ bqueue_enqueue(&sta->q, record, sizeof (*record));
return (0);
- } else if (zb->zb_level < 0) {
+ }
+ if (zb->zb_level == 0 && zb->zb_object == DMU_META_DNODE_OBJECT &&
+ !BP_IS_HOLE(bp)) {
+ record = range_alloc(OBJECT_RANGE, 0, zb->zb_blkid,
+ zb->zb_blkid + 1, B_FALSE);
+ record->sru.object_range.bp = *bp;
+ bqueue_enqueue(&sta->q, record, sizeof (*record));
return (0);
}
+ if (zb->zb_level < 0 || (zb->zb_level > 0 && !BP_IS_HOLE(bp)))
+ return (0);
+ if (zb->zb_object == DMU_META_DNODE_OBJECT && !BP_IS_HOLE(bp))
+ return (0);
- record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
- record->eos_marker = B_FALSE;
- record->bp = *bp;
- record->zb = *zb;
- record->indblkshift = dnp->dn_indblkshift;
- record->datablkszsec = dnp->dn_datablkszsec;
- record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
- bqueue_enqueue(&sta->q, record, record_size);
+ uint64_t span = bp_span_in_blocks(dnp->dn_indblkshift, zb->zb_level);
+ uint64_t start;
- return (err);
+ /*
+ * If this multiply overflows, we don't need to send this block.
+ * Even if it has a birth time, it can never not be a hole, so
+ * we don't need to send records for it.
+ */
+ if (!overflow_multiply(span, zb->zb_blkid, &start) || (!(zb->zb_blkid ==
+ DMU_SPILL_BLKID || DMU_OT_IS_METADATA(dnp->dn_type)) &&
+ span * zb->zb_blkid > dnp->dn_maxblkid)) {
+ ASSERT(BP_IS_HOLE(bp));
+ return (0);
+ }
+
+ if (zb->zb_blkid == DMU_SPILL_BLKID)
+ ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
+
+ record = range_alloc(DATA, zb->zb_object, start, (start + span < start ?
+ 0 : start + span), B_FALSE);
+
+ uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?
+ BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);
+ if (BP_IS_HOLE(bp)) {
+ record->type = HOLE;
+ record->sru.hole.datablksz = datablksz;
+ } else if (BP_IS_REDACTED(bp)) {
+ record->type = REDACT;
+ record->sru.redact.datablksz = datablksz;
+ } else {
+ record->type = DATA;
+ record->sru.data.datablksz = datablksz;
+ record->sru.data.obj_type = dnp->dn_type;
+ record->sru.data.bp = *bp;
+ }
+ bqueue_enqueue(&sta->q, record, sizeof (*record));
+ return (0);
+}
+
+struct redact_list_cb_arg {
+ uint64_t *num_blocks_visited;
+ bqueue_t *q;
+ boolean_t *cancel;
+ boolean_t mark_redact;
+};
+
+static int
+redact_list_cb(redact_block_phys_t *rb, void *arg)
+{
+ struct redact_list_cb_arg *rlcap = arg;
+
+ atomic_inc_64(rlcap->num_blocks_visited);
+ if (*rlcap->cancel)
+ return (-1);
+
+ struct send_range *data = range_alloc(REDACT, rb->rbp_object,
+ rb->rbp_blkid, rb->rbp_blkid + redact_block_get_count(rb), B_FALSE);
+ ASSERT3U(data->end_blkid, >, rb->rbp_blkid);
+ if (rlcap->mark_redact) {
+ data->type = REDACT;
+ data->sru.redact.datablksz = redact_block_get_size(rb);
+ } else {
+ data->type = PREVIOUSLY_REDACTED;
+ }
+ bqueue_enqueue(rlcap->q, data, sizeof (*data));
+
+ return (0);
}
/*
* This function kicks off the traverse_dataset. It also handles setting the
* error code of the thread in case something goes wrong, and pushes the End of
* Stream record when the traverse_dataset call has finished. If there is no
- * dataset to traverse, the thread immediately pushes End of Stream marker.
+ * dataset to traverse, then we traverse the redaction list provided and enqueue
+ * records for that. If neither is provided, the thread immediately pushes an
+ * End of Stream marker.
*/
static void
send_traverse_thread(void *arg)
{
struct send_thread_arg *st_arg = arg;
- int err;
- struct send_block_record *data;
+ int err = 0;
+ struct send_range *data;
fstrans_cookie_t cookie = spl_fstrans_mark();
if (st_arg->ds != NULL) {
+ ASSERT3P(st_arg->redaction_list, ==, NULL);
err = traverse_dataset_resume(st_arg->ds,
st_arg->fromtxg, &st_arg->resume,
st_arg->flags, send_cb, st_arg);
+ } else if (st_arg->redaction_list != NULL) {
+ struct redact_list_cb_arg rlcba = {0};
+ rlcba.cancel = &st_arg->cancel;
+ rlcba.num_blocks_visited = st_arg->num_blocks_visited;
+ rlcba.q = &st_arg->q;
+ rlcba.mark_redact = B_FALSE;
+ err = dsl_redaction_list_traverse(st_arg->redaction_list,
+ &st_arg->resume, redact_list_cb, &rlcba);
+ }
+
+ if (err != EINTR)
+ st_arg->error_code = err;
+ data = range_alloc(DATA, 0, 0, 0, B_TRUE);
+ bqueue_enqueue_flush(&st_arg->q, data, sizeof (*data));
+ spl_fstrans_unmark(cookie);
+ thread_exit();
+}
+
+/*
+ * Utility function that causes End of Stream records to compare after of all
+ * others, so that other threads' comparison logic can stay simple.
+ */
+static int __attribute__((unused))
+send_range_after(const struct send_range *from, const struct send_range *to)
+{
+ if (from->eos_marker == B_TRUE)
+ return (1);
+ if (to->eos_marker == B_TRUE)
+ return (-1);
+
+ uint64_t from_obj = from->object;
+ uint64_t from_end_obj = from->object + 1;
+ uint64_t to_obj = to->object;
+ uint64_t to_end_obj = to->object + 1;
+ if (from_obj == 0) {
+ ASSERT(from->type == HOLE || from->type == OBJECT_RANGE);
+ from_obj = from->start_blkid << DNODES_PER_BLOCK_SHIFT;
+ from_end_obj = from->end_blkid << DNODES_PER_BLOCK_SHIFT;
+ }
+ if (to_obj == 0) {
+ ASSERT(to->type == HOLE || to->type == OBJECT_RANGE);
+ to_obj = to->start_blkid << DNODES_PER_BLOCK_SHIFT;
+ to_end_obj = to->end_blkid << DNODES_PER_BLOCK_SHIFT;
+ }
+
+ if (from_end_obj <= to_obj)
+ return (-1);
+ if (from_obj >= to_end_obj)
+ return (1);
+ int64_t cmp = AVL_CMP(to->type == OBJECT_RANGE, from->type ==
+ OBJECT_RANGE);
+ if (unlikely(cmp))
+ return (cmp);
+ cmp = AVL_CMP(to->type == OBJECT, from->type == OBJECT);
+ if (unlikely(cmp))
+ return (cmp);
+ if (from->end_blkid <= to->start_blkid)
+ return (-1);
+ if (from->start_blkid >= to->end_blkid)
+ return (1);
+ return (0);
+}
+/*
+ * Pop the new data off the queue, check that the records we receive are in
+ * the right order, but do not free the old data. This is used so that the
+ * records can be sent on to the main thread without copying the data.
+ */
+static struct send_range *
+get_next_range_nofree(bqueue_t *bq, struct send_range *prev)
+{
+ struct send_range *next = bqueue_dequeue(bq);
+ ASSERT3S(send_range_after(prev, next), ==, -1);
+ return (next);
+}
+
+/*
+ * Pop the new data off the queue, check that the records we receive are in
+ * the right order, and free the old data.
+ */
+static struct send_range *
+get_next_range(bqueue_t *bq, struct send_range *prev)
+{
+ struct send_range *next = get_next_range_nofree(bq, prev);
+ range_free(prev);
+ return (next);
+}
+
+static void
+redact_list_thread(void *arg)
+{
+ struct redact_list_thread_arg *rlt_arg = arg;
+ struct send_range *record;
+ fstrans_cookie_t cookie = spl_fstrans_mark();
+ if (rlt_arg->rl != NULL) {
+ struct redact_list_cb_arg rlcba = {0};
+ rlcba.cancel = &rlt_arg->cancel;
+ rlcba.q = &rlt_arg->q;
+ rlcba.num_blocks_visited = rlt_arg->num_blocks_visited;
+ rlcba.mark_redact = rlt_arg->mark_redact;
+ int err = dsl_redaction_list_traverse(rlt_arg->rl,
+ &rlt_arg->resume, redact_list_cb, &rlcba);
if (err != EINTR)
- st_arg->error_code = err;
+ rlt_arg->error_code = err;
}
- data = kmem_zalloc(sizeof (*data), KM_SLEEP);
- data->eos_marker = B_TRUE;
- bqueue_enqueue(&st_arg->q, data, 1);
+ record = range_alloc(DATA, 0, 0, 0, B_TRUE);
+ bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));
spl_fstrans_unmark(cookie);
- thread_exit();
}
/*
- * This function actually handles figuring out what kind of record needs to be
- * dumped, reading the data (which has hopefully been prefetched), and calling
- * the appropriate helper function.
+ * Compare the start point of the two provided ranges. End of stream ranges
+ * compare last, objects compare before any data or hole inside that object and
+ * multi-object holes that start at the same object.
*/
static int
-do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
+send_range_start_compare(struct send_range *r1, struct send_range *r2)
{
- dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
- const blkptr_t *bp = &data->bp;
- const zbookmark_phys_t *zb = &data->zb;
- uint8_t indblkshift = data->indblkshift;
- uint16_t dblkszsec = data->datablkszsec;
- spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
- dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
- int err = 0;
+ uint64_t r1_objequiv = r1->object;
+ uint64_t r1_l0equiv = r1->start_blkid;
+ uint64_t r2_objequiv = r2->object;
+ uint64_t r2_l0equiv = r2->start_blkid;
+ int64_t cmp = AVL_CMP(r1->eos_marker, r2->eos_marker);
+ if (unlikely(cmp))
+ return (cmp);
+ if (r1->object == 0) {
+ r1_objequiv = r1->start_blkid * DNODES_PER_BLOCK;
+ r1_l0equiv = 0;
+ }
+ if (r2->object == 0) {
+ r2_objequiv = r2->start_blkid * DNODES_PER_BLOCK;
+ r2_l0equiv = 0;
+ }
- ASSERT3U(zb->zb_level, >=, 0);
+ cmp = AVL_CMP(r1_objequiv, r2_objequiv);
+ if (likely(cmp))
+ return (cmp);
+ cmp = AVL_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
+ if (unlikely(cmp))
+ return (cmp);
+ cmp = AVL_CMP(r2->type == OBJECT, r1->type == OBJECT);
+ if (unlikely(cmp))
+ return (cmp);
+
+ return (AVL_CMP(r1_l0equiv, r2_l0equiv));
+}
- ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
- zb->zb_object >= dsa->dsa_resume_object);
+enum q_idx {
+ REDACT_IDX = 0,
+ TO_IDX,
+ FROM_IDX,
+ NUM_THREADS
+};
+/*
+ * This function returns the next range the send_merge_thread should operate on.
+ * The inputs are two arrays; the first one stores the range at the front of the
+ * queues stored in the second one. The ranges are sorted in descending
+ * priority order; the metadata from earlier ranges overrules metadata from
+ * later ranges. out_mask is used to return which threads the ranges came from;
+ * bit i is set if ranges[i] started at the same place as the returned range.
+ *
+ * This code is not hardcoded to compare a specific number of threads; it could
+ * be used with any number, just by changing the q_idx enum.
+ *
+ * The "next range" is the one with the earliest start; if two starts are equal,
+ * the highest-priority range is the next to operate on. If a higher-priority
+ * range starts in the middle of the first range, then the first range will be
+ * truncated to end where the higher-priority range starts, and we will operate
+ * on that one next time. In this way, we make sure that each block covered by
+ * some range gets covered by a returned range, and each block covered is
+ * returned using the metadata of the highest-priority range it appears in.
+ *
+ * For example, if the three ranges at the front of the queues were [2,4),
+ * [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata
+ * from the third range, [2,4) with the metadata from the first range, and then
+ * [4,5) with the metadata from the second.
+ */
+static struct send_range *
+find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask)
+{
+ int idx = 0; // index of the range with the earliest start
+ int i;
+ uint64_t bmask = 0;
+ for (i = 1; i < NUM_THREADS; i++) {
+ if (send_range_start_compare(ranges[i], ranges[idx]) < 0)
+ idx = i;
+ }
+ if (ranges[idx]->eos_marker) {
+ struct send_range *ret = range_alloc(DATA, 0, 0, 0, B_TRUE);
+ *out_mask = 0;
+ return (ret);
+ }
/*
- * All bps of an encrypted os should have the encryption bit set.
- * If this is not true it indicates tampering and we report an error.
+ * Find all the ranges that start at that same point.
*/
- if (dsa->dsa_os->os_encrypted &&
- !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
- spa_log_error(spa, zb);
- zfs_panic_recover("unencrypted block in encrypted "
- "object set %llu", ds->ds_object);
- return (SET_ERROR(EIO));
+ for (i = 0; i < NUM_THREADS; i++) {
+ if (send_range_start_compare(ranges[i], ranges[idx]) == 0)
+ bmask |= 1 << i;
+ }
+ *out_mask = bmask;
+ /*
+ * OBJECT_RANGE records only come from the TO thread, and should always
+ * be treated as overlapping with nothing and sent on immediately. They
+ * are only used in raw sends, and are never redacted.
+ */
+ if (ranges[idx]->type == OBJECT_RANGE) {
+ ASSERT3U(idx, ==, TO_IDX);
+ ASSERT3U(*out_mask, ==, 1 << TO_IDX);
+ struct send_range *ret = ranges[idx];
+ ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
+ return (ret);
+ }
+ /*
+ * Find the first start or end point after the start of the first range.
+ */
+ uint64_t first_change = ranges[idx]->end_blkid;
+ for (i = 0; i < NUM_THREADS; i++) {
+ if (i == idx || ranges[i]->eos_marker ||
+ ranges[i]->object > ranges[idx]->object ||
+ ranges[i]->object == DMU_META_DNODE_OBJECT)
+ continue;
+ ASSERT3U(ranges[i]->object, ==, ranges[idx]->object);
+ if (first_change > ranges[i]->start_blkid &&
+ (bmask & (1 << i)) == 0)
+ first_change = ranges[i]->start_blkid;
+ else if (first_change > ranges[i]->end_blkid)
+ first_change = ranges[i]->end_blkid;
+ }
+ /*
+ * Update all ranges to no longer overlap with the range we're
+ * returning. All such ranges must start at the same place as the range
+ * being returned, and end at or after first_change. Thus we update
+ * their start to first_change. If that makes them size 0, then free
+ * them and pull a new range from that thread.
+ */
+ for (i = 0; i < NUM_THREADS; i++) {
+ if (i == idx || (bmask & (1 << i)) == 0)
+ continue;
+ ASSERT3U(first_change, >, ranges[i]->start_blkid);
+ ranges[i]->start_blkid = first_change;
+ ASSERT3U(ranges[i]->start_blkid, <=, ranges[i]->end_blkid);
+ if (ranges[i]->start_blkid == ranges[i]->end_blkid)
+ ranges[i] = get_next_range(qs[i], ranges[i]);
+ }
+ /*
+ * Short-circuit the simple case; if the range doesn't overlap with
+ * anything else, or it only overlaps with things that start at the same
+ * place and are longer, send it on.
+ */
+ if (first_change == ranges[idx]->end_blkid) {
+ struct send_range *ret = ranges[idx];
+ ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
+ return (ret);
}
- if (zb->zb_object != DMU_META_DNODE_OBJECT &&
- DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
- return (0);
- } else if (BP_IS_HOLE(bp) &&
- zb->zb_object == DMU_META_DNODE_OBJECT) {
- uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
- uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
- err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
- } else if (BP_IS_HOLE(bp)) {
- uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
- uint64_t offset = zb->zb_blkid * span;
- /* Don't dump free records for offsets > DMU_OBJECT_END */
- if (zb->zb_blkid == 0 || span <= DMU_OBJECT_END / zb->zb_blkid)
- err = dump_free(dsa, zb->zb_object, offset, span);
- } else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
- return (0);
- } else if (type == DMU_OT_DNODE) {
- int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
- arc_flags_t aflags = ARC_FLAG_WAIT;
- arc_buf_t *abuf;
- enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+ /*
+ * Otherwise, return a truncated copy of ranges[idx] and move the start
+ * of ranges[idx] back to first_change.
+ */
+ struct send_range *ret = kmem_alloc(sizeof (*ret), KM_SLEEP);
+ *ret = *ranges[idx];
+ ret->end_blkid = first_change;
+ ranges[idx]->start_blkid = first_change;
+ return (ret);
+}
+
+#define FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX))
- if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
- ASSERT(BP_IS_ENCRYPTED(bp));
- ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
- zioflags |= ZIO_FLAG_RAW;
+/*
+ * Merge the results from the from thread and the to thread, and then hand the
+ * records off to send_prefetch_thread to prefetch them. If this is not a
+ * send from a redaction bookmark, the from thread will push an end of stream
+ * record and stop, and we'll just send everything that was changed in the
+ * to_ds since the ancestor's creation txg. If it is, then since
+ * traverse_dataset has a canonical order, we can compare each change as
+ * they're pulled off the queues. That will give us a stream that is
+ * appropriately sorted, and covers all records. In addition, we pull the
+ * data from the redact_list_thread and use that to determine which blocks
+ * should be redacted.
+ */
+static void
+send_merge_thread(void *arg)
+{
+ struct send_merge_thread_arg *smt_arg = arg;
+ struct send_range *front_ranges[NUM_THREADS];
+ bqueue_t *queues[NUM_THREADS];
+ int err = 0;
+ fstrans_cookie_t cookie = spl_fstrans_mark();
+
+ if (smt_arg->redact_arg == NULL) {
+ front_ranges[REDACT_IDX] =
+ kmem_zalloc(sizeof (struct send_range), KM_SLEEP);
+ front_ranges[REDACT_IDX]->eos_marker = B_TRUE;
+ front_ranges[REDACT_IDX]->type = REDACT;
+ queues[REDACT_IDX] = NULL;
+ } else {
+ front_ranges[REDACT_IDX] =
+ bqueue_dequeue(&smt_arg->redact_arg->q);
+ queues[REDACT_IDX] = &smt_arg->redact_arg->q;
+ }
+ front_ranges[TO_IDX] = bqueue_dequeue(&smt_arg->to_arg->q);
+ queues[TO_IDX] = &smt_arg->to_arg->q;
+ front_ranges[FROM_IDX] = bqueue_dequeue(&smt_arg->from_arg->q);
+ queues[FROM_IDX] = &smt_arg->from_arg->q;
+ uint64_t mask = 0;
+ struct send_range *range;
+ for (range = find_next_range(front_ranges, queues, &mask);
+ !range->eos_marker && err == 0 && !smt_arg->cancel;
+ range = find_next_range(front_ranges, queues, &mask)) {
+ /*
+ * If the range in question was in both the from redact bookmark
+ * and the bookmark we're using to redact, then don't send it.
+ * It's already redacted on the receiving system, so a redaction
+ * record would be redundant.
+ */
+ if ((mask & FROM_AND_REDACT_BITS) == FROM_AND_REDACT_BITS) {
+ ASSERT3U(range->type, ==, REDACT);
+ range_free(range);
+ continue;
}
+ bqueue_enqueue(&smt_arg->q, range, sizeof (*range));
+
+ if (smt_arg->to_arg->error_code != 0) {
+ err = smt_arg->to_arg->error_code;
+ } else if (smt_arg->from_arg->error_code != 0) {
+ err = smt_arg->from_arg->error_code;
+ } else if (smt_arg->redact_arg != NULL &&
+ smt_arg->redact_arg->error_code != 0) {
+ err = smt_arg->redact_arg->error_code;
+ }
+ }
+ if (smt_arg->cancel && err == 0)
+ err = SET_ERROR(EINTR);
+ smt_arg->error = err;
+ if (smt_arg->error != 0) {
+ smt_arg->to_arg->cancel = B_TRUE;
+ smt_arg->from_arg->cancel = B_TRUE;
+ if (smt_arg->redact_arg != NULL)
+ smt_arg->redact_arg->cancel = B_TRUE;
+ }
+ for (int i = 0; i < NUM_THREADS; i++) {
+ while (!front_ranges[i]->eos_marker) {
+ front_ranges[i] = get_next_range(queues[i],
+ front_ranges[i]);
+ }
+ range_free(front_ranges[i]);
+ }
+ if (range == NULL)
+ range = kmem_zalloc(sizeof (*range), KM_SLEEP);
+ range->eos_marker = B_TRUE;
+ bqueue_enqueue_flush(&smt_arg->q, range, 1);
+ spl_fstrans_unmark(cookie);
+ thread_exit();
+}
+
+struct send_prefetch_thread_arg {
+ struct send_merge_thread_arg *smta;
+ bqueue_t q;
+ boolean_t cancel;
+ boolean_t issue_prefetches;
+ int error;
+};
- ASSERT0(zb->zb_level);
+/*
+ * Create a new record with the given values.
+ */
+static void
+enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
+ uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
+{
+ enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
+ (BP_IS_REDACTED(bp) ? REDACT : DATA));
- if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
- ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
- return (SET_ERROR(EIO));
+ struct send_range *range = range_alloc(range_type, dn->dn_object,
+ blkid, blkid + count, B_FALSE);
- dnode_phys_t *blk = abuf->b_data;
- uint64_t dnobj = zb->zb_blkid * epb;
+ if (blkid == DMU_SPILL_BLKID)
+ ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
- /*
- * Raw sends require sending encryption parameters for the
- * block of dnodes. Regular sends do not need to send this
- * info.
- */
- if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
- ASSERT(arc_is_encrypted(abuf));
- err = dump_object_range(dsa, bp, dnobj, epb);
+ switch (range_type) {
+ case HOLE:
+ range->sru.hole.datablksz = datablksz;
+ break;
+ case DATA:
+ ASSERT3U(count, ==, 1);
+ range->sru.data.datablksz = datablksz;
+ range->sru.data.obj_type = dn->dn_type;
+ range->sru.data.bp = *bp;
+ if (spta->issue_prefetches) {
+ zbookmark_phys_t zb = {0};
+ zb.zb_objset = dmu_objset_id(dn->dn_objset);
+ zb.zb_object = dn->dn_object;
+ zb.zb_level = 0;
+ zb.zb_blkid = blkid;
+ arc_flags_t aflags = ARC_FLAG_NOWAIT |
+ ARC_FLAG_PREFETCH;
+ (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL,
+ NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
+ ZIO_FLAG_SPECULATIVE, &aflags, &zb);
}
+ break;
+ case REDACT:
+ range->sru.redact.datablksz = datablksz;
+ break;
+ default:
+ break;
+ }
+ bqueue_enqueue(q, range, datablksz);
+}
- if (err == 0) {
- for (int i = 0; i < epb;
- i += blk[i].dn_extra_slots + 1) {
- err = dump_dnode(dsa, bp, dnobj + i, blk + i);
+/*
+ * This thread is responsible for two things: First, it retrieves the correct
+ * blkptr in the to ds if we need to send the data because of something from
+ * the from thread. As a result of this, we're the first ones to discover that
+ * some indirect blocks can be discarded because they're not holes. Second,
+ * it issues prefetches for the data we need to send.
+ */
+static void
+send_prefetch_thread(void *arg)
+{
+ struct send_prefetch_thread_arg *spta = arg;
+ struct send_merge_thread_arg *smta = spta->smta;
+ bqueue_t *inq = &smta->q;
+ bqueue_t *outq = &spta->q;
+ objset_t *os = smta->os;
+ fstrans_cookie_t cookie = spl_fstrans_mark();
+ struct send_range *range = bqueue_dequeue(inq);
+ int err = 0;
+
+ /*
+ * If the record we're analyzing is from a redaction bookmark from the
+ * fromds, then we need to know whether or not it exists in the tods so
+ * we know whether to create records for it or not. If it does, we need
+ * the datablksz so we can generate an appropriate record for it.
+ * Finally, if it isn't redacted, we need the blkptr so that we can send
+ * a WRITE record containing the actual data.
+ */
+ uint64_t last_obj = UINT64_MAX;
+ uint64_t last_obj_exists = B_TRUE;
+ while (!range->eos_marker && !spta->cancel && smta->error == 0 &&
+ err == 0) {
+ switch (range->type) {
+ case DATA: {
+ zbookmark_phys_t zb;
+ zb.zb_objset = dmu_objset_id(os);
+ zb.zb_object = range->object;
+ zb.zb_level = 0;
+ zb.zb_blkid = range->start_blkid;
+ ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+ if (!BP_IS_REDACTED(&range->sru.data.bp) &&
+ spta->issue_prefetches &&
+ !BP_IS_EMBEDDED(&range->sru.data.bp)) {
+ arc_flags_t aflags = ARC_FLAG_NOWAIT |
+ ARC_FLAG_PREFETCH;
+ (void) arc_read(NULL, os->os_spa,
+ &range->sru.data.bp, NULL, NULL,
+ ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
+ ZIO_FLAG_SPECULATIVE, &aflags, &zb);
+ }
+ bqueue_enqueue(outq, range, range->sru.data.datablksz);
+ range = get_next_range_nofree(inq, range);
+ break;
+ }
+ case HOLE:
+ case OBJECT:
+ case OBJECT_RANGE:
+ case REDACT: // Redacted blocks must exist
+ bqueue_enqueue(outq, range, sizeof (*range));
+ range = get_next_range_nofree(inq, range);
+ break;
+ case PREVIOUSLY_REDACTED: {
+ /*
+ * This entry came from the "from bookmark" when
+ * sending from a bookmark that has a redaction
+ * list. We need to check if this object/blkid
+ * exists in the target ("to") dataset, and if
+ * not then we drop this entry. We also need
+ * to fill in the block pointer so that we know
+ * what to prefetch.
+ *
+ * To accomplish the above, we first cache whether or
+ * not the last object we examined exists. If it
+ * doesn't, we can drop this record. If it does, we hold
+ * the dnode and use it to call dbuf_dnode_findbp. We do
+ * this instead of dbuf_bookmark_findbp because we will
+ * often operate on large ranges, and holding the dnode
+ * once is more efficient.
+ */
+ boolean_t object_exists = B_TRUE;
+ /*
+ * If the data is redacted, we only care if it exists,
+ * so that we don't send records for objects that have
+ * been deleted.
+ */
+ dnode_t *dn;
+ if (range->object == last_obj && !last_obj_exists) {
+ /*
+ * If we're still examining the same object as
+ * previously, and it doesn't exist, we don't
+ * need to call dbuf_bookmark_findbp.
+ */
+ object_exists = B_FALSE;
+ } else {
+ err = dnode_hold(os, range->object, FTAG, &dn);
+ if (err == ENOENT) {
+ object_exists = B_FALSE;
+ err = 0;
+ }
+ last_obj = range->object;
+ last_obj_exists = object_exists;
+ }
+
+ if (err != 0) {
+ break;
+ } else if (!object_exists) {
+ /*
+ * The block was modified, but doesn't
+ * exist in the to dataset; if it was
+ * deleted in the to dataset, then we'll
+ * visit the hole bp for it at some point.
+ */
+ range = get_next_range(inq, range);
+ continue;
+ }
+ uint64_t file_max =
+ (dn->dn_maxblkid < range->end_blkid ?
+ dn->dn_maxblkid : range->end_blkid);
+ /*
+ * The object exists, so we need to try to find the
+ * blkptr for each block in the range we're processing.
+ */
+ rw_enter(&dn->dn_struct_rwlock, RW_READER);
+ for (uint64_t blkid = range->start_blkid;
+ blkid < file_max; blkid++) {
+ blkptr_t bp;
+ uint32_t datablksz =
+ dn->dn_phys->dn_datablkszsec <<
+ SPA_MINBLOCKSHIFT;
+ uint64_t offset = blkid * datablksz;
+ /*
+ * This call finds the next non-hole block in
+ * the object. This is to prevent a
+ * performance problem where we're unredacting
+ * a large hole. Using dnode_next_offset to
+ * skip over the large hole avoids iterating
+ * over every block in it.
+ */
+ err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK,
+ &offset, 1, 1, 0);
+ if (err == ESRCH) {
+ offset = UINT64_MAX;
+ err = 0;
+ } else if (err != 0) {
+ break;
+ }
+ if (offset != blkid * datablksz) {
+ /*
+ * if there is a hole from here
+ * (blkid) to offset
+ */
+ offset = MIN(offset, file_max *
+ datablksz);
+ uint64_t nblks = (offset / datablksz) -
+ blkid;
+ enqueue_range(spta, outq, dn, blkid,
+ nblks, NULL, datablksz);
+ blkid += nblks;
+ }
+ if (blkid >= file_max)
+ break;
+ err = dbuf_dnode_findbp(dn, 0, blkid, &bp,
+ NULL, NULL);
if (err != 0)
break;
+ ASSERT(!BP_IS_HOLE(&bp));
+ enqueue_range(spta, outq, dn, blkid, 1, &bp,
+ datablksz);
}
+ rw_exit(&dn->dn_struct_rwlock);
+ dnode_rele(dn, FTAG);
+ range = get_next_range(inq, range);
}
- arc_buf_destroy(abuf, &abuf);
- } else if (type == DMU_OT_SA) {
- arc_flags_t aflags = ARC_FLAG_WAIT;
- arc_buf_t *abuf;
- enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
-
- if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
- ASSERT(BP_IS_PROTECTED(bp));
- zioflags |= ZIO_FLAG_RAW;
}
+ }
+ if (spta->cancel || err != 0) {
+ smta->cancel = B_TRUE;
+ spta->error = err;
+ } else if (smta->error != 0) {
+ spta->error = smta->error;
+ }
+ while (!range->eos_marker)
+ range = get_next_range(inq, range);
- if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
- ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
- return (SET_ERROR(EIO));
-
- err = dump_spill(dsa, bp, zb->zb_object, abuf->b_data);
- arc_buf_destroy(abuf, &abuf);
- } else if (backup_do_embed(dsa, bp)) {
- /* it's an embedded level-0 block of a regular object */
- int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
- ASSERT0(zb->zb_level);
- err = dump_write_embedded(dsa, zb->zb_object,
- zb->zb_blkid * blksz, blksz, bp);
- } else {
- /* it's a level-0 block of a regular object */
- arc_flags_t aflags = ARC_FLAG_WAIT;
- arc_buf_t *abuf;
- int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
- uint64_t offset;
+ bqueue_enqueue_flush(outq, range, 1);
+ spl_fstrans_unmark(cookie);
+ thread_exit();
+}
- /*
- * If we have large blocks stored on disk but the send flags
- * don't allow us to send large blocks, we split the data from
- * the arc buf into chunks.
- */
- boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
- !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
+#define NUM_SNAPS_NOT_REDACTED UINT64_MAX
- /*
- * Raw sends require that we always get raw data as it exists
- * on disk, so we assert that we are not splitting blocks here.
- */
- boolean_t request_raw =
- (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
+struct dmu_send_params {
+ /* Pool args */
+ void *tag; // Tag that dp was held with, will be used to release dp.
+ dsl_pool_t *dp;
+ /* To snapshot args */
+ const char *tosnap;
+ dsl_dataset_t *to_ds;
+ /* From snapshot args */
+ zfs_bookmark_phys_t ancestor_zb;
+ uint64_t *fromredactsnaps;
+ /* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */
+ uint64_t numfromredactsnaps;
+ /* Stream params */
+ boolean_t is_clone;
+ boolean_t embedok;
+ boolean_t large_block_ok;
+ boolean_t compressok;
+ uint64_t resumeobj;
+ uint64_t resumeoff;
+ zfs_bookmark_phys_t *redactbook;
+ /* Stream output params */
+ dmu_send_outparams_t *dso;
+
+ /* Stream progress params */
+ offset_t *off;
+ int outfd;
+ boolean_t rawok;
+};
- /*
- * We should only request compressed data from the ARC if all
- * the following are true:
- * - stream compression was requested
- * - we aren't splitting large blocks into smaller chunks
- * - the data won't need to be byteswapped before sending
- * - this isn't an embedded block
- * - this isn't metadata (if receiving on a different endian
- * system it can be byteswapped more easily)
- */
- boolean_t request_compressed =
- (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
- !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
- !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+static int
+setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
+ uint64_t *featureflags)
+{
+ dsl_dataset_t *to_ds = dspp->to_ds;
+ dsl_pool_t *dp = dspp->dp;
+#ifdef _KERNEL
+ if (dmu_objset_type(os) == DMU_OST_ZFS) {
+ uint64_t version;
+ if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
+ return (SET_ERROR(EINVAL));
- IMPLY(request_raw, !split_large_blocks);
- IMPLY(request_raw, BP_IS_PROTECTED(bp));
- ASSERT0(zb->zb_level);
- ASSERT(zb->zb_object > dsa->dsa_resume_object ||
- (zb->zb_object == dsa->dsa_resume_object &&
- zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
-
- ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));
-
- enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
- if (request_raw)
- zioflags |= ZIO_FLAG_RAW;
- else if (request_compressed)
- zioflags |= ZIO_FLAG_RAW_COMPRESS;
-
- if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
- ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
- if (zfs_send_corrupt_data) {
- /* Send a block filled with 0x"zfs badd bloc" */
- abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
- blksz);
- uint64_t *ptr;
- for (ptr = abuf->b_data;
- (char *)ptr < (char *)abuf->b_data + blksz;
- ptr++)
- *ptr = 0x2f5baddb10cULL;
- } else {
- return (SET_ERROR(EIO));
- }
- }
+ if (version >= ZPL_VERSION_SA)
+ *featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
+ }
+#endif
- offset = zb->zb_blkid * blksz;
+ /* raw sends imply large_block_ok */
+ if ((dspp->rawok || dspp->large_block_ok) &&
+ dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) {
+ *featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
+ }
- if (split_large_blocks) {
- ASSERT0(arc_is_encrypted(abuf));
- ASSERT3U(arc_get_compression(abuf), ==,
- ZIO_COMPRESS_OFF);
- char *buf = abuf->b_data;
- while (blksz > 0 && err == 0) {
- int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
- err = dump_write(dsa, type, zb->zb_object,
- offset, n, n, NULL, buf);
- offset += n;
- buf += n;
- blksz -= n;
- }
- } else {
- err = dump_write(dsa, type, zb->zb_object, offset,
- blksz, arc_buf_size(abuf), bp, abuf->b_data);
- }
- arc_buf_destroy(abuf, &abuf);
+ /* encrypted datasets will not have embedded blocks */
+ if ((dspp->embedok || dspp->rawok) && !os->os_encrypted &&
+ spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
+ *featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
}
- ASSERT(err == 0 || err == EINTR);
- return (err);
+ /* raw send implies compressok */
+ if (dspp->compressok || dspp->rawok)
+ *featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
+ if (dspp->rawok && os->os_encrypted)
+ *featureflags |= DMU_BACKUP_FEATURE_RAW;
+
+ if ((*featureflags &
+ (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
+ DMU_BACKUP_FEATURE_RAW)) != 0 &&
+ spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
+ *featureflags |= DMU_BACKUP_FEATURE_LZ4;
+ }
+
+ if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
+ *featureflags |= DMU_BACKUP_FEATURE_RESUMING;
+ }
+
+ if (dspp->redactbook != NULL) {
+ *featureflags |= DMU_BACKUP_FEATURE_REDACTED;
+ }
+
+ if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) {
+ *featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
+ }
+ return (0);
}
-/*
- * Pop the new data off the queue, and free the old data.
- */
-static struct send_block_record *
-get_next_record(bqueue_t *bq, struct send_block_record *data)
+static dmu_replay_record_t *
+create_begin_record(struct dmu_send_params *dspp, objset_t *os,
+ uint64_t featureflags)
+{
+ dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t),
+ KM_SLEEP);
+ drr->drr_type = DRR_BEGIN;
+
+ struct drr_begin *drrb = &drr->drr_u.drr_begin;
+ dsl_dataset_t *to_ds = dspp->to_ds;
+
+ drrb->drr_magic = DMU_BACKUP_MAGIC;
+ drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time;
+ drrb->drr_type = dmu_objset_type(os);
+ drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
+ drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid;
+
+ DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM);
+ DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags);
+
+ if (dspp->is_clone)
+ drrb->drr_flags |= DRR_FLAG_CLONE;
+ if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET)
+ drrb->drr_flags |= DRR_FLAG_CI_DATA;
+ if (zfs_send_set_freerecords_bit)
+ drrb->drr_flags |= DRR_FLAG_FREERECORDS;
+ drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
+
+ dsl_dataset_name(to_ds, drrb->drr_toname);
+ if (!to_ds->ds_is_snapshot) {
+ (void) strlcat(drrb->drr_toname, "@--head--",
+ sizeof (drrb->drr_toname));
+ }
+ return (drr);
+}
+
+static void
+setup_to_thread(struct send_thread_arg *to_arg, dsl_dataset_t *to_ds,
+ dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
+{
+ VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
+ MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
+ offsetof(struct send_range, ln)));
+ to_arg->error_code = 0;
+ to_arg->cancel = B_FALSE;
+ to_arg->ds = to_ds;
+ to_arg->fromtxg = fromtxg;
+ to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
+ if (rawok)
+ to_arg->flags |= TRAVERSE_NO_DECRYPT;
+ to_arg->redaction_list = NULL;
+ to_arg->num_blocks_visited = &dssp->dss_blocks;
+ (void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
+ curproc, TS_RUN, minclsyspri);
+}
+
+static void
+setup_from_thread(struct redact_list_thread_arg *from_arg,
+ redaction_list_t *from_rl, dmu_sendstatus_t *dssp)
+{
+ VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff,
+ MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
+ offsetof(struct send_range, ln)));
+ from_arg->error_code = 0;
+ from_arg->cancel = B_FALSE;
+ from_arg->rl = from_rl;
+ from_arg->mark_redact = B_FALSE;
+ from_arg->num_blocks_visited = &dssp->dss_blocks;
+ /*
+ * If from_ds is null, send_traverse_thread just returns success and
+ * enqueues an eos marker.
+ */
+ (void) thread_create(NULL, 0, redact_list_thread, from_arg, 0,
+ curproc, TS_RUN, minclsyspri);
+}
+
+static void
+setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg,
+ struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp)
{
- struct send_block_record *tmp = bqueue_dequeue(bq);
- kmem_free(data, sizeof (*data));
- return (tmp);
+ if (dspp->redactbook == NULL)
+ return;
+
+ rlt_arg->cancel = B_FALSE;
+ VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff,
+ MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
+ offsetof(struct send_range, ln)));
+ rlt_arg->error_code = 0;
+ rlt_arg->mark_redact = B_TRUE;
+ rlt_arg->rl = rl;
+ rlt_arg->num_blocks_visited = &dssp->dss_blocks;
+
+ (void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0,
+ curproc, TS_RUN, minclsyspri);
+}
+
+static void
+setup_merge_thread(struct send_merge_thread_arg *smt_arg,
+ struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg,
+ struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg,
+ objset_t *os)
+{
+ VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff,
+ MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
+ offsetof(struct send_range, ln)));
+ smt_arg->cancel = B_FALSE;
+ smt_arg->error = 0;
+ smt_arg->from_arg = from_arg;
+ smt_arg->to_arg = to_arg;
+ if (dspp->redactbook != NULL)
+ smt_arg->redact_arg = rlt_arg;
+
+ smt_arg->os = os;
+ (void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc,
+ TS_RUN, minclsyspri);
+}
+
+static void
+setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg,
+ struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg)
+{
+ VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff,
+ MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
+ offsetof(struct send_range, ln)));
+ spt_arg->smta = smt_arg;
+ spt_arg->issue_prefetches = !dspp->dso->dso_dryrun;
+ (void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0,
+ curproc, TS_RUN, minclsyspri);
+}
+
+static int
+setup_resume_points(struct dmu_send_params *dspp,
+ struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg,
+ struct redact_list_thread_arg *rlt_arg,
+ struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
+ redaction_list_t *redact_rl, nvlist_t *nvl)
+{
+ dsl_dataset_t *to_ds = dspp->to_ds;
+ int err = 0;
+
+ uint64_t obj = 0;
+ uint64_t blkid = 0;
+ if (resuming) {
+ obj = dspp->resumeobj;
+ dmu_object_info_t to_doi;
+ err = dmu_object_info(os, obj, &to_doi);
+ if (err != 0)
+ return (err);
+
+ blkid = dspp->resumeoff / to_doi.doi_data_block_size;
+ }
+ /*
+ * If we're resuming a redacted send, we can skip to the appropriate
+ * point in the redaction bookmark by binary searching through it.
+ */
+ smt_arg->bookmark_before = B_FALSE;
+ if (redact_rl != NULL) {
+ SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
+ }
+
+ SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid);
+ if (nvlist_exists(nvl, BEGINNV_REDACT_FROM_SNAPS)) {
+ uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj;
+ /*
+ * Note: If the resume point is in an object whose
+ * blocksize is different in the from vs to snapshots,
+ * we will have divided by the "wrong" blocksize.
+ * However, in this case fromsnap's send_cb() will
+ * detect that the blocksize has changed and therefore
+ * ignore this object.
+ *
+ * If we're resuming a send from a redaction bookmark,
+ * we still cannot accidentally suggest blocks behind
+ * the to_ds. In addition, we know that any blocks in
+ * the object in the to_ds will have to be sent, since
+ * the size changed. Therefore, we can't cause any harm
+ * this way either.
+ */
+ SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid);
+ }
+ if (resuming) {
+ fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj);
+ fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff);
+ }
+ return (0);
+}
+
+static dmu_sendstatus_t *
+setup_send_progress(struct dmu_send_params *dspp)
+{
+ dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP);
+ dssp->dss_outfd = dspp->outfd;
+ dssp->dss_off = dspp->off;
+ dssp->dss_proc = curproc;
+ mutex_enter(&dspp->to_ds->ds_sendstream_lock);
+ list_insert_head(&dspp->to_ds->ds_sendstreams, dssp);
+ mutex_exit(&dspp->to_ds->ds_sendstream_lock);
+ return (dssp);
}
/*
* Actually do the bulk of the work in a zfs send.
*
+ * The idea is that we want to do a send from ancestor_zb to to_ds. We also
+ * want to not send any data that has been modified by all the datasets in
+ * redactsnaparr, and store the list of blocks that are redacted in this way in
+ * a bookmark named redactbook, created on the to_ds. We do this by creating
+ * several worker threads, whose function is described below.
+ *
+ * There are three cases.
+ * The first case is a redacted zfs send. In this case there are 5 threads.
+ * The first thread is the to_ds traversal thread: it calls dataset_traverse on
+ * the to_ds and finds all the blocks that have changed since ancestor_zb (if
+ * it's a full send, that's all blocks in the dataset). It then sends those
+ * blocks on to the send merge thread. The redact list thread takes the data
+ * from the redaction bookmark and sends those blocks on to the send merge
+ * thread. The send merge thread takes the data from the to_ds traversal
+ * thread, and combines it with the redaction records from the redact list
+ * thread. If a block appears in both the to_ds's data and the redaction data,
+ * the send merge thread will mark it as redacted and send it on to the prefetch
+ * thread. Otherwise, the send merge thread will send the block on to the
+ * prefetch thread unchanged. The prefetch thread will issue prefetch reads for
+ * any data that isn't redacted, and then send the data on to the main thread.
+ * The main thread behaves the same as in a normal send case, issuing demand
+ * reads for data blocks and sending out records over the network
+ *
+ * The graphic below diagrams the flow of data in the case of a redacted zfs
+ * send. Each box represents a thread, and each line represents the flow of
+ * data.
+ *
+ * Records from the |
+ * redaction bookmark |
+ * +--------------------+ | +---------------------------+
+ * | | v | Send Merge Thread |
+ * | Redact List Thread +----------> Apply redaction marks to |
+ * | | | records as specified by |
+ * +--------------------+ | redaction ranges |
+ * +----^---------------+------+
+ * | | Merged data
+ * | |
+ * | +------------v--------+
+ * | | Prefetch Thread |
+ * +--------------------+ | | Issues prefetch |
+ * | to_ds Traversal | | | reads of data blocks|
+ * | Thread (finds +---------------+ +------------+--------+
+ * | candidate blocks) | Blocks modified | Prefetched data
+ * +--------------------+ by to_ds since |
+ * ancestor_zb +------------v----+
+ * | Main Thread | File Descriptor
+ * | Sends data over +->(to zfs receive)
+ * | wire |
+ * +-----------------+
+ *
+ * The second case is an incremental send from a redaction bookmark. The to_ds
+ * traversal thread and the main thread behave the same as in the redacted
+ * send case. The new thread is the from bookmark traversal thread. It
+ * iterates over the redaction list in the redaction bookmark, and enqueues
+ * records for each block that was redacted in the original send. The send
+ * merge thread now has to merge the data from the two threads. For details
+ * about that process, see the header comment of send_merge_thread(). Any data
+ * it decides to send on will be prefetched by the prefetch thread. Note that
+ * you can perform a redacted send from a redaction bookmark; in that case,
+ * the data flow behaves very similarly to the flow in the redacted send case,
+ * except with the addition of the bookmark traversal thread iterating over the
+ * redaction bookmark. The send_merge_thread also has to take on the
+ * responsibility of merging the redact list thread's records, the bookmark
+ * traversal thread's records, and the to_ds records.
+ *
+ * +---------------------+
+ * | |
+ * | Redact List Thread +--------------+
+ * | | |
+ * +---------------------+ |
+ * Blocks in redaction list | Ranges modified by every secure snap
+ * of from bookmark | (or EOS if not readcted)
+ * |
+ * +---------------------+ | +----v----------------------+
+ * | bookmark Traversal | v | Send Merge Thread |
+ * | Thread (finds +---------> Merges bookmark, rlt, and |
+ * | candidate blocks) | | to_ds send records |
+ * +---------------------+ +----^---------------+------+
+ * | | Merged data
+ * | +------------v--------+
+ * | | Prefetch Thread |
+ * +--------------------+ | | Issues prefetch |
+ * | to_ds Traversal | | | reads of data blocks|
+ * | Thread (finds +---------------+ +------------+--------+
+ * | candidate blocks) | Blocks modified | Prefetched data
+ * +--------------------+ by to_ds since +------------v----+
+ * ancestor_zb | Main Thread | File Descriptor
+ * | Sends data over +->(to zfs receive)
+ * | wire |
+ * +-----------------+
+ *
+ * The final case is a simple zfs full or incremental send. The to_ds traversal
+ * thread behaves the same as always. The redact list thread is never started.
+ * The send merge thread takes all the blocks that the to_ds traveral thread
+ * sends it, prefetches the data, and sends the blocks on to the main thread.
+ * The main thread sends the data over the wire.
+ *
+ * To keep performance acceptable, we want to prefetch the data in the worker
+ * threads. While the to_ds thread could simply use the TRAVERSE_PREFETCH
+ * feature built into traverse_dataset, the combining and deletion of records
+ * due to redaction and sends from redaction bookmarks mean that we could
+ * issue many unnecessary prefetches. As a result, we only prefetch data
+ * after we've determined that the record is not going to be redacted. To
+ * prevent the prefetching from getting too far ahead of the main thread, the
+ * blocking queues that are used for communication are capped not by the
+ * number of entries in the queue, but by the sum of the size of the
+ * prefetches associated with them. The limit on the amount of data that the
+ * thread can prefetch beyond what the main thread has reached is controlled
+ * by the global variable zfs_send_queue_length. In addition, to prevent poor
+ * performance in the beginning of a send, we also limit the distance ahead
+ * that the traversal threads can be. That distance is controlled by the
+ * zfs_send_no_prefetch_queue_length tunable.
+ *
* Note: Releases dp using the specified tag.
*/
static int
-dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
- zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
- boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
- boolean_t rawok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
- vnode_t *vp, offset_t *off)
+dmu_send_impl(struct dmu_send_params *dspp)
{
objset_t *os;
dmu_replay_record_t *drr;
- dmu_sendarg_t *dsp;
+ dmu_sendstatus_t *dssp;
+ dmu_send_cookie_t dsc = {0};
int err;
- uint64_t fromtxg = 0;
+ uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg;
uint64_t featureflags = 0;
- struct send_thread_arg to_arg;
- void *payload = NULL;
- size_t payload_len = 0;
- struct send_block_record *to_data;
+ struct redact_list_thread_arg *from_arg;
+ struct send_thread_arg *to_arg;
+ struct redact_list_thread_arg *rlt_arg;
+ struct send_merge_thread_arg *smt_arg;
+ struct send_prefetch_thread_arg *spt_arg;
+ struct send_range *range;
+ redaction_list_t *from_rl = NULL;
+ redaction_list_t *redact_rl = NULL;
+ boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0);
+ boolean_t book_resuming = resuming;
+
+ dsl_dataset_t *to_ds = dspp->to_ds;
+ zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
+ dsl_pool_t *dp = dspp->dp;
+ void *tag = dspp->tag;
err = dmu_objset_from_ds(to_ds, &os);
if (err != 0) {
dsl_pool_rele(dp, tag);
return (err);
}
-
/*
* If this is a non-raw send of an encrypted ds, we can ensure that
* the objset_phys_t is authenticated. This is safe because this is
* either a snapshot or we have owned the dataset, ensuring that
* it can't be modified.
*/
- if (!rawok && os->os_encrypted &&
+ if (!dspp->rawok && os->os_encrypted &&
arc_is_unauthenticated(os->os_phys_buf)) {
zbookmark_phys_t zb;
@@ -1042,225 +2337,236 @@ dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
}
- drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
- drr->drr_type = DRR_BEGIN;
- drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
- DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
- DMU_SUBSTREAM);
+ if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) {
+ dsl_pool_rele(dp, tag);
+ return (err);
+ }
- bzero(&to_arg, sizeof (to_arg));
+ from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
+ to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
+ rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
+ smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
+ spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP);
-#ifdef _KERNEL
- if (dmu_objset_type(os) == DMU_OST_ZFS) {
- uint64_t version;
- if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
- kmem_free(drr, sizeof (dmu_replay_record_t));
+ /*
+ * If we're doing a redacted send, hold the bookmark's redaction list.
+ */
+ if (dspp->redactbook != NULL) {
+ err = dsl_redaction_list_hold_obj(dp,
+ dspp->redactbook->zbm_redaction_obj, FTAG,
+ &redact_rl);
+ if (err != 0) {
dsl_pool_rele(dp, tag);
return (SET_ERROR(EINVAL));
}
- if (version >= ZPL_VERSION_SA) {
- featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
- }
+ dsl_redaction_list_long_hold(dp, redact_rl, FTAG);
}
-#endif
- /* raw sends imply large_block_ok */
- if ((large_block_ok || rawok) &&
- dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS))
- featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
- if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE))
- featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
-
- /* encrypted datasets will not have embedded blocks */
- if ((embedok || rawok) && !os->os_encrypted &&
- spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
- featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
+ /*
+ * If we're sending from a redaction bookmark, hold the redaction list
+ * so that we can consider sending the redacted blocks.
+ */
+ if (ancestor_zb->zbm_redaction_obj != 0) {
+ err = dsl_redaction_list_hold_obj(dp,
+ ancestor_zb->zbm_redaction_obj, FTAG, &from_rl);
+ if (err != 0) {
+ if (redact_rl != NULL) {
+ dsl_redaction_list_long_rele(redact_rl, FTAG);
+ dsl_redaction_list_rele(redact_rl, FTAG);
+ }
+ dsl_pool_rele(dp, tag);
+ return (SET_ERROR(EINVAL));
+ }
+ dsl_redaction_list_long_hold(dp, from_rl, FTAG);
}
- /* raw send implies compressok */
- if (compressok || rawok)
- featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
+ dsl_dataset_long_hold(to_ds, FTAG);
- if (rawok && os->os_encrypted)
- featureflags |= DMU_BACKUP_FEATURE_RAW;
+ drr = create_begin_record(dspp, os, featureflags);
+ dssp = setup_send_progress(dspp);
- if ((featureflags &
- (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
- DMU_BACKUP_FEATURE_RAW)) != 0 &&
- spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
- featureflags |= DMU_BACKUP_FEATURE_LZ4;
- }
-
- if (resumeobj != 0 || resumeoff != 0) {
- featureflags |= DMU_BACKUP_FEATURE_RESUMING;
- }
+ dsc.dsc_drr = drr;
+ dsc.dsc_dso = dspp->dso;
+ dsc.dsc_os = os;
+ dsc.dsc_off = dspp->off;
+ dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid;
+ dsc.dsc_fromtxg = fromtxg;
+ dsc.dsc_pending_op = PENDING_NONE;
+ dsc.dsc_featureflags = featureflags;
+ dsc.dsc_resume_object = dspp->resumeobj;
+ dsc.dsc_resume_offset = dspp->resumeoff;
- DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
- featureflags);
-
- drr->drr_u.drr_begin.drr_creation_time =
- dsl_dataset_phys(to_ds)->ds_creation_time;
- drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
- if (is_clone)
- drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
- drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
- if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
- drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
- if (zfs_send_set_freerecords_bit)
- drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
+ dsl_pool_rele(dp, tag);
- drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
+ void *payload = NULL;
+ size_t payload_len = 0;
+ nvlist_t *nvl = fnvlist_alloc();
- if (ancestor_zb != NULL) {
- drr->drr_u.drr_begin.drr_fromguid =
- ancestor_zb->zbm_guid;
- fromtxg = ancestor_zb->zbm_creation_txg;
- }
- dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
- if (!to_ds->ds_is_snapshot) {
- (void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
- sizeof (drr->drr_u.drr_begin.drr_toname));
+ /*
+ * If we're doing a redacted send, we include the snapshots we're
+ * redacted with respect to so that the target system knows what send
+ * streams can be correctly received on top of this dataset. If we're
+ * instead sending a redacted dataset, we include the snapshots that the
+ * dataset was created with respect to.
+ */
+ if (dspp->redactbook != NULL) {
+ fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS,
+ redact_rl->rl_phys->rlp_snaps,
+ redact_rl->rl_phys->rlp_num_snaps);
+ } else if (dsl_dataset_feature_is_active(to_ds,
+ SPA_FEATURE_REDACTED_DATASETS)) {
+ uint64_t *tods_guids;
+ uint64_t length;
+ VERIFY(dsl_dataset_get_uint64_array_feature(to_ds,
+ SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids));
+ fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids,
+ length);
}
- dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
-
- dsp->dsa_drr = drr;
- dsp->dsa_vp = vp;
- dsp->dsa_outfd = outfd;
- dsp->dsa_proc = curproc;
- dsp->dsa_os = os;
- dsp->dsa_off = off;
- dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
- dsp->dsa_fromtxg = fromtxg;
- dsp->dsa_pending_op = PENDING_NONE;
- dsp->dsa_featureflags = featureflags;
- dsp->dsa_resume_object = resumeobj;
- dsp->dsa_resume_offset = resumeoff;
+ /*
+ * If we're sending from a redaction bookmark, then we should retrieve
+ * the guids of that bookmark so we can send them over the wire.
+ */
+ if (from_rl != NULL) {
+ fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
+ from_rl->rl_phys->rlp_snaps,
+ from_rl->rl_phys->rlp_num_snaps);
+ }
- mutex_enter(&to_ds->ds_sendstream_lock);
- list_insert_head(&to_ds->ds_sendstreams, dsp);
- mutex_exit(&to_ds->ds_sendstream_lock);
+ /*
+ * If the snapshot we're sending from is redacted, include the redaction
+ * list in the stream.
+ */
+ if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) {
+ ASSERT3P(from_rl, ==, NULL);
+ fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
+ dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps);
+ if (dspp->numfromredactsnaps > 0) {
+ kmem_free(dspp->fromredactsnaps,
+ dspp->numfromredactsnaps * sizeof (uint64_t));
+ dspp->fromredactsnaps = NULL;
+ }
+ }
- dsl_dataset_long_hold(to_ds, FTAG);
- dsl_pool_rele(dp, tag);
+ if (resuming || book_resuming) {
+ err = setup_resume_points(dspp, to_arg, from_arg,
+ rlt_arg, smt_arg, resuming, os, redact_rl, nvl);
+ if (err != 0)
+ goto out;
+ }
- /* handle features that require a DRR_BEGIN payload */
- if (featureflags &
- (DMU_BACKUP_FEATURE_RESUMING | DMU_BACKUP_FEATURE_RAW)) {
+ if (featureflags & DMU_BACKUP_FEATURE_RAW) {
+ uint64_t ivset_guid = (ancestor_zb != NULL) ?
+ ancestor_zb->zbm_ivset_guid : 0;
nvlist_t *keynvl = NULL;
- nvlist_t *nvl = fnvlist_alloc();
-
- if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
- dmu_object_info_t to_doi;
- err = dmu_object_info(os, resumeobj, &to_doi);
- if (err != 0) {
- fnvlist_free(nvl);
- goto out;
- }
-
- SET_BOOKMARK(&to_arg.resume, to_ds->ds_object,
- resumeobj, 0,
- resumeoff / to_doi.doi_data_block_size);
+ ASSERT(os->os_encrypted);
- fnvlist_add_uint64(nvl, "resume_object", resumeobj);
- fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
+ err = dsl_crypto_populate_key_nvlist(to_ds, ivset_guid,
+ &keynvl);
+ if (err != 0) {
+ fnvlist_free(nvl);
+ goto out;
}
- if (featureflags & DMU_BACKUP_FEATURE_RAW) {
- uint64_t ivset_guid = (ancestor_zb != NULL) ?
- ancestor_zb->zbm_ivset_guid : 0;
-
- ASSERT(os->os_encrypted);
-
- err = dsl_crypto_populate_key_nvlist(to_ds,
- ivset_guid, &keynvl);
- if (err != 0) {
- fnvlist_free(nvl);
- goto out;
- }
-
- fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
- }
+ fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
+ fnvlist_free(keynvl);
+ }
+ if (!nvlist_empty(nvl)) {
payload = fnvlist_pack(nvl, &payload_len);
drr->drr_payloadlen = payload_len;
- fnvlist_free(keynvl);
- fnvlist_free(nvl);
}
- err = dump_record(dsp, payload, payload_len);
+ fnvlist_free(nvl);
+ err = dump_record(&dsc, payload, payload_len);
fnvlist_pack_free(payload, payload_len);
if (err != 0) {
- err = dsp->dsa_err;
+ err = dsc.dsc_err;
goto out;
}
- err = bqueue_init(&to_arg.q,
- MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
- offsetof(struct send_block_record, ln));
- to_arg.error_code = 0;
- to_arg.cancel = B_FALSE;
- to_arg.ds = to_ds;
- to_arg.fromtxg = fromtxg;
- to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
- if (rawok)
- to_arg.flags |= TRAVERSE_NO_DECRYPT;
- (void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
- TS_RUN, minclsyspri);
-
- to_data = bqueue_dequeue(&to_arg.q);
+ setup_to_thread(to_arg, to_ds, dssp, fromtxg, dspp->rawok);
+ setup_from_thread(from_arg, from_rl, dssp);
+ setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
+ setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
+ setup_prefetch_thread(spt_arg, dspp, smt_arg);
- while (!to_data->eos_marker && err == 0) {
- err = do_dump(dsp, to_data);
- to_data = get_next_record(&to_arg.q, to_data);
+ range = bqueue_dequeue(&spt_arg->q);
+ while (err == 0 && !range->eos_marker) {
+ err = do_dump(&dsc, range);
+ range = get_next_range(&spt_arg->q, range);
if (issig(JUSTLOOKING) && issig(FORREAL))
err = EINTR;
}
+ /*
+ * If we hit an error or are interrupted, cancel our worker threads and
+ * clear the queue of any pending records. The threads will pass the
+ * cancel up the tree of worker threads, and each one will clean up any
+ * pending records before exiting.
+ */
if (err != 0) {
- to_arg.cancel = B_TRUE;
- while (!to_data->eos_marker) {
- to_data = get_next_record(&to_arg.q, to_data);
+ spt_arg->cancel = B_TRUE;
+ while (!range->eos_marker) {
+ range = get_next_range(&spt_arg->q, range);
}
}
- kmem_free(to_data, sizeof (*to_data));
+ range_free(range);
- bqueue_destroy(&to_arg.q);
+ bqueue_destroy(&spt_arg->q);
+ bqueue_destroy(&smt_arg->q);
+ if (dspp->redactbook != NULL)
+ bqueue_destroy(&rlt_arg->q);
+ bqueue_destroy(&to_arg->q);
+ bqueue_destroy(&from_arg->q);
- if (err == 0 && to_arg.error_code != 0)
- err = to_arg.error_code;
+ if (err == 0 && spt_arg->error != 0)
+ err = spt_arg->error;
if (err != 0)
goto out;
- if (dsp->dsa_pending_op != PENDING_NONE)
- if (dump_record(dsp, NULL, 0) != 0)
+ if (dsc.dsc_pending_op != PENDING_NONE)
+ if (dump_record(&dsc, NULL, 0) != 0)
err = SET_ERROR(EINTR);
if (err != 0) {
- if (err == EINTR && dsp->dsa_err != 0)
- err = dsp->dsa_err;
+ if (err == EINTR && dsc.dsc_err != 0)
+ err = dsc.dsc_err;
goto out;
}
bzero(drr, sizeof (dmu_replay_record_t));
drr->drr_type = DRR_END;
- drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
- drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
+ drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
+ drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
- if (dump_record(dsp, NULL, 0) != 0)
- err = dsp->dsa_err;
+ if (dump_record(&dsc, NULL, 0) != 0)
+ err = dsc.dsc_err;
out:
mutex_enter(&to_ds->ds_sendstream_lock);
- list_remove(&to_ds->ds_sendstreams, dsp);
+ list_remove(&to_ds->ds_sendstreams, dssp);
mutex_exit(&to_ds->ds_sendstream_lock);
- VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));
+ VERIFY(err != 0 || (dsc.dsc_sent_begin && dsc.dsc_sent_end));
kmem_free(drr, sizeof (dmu_replay_record_t));
- kmem_free(dsp, sizeof (dmu_sendarg_t));
+ kmem_free(dssp, sizeof (dmu_sendstatus_t));
+ kmem_free(from_arg, sizeof (*from_arg));
+ kmem_free(to_arg, sizeof (*to_arg));
+ kmem_free(rlt_arg, sizeof (*rlt_arg));
+ kmem_free(smt_arg, sizeof (*smt_arg));
+ kmem_free(spt_arg, sizeof (*spt_arg));
dsl_dataset_long_rele(to_ds, FTAG);
+ if (from_rl != NULL) {
+ dsl_redaction_list_long_rele(from_rl, FTAG);
+ dsl_redaction_list_rele(from_rl, FTAG);
+ }
+ if (redact_rl != NULL) {
+ dsl_redaction_list_long_rele(redact_rl, FTAG);
+ dsl_redaction_list_rele(redact_rl, FTAG);
+ }
return (err);
}
@@ -1268,104 +2574,162 @@ out:
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
- boolean_t rawok, int outfd, vnode_t *vp, offset_t *off)
+ boolean_t rawok, int outfd, offset_t *off, dmu_send_outparams_t *dsop)
{
- dsl_pool_t *dp;
- dsl_dataset_t *ds;
- dsl_dataset_t *fromds = NULL;
- ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
int err;
-
- err = dsl_pool_hold(pool, FTAG, &dp);
+ dsl_dataset_t *fromds;
+ ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
+ struct dmu_send_params dspp = {0};
+ dspp.embedok = embedok;
+ dspp.large_block_ok = large_block_ok;
+ dspp.compressok = compressok;
+ dspp.outfd = outfd;
+ dspp.off = off;
+ dspp.dso = dsop;
+ dspp.tag = FTAG;
+ dspp.rawok = rawok;
+
+ err = dsl_pool_hold(pool, FTAG, &dspp.dp);
if (err != 0)
return (err);
- err = dsl_dataset_hold_obj_flags(dp, tosnap, dsflags, FTAG, &ds);
+ err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG,
+ &dspp.to_ds);
if (err != 0) {
- dsl_pool_rele(dp, FTAG);
+ dsl_pool_rele(dspp.dp, FTAG);
return (err);
}
if (fromsnap != 0) {
- zfs_bookmark_phys_t zb = { 0 };
- boolean_t is_clone;
-
- err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
+ err = dsl_dataset_hold_obj_flags(dspp.dp, fromsnap, dsflags,
+ FTAG, &fromds);
if (err != 0) {
- dsl_dataset_rele_flags(ds, dsflags, FTAG);
- dsl_pool_rele(dp, FTAG);
+ dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
+ dsl_pool_rele(dspp.dp, FTAG);
return (err);
}
- if (!dsl_dataset_is_before(ds, fromds, 0)) {
- err = SET_ERROR(EXDEV);
- dsl_dataset_rele(fromds, FTAG);
- dsl_dataset_rele_flags(ds, dsflags, FTAG);
- dsl_pool_rele(dp, FTAG);
- return (err);
- }
-
- zb.zbm_creation_time =
+ dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
+ dspp.ancestor_zb.zbm_creation_txg =
+ dsl_dataset_phys(fromds)->ds_creation_txg;
+ dspp.ancestor_zb.zbm_creation_time =
dsl_dataset_phys(fromds)->ds_creation_time;
- zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
- zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
if (dsl_dataset_is_zapified(fromds)) {
- (void) zap_lookup(dp->dp_meta_objset,
+ (void) zap_lookup(dspp.dp->dp_meta_objset,
fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
- &zb.zbm_ivset_guid);
+ &dspp.ancestor_zb.zbm_ivset_guid);
}
- is_clone = (fromds->ds_dir != ds->ds_dir);
- dsl_dataset_rele(fromds, FTAG);
- err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
- embedok, large_block_ok, compressok, rawok, outfd,
- 0, 0, vp, off);
+ /* See dmu_send for the reasons behind this. */
+ uint64_t *fromredact;
+
+ if (!dsl_dataset_get_uint64_array_feature(fromds,
+ SPA_FEATURE_REDACTED_DATASETS,
+ &dspp.numfromredactsnaps,
+ &fromredact)) {
+ dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
+ } else if (dspp.numfromredactsnaps > 0) {
+ uint64_t size = dspp.numfromredactsnaps *
+ sizeof (uint64_t);
+ dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
+ bcopy(fromredact, dspp.fromredactsnaps, size);
+ }
+
+ if (!dsl_dataset_is_before(dspp.to_ds, fromds, 0)) {
+ err = SET_ERROR(EXDEV);
+ } else {
+ dspp.is_clone = (dspp.to_ds->ds_dir !=
+ fromds->ds_dir);
+ dsl_dataset_rele(fromds, FTAG);
+ err = dmu_send_impl(&dspp);
+ }
} else {
- err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
- embedok, large_block_ok, compressok, rawok, outfd,
- 0, 0, vp, off);
+ dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
+ err = dmu_send_impl(&dspp);
}
- dsl_dataset_rele_flags(ds, dsflags, FTAG);
+ dsl_dataset_rele(dspp.to_ds, FTAG);
return (err);
}
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
- int outfd, uint64_t resumeobj, uint64_t resumeoff, vnode_t *vp,
- offset_t *off)
+ uint64_t resumeobj, uint64_t resumeoff, const char *redactbook, int outfd,
+ offset_t *off, dmu_send_outparams_t *dsop)
{
- dsl_pool_t *dp;
- dsl_dataset_t *ds;
- int err;
+ int err = 0;
ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
boolean_t owned = B_FALSE;
+ dsl_dataset_t *fromds = NULL;
+ zfs_bookmark_phys_t book = {0};
+ struct dmu_send_params dspp = {0};
+ dspp.tosnap = tosnap;
+ dspp.embedok = embedok;
+ dspp.large_block_ok = large_block_ok;
+ dspp.compressok = compressok;
+ dspp.outfd = outfd;
+ dspp.off = off;
+ dspp.dso = dsop;
+ dspp.tag = FTAG;
+ dspp.resumeobj = resumeobj;
+ dspp.resumeoff = resumeoff;
+ dspp.rawok = rawok;
if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
return (SET_ERROR(EINVAL));
- err = dsl_pool_hold(tosnap, FTAG, &dp);
+ err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
if (err != 0)
return (err);
- if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
+ if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
/*
* We are sending a filesystem or volume. Ensure
* that it doesn't change by owning the dataset.
*/
- err = dsl_dataset_own(dp, tosnap, dsflags, FTAG, &ds);
+ err = dsl_dataset_own(dspp.dp, tosnap, dsflags, FTAG,
+ &dspp.to_ds);
owned = B_TRUE;
} else {
- err = dsl_dataset_hold_flags(dp, tosnap, dsflags, FTAG, &ds);
+ err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
+ &dspp.to_ds);
}
+
if (err != 0) {
- dsl_pool_rele(dp, FTAG);
+ dsl_pool_rele(dspp.dp, FTAG);
+ return (err);
+ }
+
+ if (redactbook != NULL) {
+ char path[ZFS_MAX_DATASET_NAME_LEN];
+ (void) strlcpy(path, tosnap, sizeof (path));
+ char *at = strchr(path, '@');
+ if (at == NULL) {
+ err = EINVAL;
+ } else {
+ (void) snprintf(at, sizeof (path) - (at - path), "#%s",
+ redactbook);
+ err = dsl_bookmark_lookup(dspp.dp, path,
+ NULL, &book);
+ dspp.redactbook = &book;
+ }
+ }
+
+ if (err != 0) {
+ dsl_pool_rele(dspp.dp, FTAG);
+ if (owned)
+ dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
+ else
+ dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
return (err);
}
if (fromsnap != NULL) {
- zfs_bookmark_phys_t zb = { 0 };
- boolean_t is_clone = B_FALSE;
- int fsnamelen = strchr(tosnap, '@') - tosnap;
+ zfs_bookmark_phys_t *zb = &dspp.ancestor_zb;
+ int fsnamelen;
+ if (strpbrk(tosnap, "@#") != NULL)
+ fsnamelen = strpbrk(tosnap, "@#") - tosnap;
+ else
+ fsnamelen = strlen(tosnap);
/*
* If the fromsnap is in a different filesystem, then
@@ -1374,55 +2738,88 @@ dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
(fromsnap[fsnamelen] != '@' &&
fromsnap[fsnamelen] != '#')) {
- is_clone = B_TRUE;
+ dspp.is_clone = B_TRUE;
}
- if (strchr(fromsnap, '@')) {
- dsl_dataset_t *fromds;
- err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
- if (err == 0) {
- if (!dsl_dataset_is_before(ds, fromds, 0))
+ if (strchr(fromsnap, '@') != NULL) {
+ err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG,
+ &fromds);
+
+ if (err != 0) {
+ ASSERT3P(fromds, ==, NULL);
+ } else {
+ /*
+ * We need to make a deep copy of the redact
+ * snapshots of the from snapshot, because the
+ * array will be freed when we evict from_ds.
+ */
+ uint64_t *fromredact;
+ if (!dsl_dataset_get_uint64_array_feature(
+ fromds, SPA_FEATURE_REDACTED_DATASETS,
+ &dspp.numfromredactsnaps,
+ &fromredact)) {
+ dspp.numfromredactsnaps =
+ NUM_SNAPS_NOT_REDACTED;
+ } else if (dspp.numfromredactsnaps > 0) {
+ uint64_t size =
+ dspp.numfromredactsnaps *
+ sizeof (uint64_t);
+ dspp.fromredactsnaps = kmem_zalloc(size,
+ KM_SLEEP);
+ bcopy(fromredact, dspp.fromredactsnaps,
+ size);
+ }
+ if (!dsl_dataset_is_before(dspp.to_ds, fromds,
+ 0)) {
err = SET_ERROR(EXDEV);
- zb.zbm_creation_time =
- dsl_dataset_phys(fromds)->ds_creation_time;
- zb.zbm_creation_txg =
- dsl_dataset_phys(fromds)->ds_creation_txg;
- zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
- is_clone = (ds->ds_dir != fromds->ds_dir);
-
- if (dsl_dataset_is_zapified(fromds)) {
- (void) zap_lookup(dp->dp_meta_objset,
- fromds->ds_object,
- DS_FIELD_IVSET_GUID, 8, 1,
- &zb.zbm_ivset_guid);
+ } else {
+ ASSERT3U(dspp.is_clone, ==,
+ (dspp.to_ds->ds_dir !=
+ fromds->ds_dir));
+ zb->zbm_creation_txg =
+ dsl_dataset_phys(fromds)->
+ ds_creation_txg;
+ zb->zbm_creation_time =
+ dsl_dataset_phys(fromds)->
+ ds_creation_time;
+ zb->zbm_guid =
+ dsl_dataset_phys(fromds)->ds_guid;
+ zb->zbm_redaction_obj = 0;
+
+ if (dsl_dataset_is_zapified(fromds)) {
+ (void) zap_lookup(
+ dspp.dp->dp_meta_objset,
+ fromds->ds_object,
+ DS_FIELD_IVSET_GUID, 8, 1,
+ &zb->zbm_ivset_guid);
+ }
}
dsl_dataset_rele(fromds, FTAG);
}
} else {
- err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
+ dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
+ err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds,
+ zb);
+ if (err == EXDEV && zb->zbm_redaction_obj != 0 &&
+ zb->zbm_guid ==
+ dsl_dataset_phys(dspp.to_ds)->ds_guid)
+ err = 0;
}
- if (err != 0) {
- if (owned)
- dsl_dataset_disown(ds, dsflags, FTAG);
- else
- dsl_dataset_rele_flags(ds, dsflags, FTAG);
- dsl_pool_rele(dp, FTAG);
- return (err);
+ if (err == 0) {
+ /* dmu_send_impl will call dsl_pool_rele for us. */
+ err = dmu_send_impl(&dspp);
+ } else {
+ dsl_pool_rele(dspp.dp, FTAG);
}
- err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
- embedok, large_block_ok, compressok, rawok,
- outfd, resumeobj, resumeoff, vp, off);
} else {
- err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
- embedok, large_block_ok, compressok, rawok,
- outfd, resumeobj, resumeoff, vp, off);
+ dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
+ err = dmu_send_impl(&dspp);
}
if (owned)
- dsl_dataset_disown(ds, dsflags, FTAG);
+ dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
else
- dsl_dataset_rele_flags(ds, dsflags, FTAG);
-
+ dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
return (err);
}
@@ -1483,39 +2880,40 @@ dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
}
int
-dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
- boolean_t stream_compressed, uint64_t *sizep)
+dmu_send_estimate_fast(dsl_dataset_t *ds, dsl_dataset_t *fromds,
+ zfs_bookmark_phys_t *frombook, boolean_t stream_compressed, uint64_t *sizep)
{
int err;
uint64_t uncomp, comp;
ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
+ ASSERT(fromds == NULL || frombook == NULL);
/* tosnap must be a snapshot */
if (!ds->ds_is_snapshot)
return (SET_ERROR(EINVAL));
- /* fromsnap, if provided, must be a snapshot */
- if (fromds != NULL && !fromds->ds_is_snapshot)
- return (SET_ERROR(EINVAL));
+ if (fromds != NULL) {
+ uint64_t used;
+ if (!fromds->ds_is_snapshot)
+ return (SET_ERROR(EINVAL));
- /*
- * fromsnap must be an earlier snapshot from the same fs as tosnap,
- * or the origin's fs.
- */
- if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
- return (SET_ERROR(EXDEV));
+ if (!dsl_dataset_is_before(ds, fromds, 0))
+ return (SET_ERROR(EXDEV));
- /* Get compressed and uncompressed size estimates of changed data. */
- if (fromds == NULL) {
- uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
- comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
- } else {
+ err = dsl_dataset_space_written(fromds, ds, &used, &comp,
+ &uncomp);
+ if (err != 0)
+ return (err);
+ } else if (frombook != NULL) {
uint64_t used;
- err = dsl_dataset_space_written(fromds, ds,
- &used, &comp, &uncomp);
+ err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
+ &comp, &uncomp);
if (err != 0)
return (err);
+ } else {
+ uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
+ comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
}
err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
@@ -1527,74 +2925,7 @@ dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
return (err);
}
-struct calculate_send_arg {
- uint64_t uncompressed;
- uint64_t compressed;
-};
-
-/*
- * Simple callback used to traverse the blocks of a snapshot and sum their
- * uncompressed and compressed sizes.
- */
-/* ARGSUSED */
-static int
-dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
- const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
-{
- struct calculate_send_arg *space = arg;
- if (bp != NULL && !BP_IS_HOLE(bp)) {
- space->uncompressed += BP_GET_UCSIZE(bp);
- space->compressed += BP_GET_PSIZE(bp);
- }
- return (0);
-}
-
-/*
- * Given a desination snapshot and a TXG, calculate the approximate size of a
- * send stream sent from that TXG. from_txg may be zero, indicating that the
- * whole snapshot will be sent.
- */
-int
-dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
- boolean_t stream_compressed, uint64_t *sizep)
-{
- int err;
- struct calculate_send_arg size = { 0 };
-
- ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
-
- /* tosnap must be a snapshot */
- if (!dsl_dataset_is_snapshot(ds))
- return (SET_ERROR(EINVAL));
-
- /* verify that from_txg is before the provided snapshot was taken */
- if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
- return (SET_ERROR(EXDEV));
- }
- /*
- * traverse the blocks of the snapshot with birth times after
- * from_txg, summing their uncompressed size
- */
- err = traverse_dataset(ds, from_txg,
- TRAVERSE_POST | TRAVERSE_NO_DECRYPT,
- dmu_calculate_send_traversal, &size);
-
- if (err)
- return (err);
-
- err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
- size.compressed, stream_compressed, sizep);
- return (err);
-}
-
-
#if defined(_KERNEL)
-/* BEGIN CSTYLED */
-module_param(zfs_override_estimate_recordsize, ulong, 0644);
-MODULE_PARM_DESC(zfs_override_estimate_recordsize,
- "Record size calculation override for zfs send estimates");
-/* END CSTYLED */
-
module_param(zfs_send_corrupt_data, int, 0644);
MODULE_PARM_DESC(zfs_send_corrupt_data, "Allow sending corrupt data");
@@ -1604,4 +2935,19 @@ MODULE_PARM_DESC(zfs_send_queue_length, "Maximum send queue length");
module_param(zfs_send_unmodified_spill_blocks, int, 0644);
MODULE_PARM_DESC(zfs_send_unmodified_spill_blocks,
"Send unmodified spill blocks");
+
+module_param(zfs_send_no_prefetch_queue_length, int, 0644);
+MODULE_PARM_DESC(zfs_send_no_prefetch_queue_length,
+ "Maximum send queue length for non-prefetch queues");
+
+module_param(zfs_send_queue_ff, int, 0644);
+MODULE_PARM_DESC(zfs_send_queue_ff, "Send queue fill fraction");
+
+module_param(zfs_send_no_prefetch_queue_ff, int, 0644);
+MODULE_PARM_DESC(zfs_send_no_prefetch_queue_ff,
+ "Send queue fill fraction for non-prefetch queues");
+
+module_param(zfs_override_estimate_recordsize, int, 0644);
+MODULE_PARM_DESC(zfs_override_estimate_recordsize,
+ "Override block size estimate with fixed size");
#endif