diff options
Diffstat (limited to 'module/zfs/dmu_send.c')
-rw-r--r-- | module/zfs/dmu_send.c | 360 |
1 files changed, 210 insertions, 150 deletions
diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c index f3630a0cf..2c7dca23e 100644 --- a/module/zfs/dmu_send.c +++ b/module/zfs/dmu_send.c @@ -156,8 +156,15 @@ struct send_range { union { struct srd { dmu_object_type_t obj_type; - uint32_t datablksz; + uint32_t datablksz; // logical size + uint32_t datasz; // payload size blkptr_t bp; + arc_buf_t *abuf; + abd_t *abd; + kmutex_t lock; + kcondvar_t cv; + boolean_t io_outstanding; + int io_err; } data; struct srh { uint32_t datablksz; @@ -222,6 +229,20 @@ range_free(struct send_range *range) size_t size = sizeof (dnode_phys_t) * (range->sru.object.dnp->dn_extra_slots + 1); kmem_free(range->sru.object.dnp, size); + } else if (range->type == DATA) { + mutex_enter(&range->sru.data.lock); + while (range->sru.data.io_outstanding) + cv_wait(&range->sru.data.cv, &range->sru.data.lock); + if (range->sru.data.abd != NULL) + abd_free(range->sru.data.abd); + if (range->sru.data.abuf != NULL) { + arc_buf_destroy(range->sru.data.abuf, + &range->sru.data.abuf); + } + mutex_exit(&range->sru.data.lock); + + cv_destroy(&range->sru.data.cv); + mutex_destroy(&range->sru.data.lock); } kmem_free(range, sizeof (*range)); } @@ -830,7 +851,7 @@ dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp, } static boolean_t -send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp) +send_do_embed(const blkptr_t *bp, uint64_t featureflags) { if (!BP_IS_EMBEDDED(bp)) return (B_FALSE); @@ -839,7 +860,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp) * Compression function must be legacy, or explicitly enabled. */ if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS && - !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4))) + !(featureflags & DMU_BACKUP_FEATURE_LZ4))) return (B_FALSE); /* @@ -847,7 +868,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp) */ switch (BPE_GET_ETYPE(bp)) { case BP_EMBEDDED_TYPE_DATA: - if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) + if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) return (B_TRUE); break; default: @@ -858,8 +879,8 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp) /* * This function actually handles figuring out what kind of record needs to be - * dumped, reading the data (which has hopefully been prefetched), and calling - * the appropriate helper function. + * dumped, and calling the appropriate helper function. In most cases, + * the data has already been read by send_reader_thread(). */ static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range) @@ -894,7 +915,6 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range) blkptr_t *bp = &srdp->bp; spa_t *spa = dmu_objset_spa(dscp->dsc_os); - arc_buf_t *abuf = NULL; ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp)); ASSERT3U(range->start_blkid + 1, ==, range->end_blkid); @@ -914,6 +934,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range) zb.zb_level = 0; zb.zb_blkid = range->start_blkid; + arc_buf_t *abuf = NULL; if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa, bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb) != 0) @@ -925,7 +946,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range) arc_buf_destroy(abuf, &abuf); return (err); } - if (send_do_embed(dscp, bp)) { + if (send_do_embed(bp, dscp->dsc_featureflags)) { err = dump_write_embedded(dscp, range->object, range->start_blkid * srdp->datablksz, srdp->datablksz, bp); @@ -936,70 +957,24 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range) range->start_blkid * srdp->datablksz >= dscp->dsc_resume_offset)); /* it's a level-0 block of a regular object */ - arc_flags_t aflags = ARC_FLAG_WAIT; - uint64_t offset; - - /* - * If we have large blocks stored on disk but the send flags - * don't allow us to send large blocks, we split the data from - * the arc buf into chunks. - */ - boolean_t split_large_blocks = - srdp->datablksz > SPA_OLD_MAXBLOCKSIZE && - !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS); - - /* - * Raw sends require that we always get raw data as it exists - * on disk, so we assert that we are not splitting blocks here. - */ - boolean_t request_raw = - (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0; - - /* - * We should only request compressed data from the ARC if all - * the following are true: - * - stream compression was requested - * - we aren't splitting large blocks into smaller chunks - * - the data won't need to be byteswapped before sending - * - this isn't an embedded block - * - this isn't metadata (if receiving on a different endian - * system it can be byteswapped more easily) - */ - boolean_t request_compressed = - (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) && - !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) && - !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp)); - - IMPLY(request_raw, !split_large_blocks); - IMPLY(request_raw, BP_IS_PROTECTED(bp)); - if (!dscp->dsc_dso->dso_dryrun) { - enum zio_flag zioflags = ZIO_FLAG_CANFAIL; - - ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp)); - - if (request_raw) - zioflags |= ZIO_FLAG_RAW; - else if (request_compressed) - zioflags |= ZIO_FLAG_RAW_COMPRESS; - zbookmark_phys_t zb; - zb.zb_objset = dmu_objset_id(dscp->dsc_os); - zb.zb_object = range->object; - zb.zb_level = 0; - zb.zb_blkid = range->start_blkid; - err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf, - ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb); - } + mutex_enter(&srdp->lock); + while (srdp->io_outstanding) + cv_wait(&srdp->cv, &srdp->lock); + err = srdp->io_err; + mutex_exit(&srdp->lock); if (err != 0) { if (zfs_send_corrupt_data && !dscp->dsc_dso->dso_dryrun) { - /* Send a block filled with 0x"zfs badd bloc" */ - abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA, - srdp->datablksz); + /* + * Send a block filled with 0x"zfs badd bloc" + */ + srdp->abuf = arc_alloc_buf(spa, &srdp->abuf, + ARC_BUFC_DATA, srdp->datablksz); uint64_t *ptr; - for (ptr = abuf->b_data; - (char *)ptr < (char *)abuf->b_data + + for (ptr = srdp->abuf->b_data; + (char *)ptr < (char *)srdp->abuf->b_data + srdp->datablksz; ptr++) *ptr = 0x2f5baddb10cULL; } else { @@ -1007,41 +982,47 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range) } } - offset = range->start_blkid * srdp->datablksz; + ASSERT(dscp->dsc_dso->dso_dryrun || + srdp->abuf != NULL || srdp->abd != NULL); + + uint64_t offset = range->start_blkid * srdp->datablksz; + + char *data = NULL; + if (srdp->abd != NULL) { + data = abd_to_buf(srdp->abd); + ASSERT3P(srdp->abuf, ==, NULL); + } else if (srdp->abuf != NULL) { + data = srdp->abuf->b_data; + } - if (split_large_blocks) { - ASSERT0(arc_is_encrypted(abuf)); - ASSERT3U(arc_get_compression(abuf), ==, - ZIO_COMPRESS_OFF); - char *buf = abuf->b_data; + /* + * If we have large blocks stored on disk but the send flags + * don't allow us to send large blocks, we split the data from + * the arc buf into chunks. + */ + if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE && + !(dscp->dsc_featureflags & + DMU_BACKUP_FEATURE_LARGE_BLOCKS)) { while (srdp->datablksz > 0 && err == 0) { int n = MIN(srdp->datablksz, SPA_OLD_MAXBLOCKSIZE); err = dmu_dump_write(dscp, srdp->obj_type, - range->object, offset, n, n, NULL, buf); + range->object, offset, n, n, NULL, data); offset += n; - buf += n; + /* + * When doing dry run, data==NULL is used as a + * sentinel value by + * dmu_dump_write()->dump_record(). + */ + if (data != NULL) + data += n; srdp->datablksz -= n; } } else { - int psize; - if (abuf != NULL) { - psize = arc_buf_size(abuf); - if (arc_get_compression(abuf) != - ZIO_COMPRESS_OFF) { - ASSERT3S(psize, ==, BP_GET_PSIZE(bp)); - } - } else if (!request_compressed) { - psize = srdp->datablksz; - } else { - psize = BP_GET_PSIZE(bp); - } err = dmu_dump_write(dscp, srdp->obj_type, - range->object, offset, srdp->datablksz, psize, bp, - (abuf == NULL ? NULL : abuf->b_data)); + range->object, offset, + srdp->datablksz, srdp->datasz, bp, data); } - if (abuf != NULL) - arc_buf_destroy(abuf, &abuf); return (err); } case HOLE: { @@ -1086,6 +1067,14 @@ range_alloc(enum type type, uint64_t object, uint64_t start_blkid, range->start_blkid = start_blkid; range->end_blkid = end_blkid; range->eos_marker = eos; + if (type == DATA) { + range->sru.data.abd = NULL; + range->sru.data.abuf = NULL; + mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL); + cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL); + range->sru.data.io_outstanding = 0; + range->sru.data.io_err = 0; + } return (range); } @@ -1596,19 +1585,115 @@ send_merge_thread(void *arg) thread_exit(); } -struct send_prefetch_thread_arg { +struct send_reader_thread_arg { struct send_merge_thread_arg *smta; bqueue_t q; boolean_t cancel; - boolean_t issue_prefetches; + boolean_t issue_reads; + uint64_t featureflags; int error; }; +static void +dmu_send_read_done(zio_t *zio) +{ + struct send_range *range = zio->io_private; + + mutex_enter(&range->sru.data.lock); + if (zio->io_error != 0) { + abd_free(range->sru.data.abd); + range->sru.data.abd = NULL; + range->sru.data.io_err = zio->io_error; + } + + ASSERT(range->sru.data.io_outstanding); + range->sru.data.io_outstanding = B_FALSE; + cv_broadcast(&range->sru.data.cv); + mutex_exit(&range->sru.data.lock); +} + +static void +issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range) +{ + struct srd *srdp = &range->sru.data; + blkptr_t *bp = &srdp->bp; + objset_t *os = srta->smta->os; + + ASSERT3U(range->type, ==, DATA); + ASSERT3U(range->start_blkid + 1, ==, range->end_blkid); + /* + * If we have large blocks stored on disk but + * the send flags don't allow us to send large + * blocks, we split the data from the arc buf + * into chunks. + */ + boolean_t split_large_blocks = + srdp->datablksz > SPA_OLD_MAXBLOCKSIZE && + !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS); + /* + * We should only request compressed data from the ARC if all + * the following are true: + * - stream compression was requested + * - we aren't splitting large blocks into smaller chunks + * - the data won't need to be byteswapped before sending + * - this isn't an embedded block + * - this isn't metadata (if receiving on a different endian + * system it can be byteswapped more easily) + */ + boolean_t request_compressed = + (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) && + !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) && + !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp)); + + enum zio_flag zioflags = ZIO_FLAG_CANFAIL; + + if (srta->featureflags & DMU_BACKUP_FEATURE_RAW) + zioflags |= ZIO_FLAG_RAW; + else if (request_compressed) + zioflags |= ZIO_FLAG_RAW_COMPRESS; + + srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ? + BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp); + + if (!srta->issue_reads) + return; + if (BP_IS_REDACTED(bp)) + return; + if (send_do_embed(bp, srta->featureflags)) + return; + + zbookmark_phys_t zb = { + .zb_objset = dmu_objset_id(os), + .zb_object = range->object, + .zb_level = 0, + .zb_blkid = range->start_blkid, + }; + + arc_flags_t aflags = ARC_FLAG_CACHED_ONLY; + + int arc_err = arc_read(NULL, os->os_spa, bp, + arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ, + zioflags, &aflags, &zb); + /* + * If the data is not already cached in the ARC, we read directly + * from zio. This avoids the performance overhead of adding a new + * entry to the ARC, and we also avoid polluting the ARC cache with + * data that is not likely to be used in the future. + */ + if (arc_err != 0) { + srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE); + srdp->io_outstanding = B_TRUE; + zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd, + srdp->datasz, dmu_send_read_done, range, + ZIO_PRIORITY_ASYNC_READ, zioflags, &zb)); + } +} + /* * Create a new record with the given values. */ static void -enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn, +enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn, uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz) { enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE : @@ -1629,18 +1714,7 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn, range->sru.data.datablksz = datablksz; range->sru.data.obj_type = dn->dn_type; range->sru.data.bp = *bp; - if (spta->issue_prefetches) { - zbookmark_phys_t zb = {0}; - zb.zb_objset = dmu_objset_id(dn->dn_objset); - zb.zb_object = dn->dn_object; - zb.zb_level = 0; - zb.zb_blkid = blkid; - arc_flags_t aflags = ARC_FLAG_NOWAIT | - ARC_FLAG_PREFETCH; - (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL, - NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL | - ZIO_FLAG_SPECULATIVE, &aflags, &zb); - } + issue_data_read(srta, range); break; case REDACT: range->sru.redact.datablksz = datablksz; @@ -1659,12 +1733,12 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn, * it issues prefetches for the data we need to send. */ static void -send_prefetch_thread(void *arg) +send_reader_thread(void *arg) { - struct send_prefetch_thread_arg *spta = arg; - struct send_merge_thread_arg *smta = spta->smta; + struct send_reader_thread_arg *srta = arg; + struct send_merge_thread_arg *smta = srta->smta; bqueue_t *inq = &smta->q; - bqueue_t *outq = &spta->q; + bqueue_t *outq = &srta->q; objset_t *os = smta->os; fstrans_cookie_t cookie = spl_fstrans_mark(); struct send_range *range = bqueue_dequeue(inq); @@ -1680,30 +1754,14 @@ send_prefetch_thread(void *arg) */ uint64_t last_obj = UINT64_MAX; uint64_t last_obj_exists = B_TRUE; - while (!range->eos_marker && !spta->cancel && smta->error == 0 && + while (!range->eos_marker && !srta->cancel && smta->error == 0 && err == 0) { switch (range->type) { - case DATA: { - zbookmark_phys_t zb; - zb.zb_objset = dmu_objset_id(os); - zb.zb_object = range->object; - zb.zb_level = 0; - zb.zb_blkid = range->start_blkid; - ASSERT3U(range->start_blkid + 1, ==, range->end_blkid); - if (!BP_IS_REDACTED(&range->sru.data.bp) && - spta->issue_prefetches && - !BP_IS_EMBEDDED(&range->sru.data.bp)) { - arc_flags_t aflags = ARC_FLAG_NOWAIT | - ARC_FLAG_PREFETCH; - (void) arc_read(NULL, os->os_spa, - &range->sru.data.bp, NULL, NULL, - ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL | - ZIO_FLAG_SPECULATIVE, &aflags, &zb); - } + case DATA: + issue_data_read(srta, range); bqueue_enqueue(outq, range, range->sru.data.datablksz); range = get_next_range_nofree(inq, range); break; - } case HOLE: case OBJECT: case OBJECT_RANGE: @@ -1805,7 +1863,7 @@ send_prefetch_thread(void *arg) datablksz); uint64_t nblks = (offset / datablksz) - blkid; - enqueue_range(spta, outq, dn, blkid, + enqueue_range(srta, outq, dn, blkid, nblks, NULL, datablksz); blkid += nblks; } @@ -1816,7 +1874,7 @@ send_prefetch_thread(void *arg) if (err != 0) break; ASSERT(!BP_IS_HOLE(&bp)); - enqueue_range(spta, outq, dn, blkid, 1, &bp, + enqueue_range(srta, outq, dn, blkid, 1, &bp, datablksz); } rw_exit(&dn->dn_struct_rwlock); @@ -1825,11 +1883,11 @@ send_prefetch_thread(void *arg) } } } - if (spta->cancel || err != 0) { + if (srta->cancel || err != 0) { smta->cancel = B_TRUE; - spta->error = err; + srta->error = err; } else if (smta->error != 0) { - spta->error = smta->error; + srta->error = smta->error; } while (!range->eos_marker) range = get_next_range(inq, range); @@ -2052,15 +2110,17 @@ setup_merge_thread(struct send_merge_thread_arg *smt_arg, } static void -setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg, - struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg) +setup_reader_thread(struct send_reader_thread_arg *srt_arg, + struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg, + uint64_t featureflags) { - VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff, + VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff, MAX(zfs_send_queue_length, 2 * zfs_max_recordsize), offsetof(struct send_range, ln))); - spt_arg->smta = smt_arg; - spt_arg->issue_prefetches = !dspp->dso->dso_dryrun; - (void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0, + srt_arg->smta = smt_arg; + srt_arg->issue_reads = !dspp->dso->dso_dryrun; + srt_arg->featureflags = featureflags; + (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0, curproc, TS_RUN, minclsyspri); } @@ -2265,7 +2325,7 @@ dmu_send_impl(struct dmu_send_params *dspp) struct send_thread_arg *to_arg; struct redact_list_thread_arg *rlt_arg; struct send_merge_thread_arg *smt_arg; - struct send_prefetch_thread_arg *spt_arg; + struct send_reader_thread_arg *srt_arg; struct send_range *range; redaction_list_t *from_rl = NULL; redaction_list_t *redact_rl = NULL; @@ -2348,7 +2408,7 @@ dmu_send_impl(struct dmu_send_params *dspp) to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP); rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP); smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP); - spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP); + srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP); drr = create_begin_record(dspp, os, featureflags); dssp = setup_send_progress(dspp); @@ -2457,12 +2517,12 @@ dmu_send_impl(struct dmu_send_params *dspp) setup_from_thread(from_arg, from_rl, dssp); setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp); setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os); - setup_prefetch_thread(spt_arg, dspp, smt_arg); + setup_reader_thread(srt_arg, dspp, smt_arg, featureflags); - range = bqueue_dequeue(&spt_arg->q); + range = bqueue_dequeue(&srt_arg->q); while (err == 0 && !range->eos_marker) { err = do_dump(&dsc, range); - range = get_next_range(&spt_arg->q, range); + range = get_next_range(&srt_arg->q, range); if (issig(JUSTLOOKING) && issig(FORREAL)) err = SET_ERROR(EINTR); } @@ -2474,22 +2534,22 @@ dmu_send_impl(struct dmu_send_params *dspp) * pending records before exiting. */ if (err != 0) { - spt_arg->cancel = B_TRUE; + srt_arg->cancel = B_TRUE; while (!range->eos_marker) { - range = get_next_range(&spt_arg->q, range); + range = get_next_range(&srt_arg->q, range); } } range_free(range); - bqueue_destroy(&spt_arg->q); + bqueue_destroy(&srt_arg->q); bqueue_destroy(&smt_arg->q); if (dspp->redactbook != NULL) bqueue_destroy(&rlt_arg->q); bqueue_destroy(&to_arg->q); bqueue_destroy(&from_arg->q); - if (err == 0 && spt_arg->error != 0) - err = spt_arg->error; + if (err == 0 && srt_arg->error != 0) + err = srt_arg->error; if (err != 0) goto out; @@ -2532,7 +2592,7 @@ out: kmem_free(to_arg, sizeof (*to_arg)); kmem_free(rlt_arg, sizeof (*rlt_arg)); kmem_free(smt_arg, sizeof (*smt_arg)); - kmem_free(spt_arg, sizeof (*spt_arg)); + kmem_free(srt_arg, sizeof (*srt_arg)); dsl_dataset_long_rele(to_ds, FTAG); if (from_rl != NULL) { |