aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Ahrens <[email protected]>2020-03-10 10:51:04 -0700
committerGitHub <[email protected]>2020-03-10 10:51:04 -0700
commit1dc32a67e93bbc8d650943f1a460abb9ff6c5083 (patch)
treeba8ab8eb3c5a7beeaabb62c85456c2e0f38d2ee0
parent9be70c37844c5d832ffef6f0ed2651a332b35dbd (diff)
Improve zfs send performance by bypassing the ARC
When doing a zfs send on a dataset with small recordsize (e.g. 8K), performance is dominated by the per-block overheads. This is especially true with `zfs send --compressed`, which further reduces the amount of data sent, for the same number of blocks. Several threads are involved, but the limiting factor is the `send_prefetch` thread, which is 100% on CPU. The main job of the `send_prefetch` thread is to issue zio's for the data that will be needed by the main thread. It does this by calling `arc_read(ARC_FLAG_PREFETCH)`. This has an immediate cost of creating an arc_hdr, which takes around 14% of one CPU. It also induces later costs by other threads: * Since the data was only prefetched, dmu_send()->dmu_dump_write() will need to call arc_read() again to get the data. This will have to look up the arc_hdr in the hash table and copy the data from the scatter ABD in the arc_hdr to a linear ABD in arc_buf. This takes 27% of one CPU. * dmu_dump_write() needs to arc_buf_destroy() This takes 11% of one CPU. * arc_adjust() will need to evict this arc_hdr, taking about 50% of one CPU. All of these costs can be avoided by bypassing the ARC if the data is not already cached. This commit changes `zfs send` to check for the data in the ARC, and if it is not found then we directly call `zio_read()`, reading the data into a linear ABD which is used by dmu_dump_write() directly. The performance improvement is best expressed in terms of how many blocks can be processed by `zfs send` in one second. This change increases the metric by 50%, from ~100,000 to ~150,000. When the amount of data per block is small (e.g. 2KB), there is a corresponding reduction in the elapsed time of `zfs send >/dev/null` (from 86 minutes to 58 minutes in this test case). In addition to improving the performance of `zfs send`, this change makes `zfs send` not pollute the ARC cache. In most cases the data will not be reused, so this allows us to keep caching useful data in the MRU (hit-once) part of the ARC. Reviewed-by: Paul Dagnelie <[email protected]> Reviewed-by: Serapheim Dimitropoulos <[email protected]> Reviewed-by: Brian Behlendorf <[email protected]> Signed-off-by: Matthew Ahrens <[email protected]> Closes #10067
-rw-r--r--include/sys/arc.h6
-rw-r--r--include/sys/arc_impl.h1
-rw-r--r--module/zfs/arc.c19
-rw-r--r--module/zfs/dmu_send.c360
4 files changed, 235 insertions, 151 deletions
diff --git a/include/sys/arc.h b/include/sys/arc.h
index f6dea3fbd..6f56f732a 100644
--- a/include/sys/arc.h
+++ b/include/sys/arc.h
@@ -147,6 +147,12 @@ typedef enum arc_flags
ARC_FLAG_SHARED_DATA = 1 << 21,
/*
+ * Fail this arc_read() (with ENOENT) if the data is not already present
+ * in cache.
+ */
+ ARC_FLAG_CACHED_ONLY = 1 << 22,
+
+ /*
* The arc buffer's compression mode is stored in the top 7 bits of the
* flags field, so these dummy flags are included so that MDB can
* interpret the enum properly.
diff --git a/include/sys/arc_impl.h b/include/sys/arc_impl.h
index 8eeee54c9..c55640f8b 100644
--- a/include/sys/arc_impl.h
+++ b/include/sys/arc_impl.h
@@ -554,6 +554,7 @@ typedef struct arc_stats {
kstat_named_t arcstat_need_free;
kstat_named_t arcstat_sys_free;
kstat_named_t arcstat_raw_size;
+ kstat_named_t arcstat_cached_only_in_progress;
} arc_stats_t;
typedef enum free_memory_reason_t {
diff --git a/module/zfs/arc.c b/module/zfs/arc.c
index 3df53d2db..d49d85db0 100644
--- a/module/zfs/arc.c
+++ b/module/zfs/arc.c
@@ -548,7 +548,8 @@ arc_stats_t arc_stats = {
{ "demand_hit_prescient_prefetch", KSTAT_DATA_UINT64 },
{ "arc_need_free", KSTAT_DATA_UINT64 },
{ "arc_sys_free", KSTAT_DATA_UINT64 },
- { "arc_raw_size", KSTAT_DATA_UINT64 }
+ { "arc_raw_size", KSTAT_DATA_UINT64 },
+ { "cached_only_in_progress", KSTAT_DATA_UINT64 },
};
#define ARCSTAT_MAX(stat, val) { \
@@ -5563,6 +5564,13 @@ top:
if (HDR_IO_IN_PROGRESS(hdr)) {
zio_t *head_zio = hdr->b_l1hdr.b_acb->acb_zio_head;
+ if (*arc_flags & ARC_FLAG_CACHED_ONLY) {
+ mutex_exit(hash_lock);
+ ARCSTAT_BUMP(arcstat_cached_only_in_progress);
+ rc = SET_ERROR(ENOENT);
+ goto out;
+ }
+
ASSERT3P(head_zio, !=, NULL);
if ((hdr->b_flags & ARC_FLAG_PRIO_ASYNC_READ) &&
priority == ZIO_PRIORITY_SYNC_READ) {
@@ -5698,12 +5706,21 @@ top:
uint64_t size;
abd_t *hdr_abd;
+ if (*arc_flags & ARC_FLAG_CACHED_ONLY) {
+ rc = SET_ERROR(ENOENT);
+ if (hash_lock != NULL)
+ mutex_exit(hash_lock);
+ goto out;
+ }
+
/*
* Gracefully handle a damaged logical block size as a
* checksum error.
*/
if (lsize > spa_maxblocksize(spa)) {
rc = SET_ERROR(ECKSUM);
+ if (hash_lock != NULL)
+ mutex_exit(hash_lock);
goto out;
}
diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c
index f3630a0cf..2c7dca23e 100644
--- a/module/zfs/dmu_send.c
+++ b/module/zfs/dmu_send.c
@@ -156,8 +156,15 @@ struct send_range {
union {
struct srd {
dmu_object_type_t obj_type;
- uint32_t datablksz;
+ uint32_t datablksz; // logical size
+ uint32_t datasz; // payload size
blkptr_t bp;
+ arc_buf_t *abuf;
+ abd_t *abd;
+ kmutex_t lock;
+ kcondvar_t cv;
+ boolean_t io_outstanding;
+ int io_err;
} data;
struct srh {
uint32_t datablksz;
@@ -222,6 +229,20 @@ range_free(struct send_range *range)
size_t size = sizeof (dnode_phys_t) *
(range->sru.object.dnp->dn_extra_slots + 1);
kmem_free(range->sru.object.dnp, size);
+ } else if (range->type == DATA) {
+ mutex_enter(&range->sru.data.lock);
+ while (range->sru.data.io_outstanding)
+ cv_wait(&range->sru.data.cv, &range->sru.data.lock);
+ if (range->sru.data.abd != NULL)
+ abd_free(range->sru.data.abd);
+ if (range->sru.data.abuf != NULL) {
+ arc_buf_destroy(range->sru.data.abuf,
+ &range->sru.data.abuf);
+ }
+ mutex_exit(&range->sru.data.lock);
+
+ cv_destroy(&range->sru.data.cv);
+ mutex_destroy(&range->sru.data.lock);
}
kmem_free(range, sizeof (*range));
}
@@ -830,7 +851,7 @@ dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
}
static boolean_t
-send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
+send_do_embed(const blkptr_t *bp, uint64_t featureflags)
{
if (!BP_IS_EMBEDDED(bp))
return (B_FALSE);
@@ -839,7 +860,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
* Compression function must be legacy, or explicitly enabled.
*/
if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
- !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4)))
+ !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
return (B_FALSE);
/*
@@ -847,7 +868,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
*/
switch (BPE_GET_ETYPE(bp)) {
case BP_EMBEDDED_TYPE_DATA:
- if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
+ if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
return (B_TRUE);
break;
default:
@@ -858,8 +879,8 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
/*
* This function actually handles figuring out what kind of record needs to be
- * dumped, reading the data (which has hopefully been prefetched), and calling
- * the appropriate helper function.
+ * dumped, and calling the appropriate helper function. In most cases,
+ * the data has already been read by send_reader_thread().
*/
static int
do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
@@ -894,7 +915,6 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
blkptr_t *bp = &srdp->bp;
spa_t *spa =
dmu_objset_spa(dscp->dsc_os);
- arc_buf_t *abuf = NULL;
ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
@@ -914,6 +934,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
zb.zb_level = 0;
zb.zb_blkid = range->start_blkid;
+ arc_buf_t *abuf = NULL;
if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
zioflags, &aflags, &zb) != 0)
@@ -925,7 +946,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
arc_buf_destroy(abuf, &abuf);
return (err);
}
- if (send_do_embed(dscp, bp)) {
+ if (send_do_embed(bp, dscp->dsc_featureflags)) {
err = dump_write_embedded(dscp, range->object,
range->start_blkid * srdp->datablksz,
srdp->datablksz, bp);
@@ -936,70 +957,24 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
range->start_blkid * srdp->datablksz >=
dscp->dsc_resume_offset));
/* it's a level-0 block of a regular object */
- arc_flags_t aflags = ARC_FLAG_WAIT;
- uint64_t offset;
-
- /*
- * If we have large blocks stored on disk but the send flags
- * don't allow us to send large blocks, we split the data from
- * the arc buf into chunks.
- */
- boolean_t split_large_blocks =
- srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
- !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
-
- /*
- * Raw sends require that we always get raw data as it exists
- * on disk, so we assert that we are not splitting blocks here.
- */
- boolean_t request_raw =
- (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
-
- /*
- * We should only request compressed data from the ARC if all
- * the following are true:
- * - stream compression was requested
- * - we aren't splitting large blocks into smaller chunks
- * - the data won't need to be byteswapped before sending
- * - this isn't an embedded block
- * - this isn't metadata (if receiving on a different endian
- * system it can be byteswapped more easily)
- */
- boolean_t request_compressed =
- (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
- !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
- !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
-
- IMPLY(request_raw, !split_large_blocks);
- IMPLY(request_raw, BP_IS_PROTECTED(bp));
- if (!dscp->dsc_dso->dso_dryrun) {
- enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
-
- ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
-
- if (request_raw)
- zioflags |= ZIO_FLAG_RAW;
- else if (request_compressed)
- zioflags |= ZIO_FLAG_RAW_COMPRESS;
- zbookmark_phys_t zb;
- zb.zb_objset = dmu_objset_id(dscp->dsc_os);
- zb.zb_object = range->object;
- zb.zb_level = 0;
- zb.zb_blkid = range->start_blkid;
- err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
- ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb);
- }
+ mutex_enter(&srdp->lock);
+ while (srdp->io_outstanding)
+ cv_wait(&srdp->cv, &srdp->lock);
+ err = srdp->io_err;
+ mutex_exit(&srdp->lock);
if (err != 0) {
if (zfs_send_corrupt_data &&
!dscp->dsc_dso->dso_dryrun) {
- /* Send a block filled with 0x"zfs badd bloc" */
- abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
- srdp->datablksz);
+ /*
+ * Send a block filled with 0x"zfs badd bloc"
+ */
+ srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
+ ARC_BUFC_DATA, srdp->datablksz);
uint64_t *ptr;
- for (ptr = abuf->b_data;
- (char *)ptr < (char *)abuf->b_data +
+ for (ptr = srdp->abuf->b_data;
+ (char *)ptr < (char *)srdp->abuf->b_data +
srdp->datablksz; ptr++)
*ptr = 0x2f5baddb10cULL;
} else {
@@ -1007,41 +982,47 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
}
}
- offset = range->start_blkid * srdp->datablksz;
+ ASSERT(dscp->dsc_dso->dso_dryrun ||
+ srdp->abuf != NULL || srdp->abd != NULL);
+
+ uint64_t offset = range->start_blkid * srdp->datablksz;
+
+ char *data = NULL;
+ if (srdp->abd != NULL) {
+ data = abd_to_buf(srdp->abd);
+ ASSERT3P(srdp->abuf, ==, NULL);
+ } else if (srdp->abuf != NULL) {
+ data = srdp->abuf->b_data;
+ }
- if (split_large_blocks) {
- ASSERT0(arc_is_encrypted(abuf));
- ASSERT3U(arc_get_compression(abuf), ==,
- ZIO_COMPRESS_OFF);
- char *buf = abuf->b_data;
+ /*
+ * If we have large blocks stored on disk but the send flags
+ * don't allow us to send large blocks, we split the data from
+ * the arc buf into chunks.
+ */
+ if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+ !(dscp->dsc_featureflags &
+ DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
while (srdp->datablksz > 0 && err == 0) {
int n = MIN(srdp->datablksz,
SPA_OLD_MAXBLOCKSIZE);
err = dmu_dump_write(dscp, srdp->obj_type,
- range->object, offset, n, n, NULL, buf);
+ range->object, offset, n, n, NULL, data);
offset += n;
- buf += n;
+ /*
+ * When doing dry run, data==NULL is used as a
+ * sentinel value by
+ * dmu_dump_write()->dump_record().
+ */
+ if (data != NULL)
+ data += n;
srdp->datablksz -= n;
}
} else {
- int psize;
- if (abuf != NULL) {
- psize = arc_buf_size(abuf);
- if (arc_get_compression(abuf) !=
- ZIO_COMPRESS_OFF) {
- ASSERT3S(psize, ==, BP_GET_PSIZE(bp));
- }
- } else if (!request_compressed) {
- psize = srdp->datablksz;
- } else {
- psize = BP_GET_PSIZE(bp);
- }
err = dmu_dump_write(dscp, srdp->obj_type,
- range->object, offset, srdp->datablksz, psize, bp,
- (abuf == NULL ? NULL : abuf->b_data));
+ range->object, offset,
+ srdp->datablksz, srdp->datasz, bp, data);
}
- if (abuf != NULL)
- arc_buf_destroy(abuf, &abuf);
return (err);
}
case HOLE: {
@@ -1086,6 +1067,14 @@ range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
range->start_blkid = start_blkid;
range->end_blkid = end_blkid;
range->eos_marker = eos;
+ if (type == DATA) {
+ range->sru.data.abd = NULL;
+ range->sru.data.abuf = NULL;
+ mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
+ cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
+ range->sru.data.io_outstanding = 0;
+ range->sru.data.io_err = 0;
+ }
return (range);
}
@@ -1596,19 +1585,115 @@ send_merge_thread(void *arg)
thread_exit();
}
-struct send_prefetch_thread_arg {
+struct send_reader_thread_arg {
struct send_merge_thread_arg *smta;
bqueue_t q;
boolean_t cancel;
- boolean_t issue_prefetches;
+ boolean_t issue_reads;
+ uint64_t featureflags;
int error;
};
+static void
+dmu_send_read_done(zio_t *zio)
+{
+ struct send_range *range = zio->io_private;
+
+ mutex_enter(&range->sru.data.lock);
+ if (zio->io_error != 0) {
+ abd_free(range->sru.data.abd);
+ range->sru.data.abd = NULL;
+ range->sru.data.io_err = zio->io_error;
+ }
+
+ ASSERT(range->sru.data.io_outstanding);
+ range->sru.data.io_outstanding = B_FALSE;
+ cv_broadcast(&range->sru.data.cv);
+ mutex_exit(&range->sru.data.lock);
+}
+
+static void
+issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
+{
+ struct srd *srdp = &range->sru.data;
+ blkptr_t *bp = &srdp->bp;
+ objset_t *os = srta->smta->os;
+
+ ASSERT3U(range->type, ==, DATA);
+ ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+ /*
+ * If we have large blocks stored on disk but
+ * the send flags don't allow us to send large
+ * blocks, we split the data from the arc buf
+ * into chunks.
+ */
+ boolean_t split_large_blocks =
+ srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+ !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
+ /*
+ * We should only request compressed data from the ARC if all
+ * the following are true:
+ * - stream compression was requested
+ * - we aren't splitting large blocks into smaller chunks
+ * - the data won't need to be byteswapped before sending
+ * - this isn't an embedded block
+ * - this isn't metadata (if receiving on a different endian
+ * system it can be byteswapped more easily)
+ */
+ boolean_t request_compressed =
+ (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
+ !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
+ !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+
+ enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+
+ if (srta->featureflags & DMU_BACKUP_FEATURE_RAW)
+ zioflags |= ZIO_FLAG_RAW;
+ else if (request_compressed)
+ zioflags |= ZIO_FLAG_RAW_COMPRESS;
+
+ srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
+ BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);
+
+ if (!srta->issue_reads)
+ return;
+ if (BP_IS_REDACTED(bp))
+ return;
+ if (send_do_embed(bp, srta->featureflags))
+ return;
+
+ zbookmark_phys_t zb = {
+ .zb_objset = dmu_objset_id(os),
+ .zb_object = range->object,
+ .zb_level = 0,
+ .zb_blkid = range->start_blkid,
+ };
+
+ arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
+
+ int arc_err = arc_read(NULL, os->os_spa, bp,
+ arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
+ zioflags, &aflags, &zb);
+ /*
+ * If the data is not already cached in the ARC, we read directly
+ * from zio. This avoids the performance overhead of adding a new
+ * entry to the ARC, and we also avoid polluting the ARC cache with
+ * data that is not likely to be used in the future.
+ */
+ if (arc_err != 0) {
+ srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
+ srdp->io_outstanding = B_TRUE;
+ zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
+ srdp->datasz, dmu_send_read_done, range,
+ ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
+ }
+}
+
/*
* Create a new record with the given values.
*/
static void
-enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
+enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
{
enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
@@ -1629,18 +1714,7 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
range->sru.data.datablksz = datablksz;
range->sru.data.obj_type = dn->dn_type;
range->sru.data.bp = *bp;
- if (spta->issue_prefetches) {
- zbookmark_phys_t zb = {0};
- zb.zb_objset = dmu_objset_id(dn->dn_objset);
- zb.zb_object = dn->dn_object;
- zb.zb_level = 0;
- zb.zb_blkid = blkid;
- arc_flags_t aflags = ARC_FLAG_NOWAIT |
- ARC_FLAG_PREFETCH;
- (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL,
- NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
- ZIO_FLAG_SPECULATIVE, &aflags, &zb);
- }
+ issue_data_read(srta, range);
break;
case REDACT:
range->sru.redact.datablksz = datablksz;
@@ -1659,12 +1733,12 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
* it issues prefetches for the data we need to send.
*/
static void
-send_prefetch_thread(void *arg)
+send_reader_thread(void *arg)
{
- struct send_prefetch_thread_arg *spta = arg;
- struct send_merge_thread_arg *smta = spta->smta;
+ struct send_reader_thread_arg *srta = arg;
+ struct send_merge_thread_arg *smta = srta->smta;
bqueue_t *inq = &smta->q;
- bqueue_t *outq = &spta->q;
+ bqueue_t *outq = &srta->q;
objset_t *os = smta->os;
fstrans_cookie_t cookie = spl_fstrans_mark();
struct send_range *range = bqueue_dequeue(inq);
@@ -1680,30 +1754,14 @@ send_prefetch_thread(void *arg)
*/
uint64_t last_obj = UINT64_MAX;
uint64_t last_obj_exists = B_TRUE;
- while (!range->eos_marker && !spta->cancel && smta->error == 0 &&
+ while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
err == 0) {
switch (range->type) {
- case DATA: {
- zbookmark_phys_t zb;
- zb.zb_objset = dmu_objset_id(os);
- zb.zb_object = range->object;
- zb.zb_level = 0;
- zb.zb_blkid = range->start_blkid;
- ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
- if (!BP_IS_REDACTED(&range->sru.data.bp) &&
- spta->issue_prefetches &&
- !BP_IS_EMBEDDED(&range->sru.data.bp)) {
- arc_flags_t aflags = ARC_FLAG_NOWAIT |
- ARC_FLAG_PREFETCH;
- (void) arc_read(NULL, os->os_spa,
- &range->sru.data.bp, NULL, NULL,
- ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
- ZIO_FLAG_SPECULATIVE, &aflags, &zb);
- }
+ case DATA:
+ issue_data_read(srta, range);
bqueue_enqueue(outq, range, range->sru.data.datablksz);
range = get_next_range_nofree(inq, range);
break;
- }
case HOLE:
case OBJECT:
case OBJECT_RANGE:
@@ -1805,7 +1863,7 @@ send_prefetch_thread(void *arg)
datablksz);
uint64_t nblks = (offset / datablksz) -
blkid;
- enqueue_range(spta, outq, dn, blkid,
+ enqueue_range(srta, outq, dn, blkid,
nblks, NULL, datablksz);
blkid += nblks;
}
@@ -1816,7 +1874,7 @@ send_prefetch_thread(void *arg)
if (err != 0)
break;
ASSERT(!BP_IS_HOLE(&bp));
- enqueue_range(spta, outq, dn, blkid, 1, &bp,
+ enqueue_range(srta, outq, dn, blkid, 1, &bp,
datablksz);
}
rw_exit(&dn->dn_struct_rwlock);
@@ -1825,11 +1883,11 @@ send_prefetch_thread(void *arg)
}
}
}
- if (spta->cancel || err != 0) {
+ if (srta->cancel || err != 0) {
smta->cancel = B_TRUE;
- spta->error = err;
+ srta->error = err;
} else if (smta->error != 0) {
- spta->error = smta->error;
+ srta->error = smta->error;
}
while (!range->eos_marker)
range = get_next_range(inq, range);
@@ -2052,15 +2110,17 @@ setup_merge_thread(struct send_merge_thread_arg *smt_arg,
}
static void
-setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg,
- struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg)
+setup_reader_thread(struct send_reader_thread_arg *srt_arg,
+ struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
+ uint64_t featureflags)
{
- VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff,
+ VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
offsetof(struct send_range, ln)));
- spt_arg->smta = smt_arg;
- spt_arg->issue_prefetches = !dspp->dso->dso_dryrun;
- (void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0,
+ srt_arg->smta = smt_arg;
+ srt_arg->issue_reads = !dspp->dso->dso_dryrun;
+ srt_arg->featureflags = featureflags;
+ (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
curproc, TS_RUN, minclsyspri);
}
@@ -2265,7 +2325,7 @@ dmu_send_impl(struct dmu_send_params *dspp)
struct send_thread_arg *to_arg;
struct redact_list_thread_arg *rlt_arg;
struct send_merge_thread_arg *smt_arg;
- struct send_prefetch_thread_arg *spt_arg;
+ struct send_reader_thread_arg *srt_arg;
struct send_range *range;
redaction_list_t *from_rl = NULL;
redaction_list_t *redact_rl = NULL;
@@ -2348,7 +2408,7 @@ dmu_send_impl(struct dmu_send_params *dspp)
to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
- spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP);
+ srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);
drr = create_begin_record(dspp, os, featureflags);
dssp = setup_send_progress(dspp);
@@ -2457,12 +2517,12 @@ dmu_send_impl(struct dmu_send_params *dspp)
setup_from_thread(from_arg, from_rl, dssp);
setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
- setup_prefetch_thread(spt_arg, dspp, smt_arg);
+ setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);
- range = bqueue_dequeue(&spt_arg->q);
+ range = bqueue_dequeue(&srt_arg->q);
while (err == 0 && !range->eos_marker) {
err = do_dump(&dsc, range);
- range = get_next_range(&spt_arg->q, range);
+ range = get_next_range(&srt_arg->q, range);
if (issig(JUSTLOOKING) && issig(FORREAL))
err = SET_ERROR(EINTR);
}
@@ -2474,22 +2534,22 @@ dmu_send_impl(struct dmu_send_params *dspp)
* pending records before exiting.
*/
if (err != 0) {
- spt_arg->cancel = B_TRUE;
+ srt_arg->cancel = B_TRUE;
while (!range->eos_marker) {
- range = get_next_range(&spt_arg->q, range);
+ range = get_next_range(&srt_arg->q, range);
}
}
range_free(range);
- bqueue_destroy(&spt_arg->q);
+ bqueue_destroy(&srt_arg->q);
bqueue_destroy(&smt_arg->q);
if (dspp->redactbook != NULL)
bqueue_destroy(&rlt_arg->q);
bqueue_destroy(&to_arg->q);
bqueue_destroy(&from_arg->q);
- if (err == 0 && spt_arg->error != 0)
- err = spt_arg->error;
+ if (err == 0 && srt_arg->error != 0)
+ err = srt_arg->error;
if (err != 0)
goto out;
@@ -2532,7 +2592,7 @@ out:
kmem_free(to_arg, sizeof (*to_arg));
kmem_free(rlt_arg, sizeof (*rlt_arg));
kmem_free(smt_arg, sizeof (*smt_arg));
- kmem_free(spt_arg, sizeof (*spt_arg));
+ kmem_free(srt_arg, sizeof (*srt_arg));
dsl_dataset_long_rele(to_ds, FTAG);
if (from_rl != NULL) {