Diffstat (limited to 'module/zfs/dmu_recv.c')
-rw-r--r-- | module/zfs/dmu_recv.c | 1254
1 file changed, 691 insertions(+), 563 deletions(-)
diff --git a/module/zfs/dmu_recv.c b/module/zfs/dmu_recv.c index 65a031b42..5a7c9d49c 100644 --- a/module/zfs/dmu_recv.c +++ b/module/zfs/dmu_recv.c @@ -21,16 +21,16 @@ /* * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. * Copyright 2011 Nexenta Systems, Inc. All rights reserved. - * Copyright (c) 2011, 2015 by Delphix. All rights reserved. + * Copyright (c) 2011, 2018 by Delphix. All rights reserved. * Copyright (c) 2014, Joyent, Inc. All rights reserved. * Copyright 2014 HybridCluster. All rights reserved. - * Copyright 2016 RackTop Systems. - * Copyright (c) 2016 Actifio, Inc. All rights reserved. * Copyright (c) 2018, loli10K <[email protected]>. All rights reserved. */ #include <sys/dmu.h> #include <sys/dmu_impl.h> +#include <sys/dmu_send.h> +#include <sys/dmu_recv.h> #include <sys/dmu_tx.h> #include <sys/dbuf.h> #include <sys/dnode.h> @@ -42,30 +42,89 @@ #include <sys/dsl_prop.h> #include <sys/dsl_pool.h> #include <sys/dsl_synctask.h> -#include <sys/spa_impl.h> #include <sys/zfs_ioctl.h> #include <sys/zap.h> +#include <sys/zvol.h> #include <sys/zio_checksum.h> #include <sys/zfs_znode.h> #include <zfs_fletcher.h> #include <sys/avl.h> #include <sys/ddt.h> #include <sys/zfs_onexit.h> -#include <sys/dmu_recv.h> +#include <sys/dmu_send.h> #include <sys/dsl_destroy.h> #include <sys/blkptr.h> #include <sys/dsl_bookmark.h> #include <sys/zfeature.h> #include <sys/bqueue.h> -#include <sys/zvol.h> -#include <sys/policy.h> +#include <sys/objlist.h> +#ifdef _KERNEL +#include <sys/zfs_vfsops.h> +#endif int zfs_recv_queue_length = SPA_MAXBLOCKSIZE; +int zfs_recv_queue_ff = 20; static char *dmu_recv_tag = "dmu_recv_tag"; const char *recv_clone_name = "%recv"; -static void byteswap_record(dmu_replay_record_t *drr); +static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len, + void *buf); + +struct receive_record_arg { + dmu_replay_record_t header; + void *payload; /* Pointer to a buffer containing the payload */ + /* + * If the record is a write, pointer to the arc_buf_t containing the + * payload. + */ + arc_buf_t *arc_buf; + int payload_size; + uint64_t bytes_read; /* bytes read from stream when record created */ + boolean_t eos_marker; /* Marks the end of the stream */ + bqueue_node_t node; +}; + +struct receive_writer_arg { + objset_t *os; + boolean_t byteswap; + bqueue_t q; + + /* + * These three args are used to signal to the main thread that we're + * done. + */ + kmutex_t mutex; + kcondvar_t cv; + boolean_t done; + + int err; + /* A map from guid to dataset to help handle dedup'd streams. 
*/ + avl_tree_t *guid_to_ds_map; + boolean_t resumable; + boolean_t raw; /* DMU_BACKUP_FEATURE_RAW set */ + boolean_t spill; /* DRR_FLAG_SPILL_BLOCK set */ + uint64_t last_object; + uint64_t last_offset; + uint64_t max_object; /* highest object ID referenced in stream */ + uint64_t bytes_read; /* bytes read when current record created */ + + /* Encryption parameters for the last received DRR_OBJECT_RANGE */ + boolean_t or_crypt_params_present; + uint64_t or_firstobj; + uint64_t or_numslots; + uint8_t or_salt[ZIO_DATA_SALT_LEN]; + uint8_t or_iv[ZIO_DATA_IV_LEN]; + uint8_t or_mac[ZIO_DATA_MAC_LEN]; + boolean_t or_byteorder; +}; + +typedef struct guid_map_entry { + uint64_t guid; + boolean_t raw; + dsl_dataset_t *gme_ds; + avl_node_t avlnode; +} guid_map_entry_t; typedef struct dmu_recv_begin_arg { const char *drba_origin; @@ -74,6 +133,211 @@ typedef struct dmu_recv_begin_arg { dsl_crypto_params_t *drba_dcp; } dmu_recv_begin_arg_t; +static void +byteswap_record(dmu_replay_record_t *drr) +{ +#define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X)) +#define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X)) + drr->drr_type = BSWAP_32(drr->drr_type); + drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen); + + switch (drr->drr_type) { + case DRR_BEGIN: + DO64(drr_begin.drr_magic); + DO64(drr_begin.drr_versioninfo); + DO64(drr_begin.drr_creation_time); + DO32(drr_begin.drr_type); + DO32(drr_begin.drr_flags); + DO64(drr_begin.drr_toguid); + DO64(drr_begin.drr_fromguid); + break; + case DRR_OBJECT: + DO64(drr_object.drr_object); + DO32(drr_object.drr_type); + DO32(drr_object.drr_bonustype); + DO32(drr_object.drr_blksz); + DO32(drr_object.drr_bonuslen); + DO32(drr_object.drr_raw_bonuslen); + DO64(drr_object.drr_toguid); + DO64(drr_object.drr_maxblkid); + break; + case DRR_FREEOBJECTS: + DO64(drr_freeobjects.drr_firstobj); + DO64(drr_freeobjects.drr_numobjs); + DO64(drr_freeobjects.drr_toguid); + break; + case DRR_WRITE: + DO64(drr_write.drr_object); + DO32(drr_write.drr_type); + DO64(drr_write.drr_offset); + DO64(drr_write.drr_logical_size); + DO64(drr_write.drr_toguid); + ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum); + DO64(drr_write.drr_key.ddk_prop); + DO64(drr_write.drr_compressed_size); + break; + case DRR_WRITE_BYREF: + DO64(drr_write_byref.drr_object); + DO64(drr_write_byref.drr_offset); + DO64(drr_write_byref.drr_length); + DO64(drr_write_byref.drr_toguid); + DO64(drr_write_byref.drr_refguid); + DO64(drr_write_byref.drr_refobject); + DO64(drr_write_byref.drr_refoffset); + ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref. 
+ drr_key.ddk_cksum); + DO64(drr_write_byref.drr_key.ddk_prop); + break; + case DRR_WRITE_EMBEDDED: + DO64(drr_write_embedded.drr_object); + DO64(drr_write_embedded.drr_offset); + DO64(drr_write_embedded.drr_length); + DO64(drr_write_embedded.drr_toguid); + DO32(drr_write_embedded.drr_lsize); + DO32(drr_write_embedded.drr_psize); + break; + case DRR_FREE: + DO64(drr_free.drr_object); + DO64(drr_free.drr_offset); + DO64(drr_free.drr_length); + DO64(drr_free.drr_toguid); + break; + case DRR_SPILL: + DO64(drr_spill.drr_object); + DO64(drr_spill.drr_length); + DO64(drr_spill.drr_toguid); + DO64(drr_spill.drr_compressed_size); + DO32(drr_spill.drr_type); + break; + case DRR_OBJECT_RANGE: + DO64(drr_object_range.drr_firstobj); + DO64(drr_object_range.drr_numslots); + DO64(drr_object_range.drr_toguid); + break; + case DRR_REDACT: + DO64(drr_redact.drr_object); + DO64(drr_redact.drr_offset); + DO64(drr_redact.drr_length); + DO64(drr_redact.drr_toguid); + break; + case DRR_END: + DO64(drr_end.drr_toguid); + ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum); + break; + default: + break; + } + + if (drr->drr_type != DRR_BEGIN) { + ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum); + } + +#undef DO64 +#undef DO32 +} + +static boolean_t +redact_snaps_contains(uint64_t *snaps, uint64_t num_snaps, uint64_t guid) +{ + for (int i = 0; i < num_snaps; i++) { + if (snaps[i] == guid) + return (B_TRUE); + } + return (B_FALSE); +} + +/* + * Check that the new stream we're trying to receive is redacted with respect to + * a subset of the snapshots that the origin was redacted with respect to. For + * the reasons behind this, see the man page on redacted zfs sends and receives. + */ +static boolean_t +compatible_redact_snaps(uint64_t *origin_snaps, uint64_t origin_num_snaps, + uint64_t *redact_snaps, uint64_t num_redact_snaps) +{ + /* + * Short circuit the comparison; if we are redacted with respect to + * more snapshots than the origin, we can't be redacted with respect + * to a subset. + */ + if (num_redact_snaps > origin_num_snaps) { + return (B_FALSE); + } + + for (int i = 0; i < num_redact_snaps; i++) { + if (!redact_snaps_contains(origin_snaps, origin_num_snaps, + redact_snaps[i])) { + return (B_FALSE); + } + } + return (B_TRUE); +} + +static boolean_t +redact_check(dmu_recv_begin_arg_t *drba, dsl_dataset_t *origin) +{ + uint64_t *origin_snaps; + uint64_t origin_num_snaps; + dmu_recv_cookie_t *drc = drba->drba_cookie; + struct drr_begin *drrb = drc->drc_drrb; + int featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo); + int err = 0; + boolean_t ret = B_TRUE; + uint64_t *redact_snaps; + uint_t numredactsnaps; + + /* + * If this is a full send stream, we're safe no matter what. + */ + if (drrb->drr_fromguid == 0) + return (ret); + + VERIFY(dsl_dataset_get_uint64_array_feature(origin, + SPA_FEATURE_REDACTED_DATASETS, &origin_num_snaps, &origin_snaps)); + + if (nvlist_lookup_uint64_array(drc->drc_begin_nvl, + BEGINNV_REDACT_FROM_SNAPS, &redact_snaps, &numredactsnaps) == + 0) { + /* + * If the send stream was sent from the redaction bookmark or + * the redacted version of the dataset, then we're safe. Verify + * that this is from a compatible redaction bookmark or + * redacted dataset.
+ */ + if (!compatible_redact_snaps(origin_snaps, origin_num_snaps, + redact_snaps, numredactsnaps)) { + err = EINVAL; + } + } else if (featureflags & DMU_BACKUP_FEATURE_REDACTED) { + /* + * If the stream is redacted, it must be redacted with respect + * to a subset of what the origin is redacted with respect to. + * See case number 2 in the zfs man page section on redacted zfs + * send. + */ + err = nvlist_lookup_uint64_array(drc->drc_begin_nvl, + BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps); + + if (err != 0 || !compatible_redact_snaps(origin_snaps, + origin_num_snaps, redact_snaps, numredactsnaps)) { + err = EINVAL; + } + } else if (!redact_snaps_contains(origin_snaps, origin_num_snaps, + drrb->drr_toguid)) { + /* + * If the stream isn't redacted but the origin is, this must be + * one of the snapshots the origin is redacted with respect to. + * See case number 1 in the zfs man page section on redacted zfs + * send. + */ + err = EINVAL; + } + + if (err != 0) + ret = B_FALSE; + return (ret); +} + static int recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, uint64_t fromguid, uint64_t featureflags) @@ -91,14 +355,14 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name, 8, 1, &val); if (error != ENOENT) - return (error == 0 ? EBUSY : error); + return (error == 0 ? SET_ERROR(EBUSY) : error); /* new snapshot name must not exist */ error = zap_lookup(dp->dp_meta_objset, dsl_dataset_phys(ds)->ds_snapnames_zapobj, drba->drba_cookie->drc_tosnap, 8, 1, &val); if (error != ENOENT) - return (error == 0 ? EEXIST : error); + return (error == 0 ? SET_ERROR(EEXIST) : error); /* must not have children if receiving a ZVOL */ error = zap_count(dp->dp_meta_objset, @@ -127,7 +391,7 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, dsl_dataset_t *snap; uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj; - /* Can't raw receive on top of an unencrypted dataset */ + /* Can't perform a raw receive on top of a non-raw receive */ if (!encrypted && raw) return (SET_ERROR(EINVAL)); @@ -175,6 +439,13 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, ds->ds_prev->ds_object; } + if (dsl_dataset_feature_is_active(snap, + SPA_FEATURE_REDACTED_DATASETS) && !redact_check(drba, + snap)) { + dsl_dataset_rele(snap, FTAG); + return (SET_ERROR(EINVAL)); + } + dsl_dataset_rele(snap, FTAG); } else { /* if full, then must be forced */ @@ -206,14 +477,66 @@ recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds, if (will_encrypt && embed) return (SET_ERROR(EINVAL)); } - - drba->drba_cookie->drc_fromsnapobj = 0; } return (0); } +/* + * Check that any feature flags used in the data stream we're receiving are + * supported by the pool we are receiving into. + * + * Note that some of the features we explicitly check here have additional + * (implicit) features they depend on, but those dependencies are enforced + * through the zfeature_register() calls declaring the features that we + * explicitly check. + */ +static int +recv_begin_check_feature_flags_impl(uint64_t featureflags, spa_t *spa) +{ + /* + * Check if there are any unsupported feature flags. 
+ */ + if (!DMU_STREAM_SUPPORTED(featureflags)) { + return (SET_ERROR(ZFS_ERR_UNKNOWN_SEND_STREAM_FEATURE)); + } + + /* Verify pool version supports SA if SA_SPILL feature set */ + if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) && + spa_version(spa) < SPA_VERSION_SA) + return (SET_ERROR(ENOTSUP)); + + /* + * LZ4 compressed, embedded, mooched, large blocks, and large_dnodes + * in the stream can only be used if those pool features are enabled + * because we don't attempt to decompress / un-embed / un-mooch / + * split up the blocks / dnodes during the receive process. + */ + if ((featureflags & DMU_BACKUP_FEATURE_LZ4) && + !spa_feature_is_enabled(spa, SPA_FEATURE_LZ4_COMPRESS)) + return (SET_ERROR(ENOTSUP)); + if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) && + !spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) + return (SET_ERROR(ENOTSUP)); + if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) && + !spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_BLOCKS)) + return (SET_ERROR(ENOTSUP)); + if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) && + !spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_DNODE)) + return (SET_ERROR(ENOTSUP)); + + /* + * Receiving redacted streams requires that redacted datasets are + * enabled. + */ + if ((featureflags & DMU_BACKUP_FEATURE_REDACTED) && + !spa_feature_is_enabled(spa, SPA_FEATURE_REDACTED_DATASETS)) + return (SET_ERROR(ENOTSUP)); + + return (0); +} + static int dmu_recv_begin_check(void *arg, dmu_tx_t *tx) { @@ -224,7 +547,7 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) int flags = drrb->drr_flags; ds_hold_flags_t dsflags = 0; int error; - uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo); + uint64_t featureflags = drba->drba_cookie->drc_featureflags; dsl_dataset_t *ds; const char *tofs = drba->drba_cookie->drc_tofs; @@ -238,41 +561,15 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL)) return (SET_ERROR(EINVAL)); - /* Verify pool version supports SA if SA_SPILL feature set */ - if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) && - spa_version(dp->dp_spa) < SPA_VERSION_SA) - return (SET_ERROR(ENOTSUP)); + error = recv_begin_check_feature_flags_impl(featureflags, dp->dp_spa); + if (error != 0) + return (error); + /* Resumable receives require extensible datasets */ if (drba->drba_cookie->drc_resumable && !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET)) return (SET_ERROR(ENOTSUP)); - /* - * The receiving code doesn't know how to translate a WRITE_EMBEDDED - * record to a plain WRITE record, so the pool must have the - * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED - * records. Same with WRITE_EMBEDDED records that use LZ4 compression. - */ - if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) - return (SET_ERROR(ENOTSUP)); - if ((featureflags & DMU_BACKUP_FEATURE_LZ4) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) - return (SET_ERROR(ENOTSUP)); - - /* - * The receiving code doesn't know how to translate large blocks - * to smaller ones, so the pool must have the LARGE_BLOCKS - * feature enabled if the stream has LARGE_BLOCKS. Same with - * large dnodes. 
- */ - if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS)) - return (SET_ERROR(ENOTSUP)); - if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE)) - return (SET_ERROR(ENOTSUP)); - if (featureflags & DMU_BACKUP_FEATURE_RAW) { /* raw receives require the encryption feature */ if (!spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_ENCRYPTION)) @@ -311,7 +608,7 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) * If it's a non-clone incremental, we are missing the * target fs, so fail the recv. */ - if (fromguid != 0 && !(flags & DRR_FLAG_CLONE || + if (fromguid != 0 && !((flags & DRR_FLAG_CLONE) || drba->drba_origin)) return (SET_ERROR(ENOENT)); @@ -320,7 +617,7 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) * contain all the necessary free records and freeobject * records, reject it. */ - if (fromguid == 0 && drba->drba_origin && + if (fromguid == 0 && drba->drba_origin != NULL && !(flags & DRR_FLAG_FREERECORDS)) return (SET_ERROR(EINVAL)); @@ -388,7 +685,6 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) if (drba->drba_origin != NULL) { dsl_dataset_t *origin; - error = dsl_dataset_hold_flags(dp, drba->drba_origin, dsflags, FTAG, &origin); if (error != 0) { @@ -406,14 +702,31 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx) dsl_dataset_rele_flags(ds, dsflags, FTAG); return (SET_ERROR(ENODEV)); } + if (origin->ds_dir->dd_crypto_obj != 0 && (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) { dsl_dataset_rele_flags(origin, dsflags, FTAG); dsl_dataset_rele_flags(ds, dsflags, FTAG); return (SET_ERROR(EINVAL)); } - dsl_dataset_rele_flags(origin, - dsflags, FTAG); + + /* + * If the origin is redacted we need to verify that this + * send stream can safely be received on top of the + * origin. + */ + if (dsl_dataset_feature_is_active(origin, + SPA_FEATURE_REDACTED_DATASETS)) { + if (!redact_check(drba, origin)) { + dsl_dataset_rele_flags(origin, dsflags, + FTAG); + dsl_dataset_rele_flags(ds, dsflags, + FTAG); + return (SET_ERROR(EINVAL)); + } + } + + dsl_dataset_rele_flags(origin, dsflags, FTAG); } dsl_dataset_rele_flags(ds, dsflags, FTAG); @@ -428,9 +741,10 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) dmu_recv_begin_arg_t *drba = arg; dsl_pool_t *dp = dmu_tx_pool(tx); objset_t *mos = dp->dp_meta_objset; - struct drr_begin *drrb = drba->drba_cookie->drc_drrb; - const char *tofs = drba->drba_cookie->drc_tofs; - uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo); + dmu_recv_cookie_t *drc = drba->drba_cookie; + struct drr_begin *drrb = drc->drc_drrb; + const char *tofs = drc->drc_tofs; + uint64_t featureflags = drc->drc_featureflags; dsl_dataset_t *ds, *newds; objset_t *os; uint64_t dsobj; @@ -451,7 +765,7 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) * the raw cmd set. Raw incremental recvs do not use a dcp * since the encryption parameters are already set in stone. 
*/ - if (dcp == NULL && drba->drba_cookie->drc_fromsnapobj == 0 && + if (dcp == NULL && drrb->drr_fromguid == 0 && drba->drba_origin == NULL) { ASSERT3P(dcp, ==, NULL); dcp = &dummy_dcp; @@ -470,7 +784,6 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) drba->drba_cookie->drc_fromsnapobj, FTAG, &snap)); ASSERT3P(dcp, ==, NULL); } - dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name, snap, crflags, drba->drba_cred, dcp, tx); if (drba->drba_cookie->drc_fromsnapobj != 0) @@ -495,13 +808,24 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) if (origin != NULL) dsl_dataset_rele(origin, FTAG); dsl_dir_rele(dd, FTAG); - drba->drba_cookie->drc_newfs = B_TRUE; + drc->drc_newfs = B_TRUE; + } + VERIFY0(dsl_dataset_own_obj_force(dp, dsobj, dsflags, dmu_recv_tag, + &newds)); + if (dsl_dataset_feature_is_active(newds, + SPA_FEATURE_REDACTED_DATASETS)) { + /* + * If the origin dataset is redacted, the child will be redacted + * when we create it. We clear the new dataset's + * redaction info; if it should be redacted, we'll fill + * in its information later. + */ + dsl_dataset_deactivate_feature(newds, + SPA_FEATURE_REDACTED_DATASETS, tx); } - - VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &newds)); VERIFY0(dmu_objset_from_ds(newds, &os)); - if (drba->drba_cookie->drc_resumable) { + if (drc->drc_resumable) { dsl_dataset_zapify(newds, tx); if (drrb->drr_fromguid != 0) { VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID, @@ -535,6 +859,17 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_RAWOK, 8, 1, &one, tx)); } + + uint64_t *redact_snaps; + uint_t numredactsnaps; + if (nvlist_lookup_uint64_array(drc->drc_begin_nvl, + BEGINNV_REDACT_FROM_SNAPS, &redact_snaps, + &numredactsnaps) == 0) { + VERIFY0(zap_add(mos, dsobj, + DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS, + sizeof (*redact_snaps), numredactsnaps, + redact_snaps, tx)); + } } /* @@ -547,6 +882,16 @@ dmu_recv_begin_sync(void *arg, dmu_tx_t *tx) drba->drba_cookie->drc_raw = B_TRUE; } + + if (featureflags & DMU_BACKUP_FEATURE_REDACTED) { + uint64_t *redact_snaps; + uint_t numredactsnaps; + VERIFY0(nvlist_lookup_uint64_array(drc->drc_begin_nvl, + BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps)); + dsl_dataset_activate_redaction(newds, redact_snaps, + numredactsnaps, tx); + } + dmu_buf_will_dirty(newds->ds_dbuf, tx); dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT; @@ -573,60 +918,39 @@ static int dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx) { dmu_recv_begin_arg_t *drba = arg; + dmu_recv_cookie_t *drc = drba->drba_cookie; dsl_pool_t *dp = dmu_tx_pool(tx); - struct drr_begin *drrb = drba->drba_cookie->drc_drrb; + struct drr_begin *drrb = drc->drc_drrb; int error; ds_hold_flags_t dsflags = 0; - uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo); dsl_dataset_t *ds; - const char *tofs = drba->drba_cookie->drc_tofs; + const char *tofs = drc->drc_tofs; /* already checked */ ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC); - ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING); + ASSERT(drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING); if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == DMU_COMPOUNDSTREAM || drrb->drr_type >= DMU_OST_NUMTYPES) return (SET_ERROR(EINVAL)); - /* Verify pool version supports SA if SA_SPILL feature set */ - if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) && - spa_version(dp->dp_spa) < SPA_VERSION_SA) - return (SET_ERROR(ENOTSUP)); - - /* - * The receiving code doesn't know how to translate a WRITE_EMBEDDED - * record to a plain 
WRITE record, so the pool must have the - * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED - * records. Same with WRITE_EMBEDDED records that use LZ4 compression. - */ - if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) - return (SET_ERROR(ENOTSUP)); - if ((featureflags & DMU_BACKUP_FEATURE_LZ4) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) - return (SET_ERROR(ENOTSUP)); - /* - * The receiving code doesn't know how to translate large blocks - * to smaller ones, so the pool must have the LARGE_BLOCKS - * feature enabled if the stream has LARGE_BLOCKS. Same with - * large dnodes. + * This is mostly a sanity check since we should have already done these + * checks during a previous attempt to receive the data. */ - if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS)) - return (SET_ERROR(ENOTSUP)); - if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) && - !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE)) - return (SET_ERROR(ENOTSUP)); + error = recv_begin_check_feature_flags_impl(drc->drc_featureflags, + dp->dp_spa); + if (error != 0) + return (error); /* 6 extra bytes for /%recv */ char recvname[ZFS_MAX_DATASET_NAME_LEN + 6]; + (void) snprintf(recvname, sizeof (recvname), "%s/%s", tofs, recv_clone_name); - if (featureflags & DMU_BACKUP_FEATURE_RAW) { + if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) { /* raw receives require spill block allocation flag */ if (!(drrb->drr_flags & DRR_FLAG_SPILL_BLOCK)) return (SET_ERROR(ZFS_ERR_SPILL_BLOCK_FLAG_MISSING)); @@ -690,6 +1014,40 @@ dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx) return (SET_ERROR(EINVAL)); } + /* + * If we're resuming, and the send is redacted, then the original send + * must have been redacted, and must have been redacted with respect to + * the same snapshots. 
+ */ + if (drc->drc_featureflags & DMU_BACKUP_FEATURE_REDACTED) { + uint64_t num_ds_redact_snaps; + uint64_t *ds_redact_snaps; + + uint_t num_stream_redact_snaps; + uint64_t *stream_redact_snaps; + + if (nvlist_lookup_uint64_array(drc->drc_begin_nvl, + BEGINNV_REDACT_SNAPS, &stream_redact_snaps, + &num_stream_redact_snaps) != 0) { + dsl_dataset_rele_flags(ds, dsflags, FTAG); + return (SET_ERROR(EINVAL)); + } + + if (!dsl_dataset_get_uint64_array_feature(ds, + SPA_FEATURE_REDACTED_DATASETS, &num_ds_redact_snaps, + &ds_redact_snaps)) { + dsl_dataset_rele_flags(ds, dsflags, FTAG); + return (SET_ERROR(EINVAL)); + } + + for (int i = 0; i < num_ds_redact_snaps; i++) { + if (!redact_snaps_contains(ds_redact_snaps, + num_ds_redact_snaps, stream_redact_snaps[i])) { + dsl_dataset_rele_flags(ds, dsflags, FTAG); + return (SET_ERROR(EINVAL)); + } + } + } dsl_dataset_rele_flags(ds, dsflags, FTAG); return (0); } @@ -700,17 +1058,14 @@ dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx) dmu_recv_begin_arg_t *drba = arg; dsl_pool_t *dp = dmu_tx_pool(tx); const char *tofs = drba->drba_cookie->drc_tofs; - struct drr_begin *drrb = drba->drba_cookie->drc_drrb; - uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo); + uint64_t featureflags = drba->drba_cookie->drc_featureflags; dsl_dataset_t *ds; - objset_t *os; ds_hold_flags_t dsflags = 0; - uint64_t dsobj; /* 6 extra bytes for /%recv */ char recvname[ZFS_MAX_DATASET_NAME_LEN + 6]; - (void) snprintf(recvname, sizeof (recvname), "%s/%s", - tofs, recv_clone_name); + (void) snprintf(recvname, sizeof (recvname), "%s/%s", tofs, + recv_clone_name); if (featureflags & DMU_BACKUP_FEATURE_RAW) { drba->drba_cookie->drc_raw = B_TRUE; @@ -718,25 +1073,15 @@ dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx) dsflags |= DS_HOLD_FLAG_DECRYPT; } - if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) { + if (dsl_dataset_own_force(dp, recvname, dsflags, dmu_recv_tag, &ds) + != 0) { /* %recv does not exist; continue in tofs */ - VERIFY0(dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds)); + VERIFY0(dsl_dataset_own_force(dp, tofs, dsflags, dmu_recv_tag, + &ds)); drba->drba_cookie->drc_newfs = B_TRUE; } - /* clear the inconsistent flag so that we can own it */ ASSERT(DS_IS_INCONSISTENT(ds)); - dmu_buf_will_dirty(ds->ds_dbuf, tx); - dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT; - dsobj = ds->ds_object; - dsl_dataset_rele_flags(ds, dsflags, FTAG); - - VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &ds)); - VERIFY0(dmu_objset_from_ds(ds, &os)); - - dmu_buf_will_dirty(ds->ds_dbuf, tx); - dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT; - rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG); ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)) || drba->drba_cookie->drc_raw); @@ -754,9 +1099,11 @@ dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx) int dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, boolean_t force, boolean_t resumable, nvlist_t *localprops, - nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc) + nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc, vnode_t *vp, + offset_t *voffp) { dmu_recv_begin_arg_t drba = { 0 }; + int err; bzero(drc, sizeof (dmu_recv_cookie_t)); drc->drc_drr_begin = drr_begin; @@ -780,6 +1127,33 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, return (SET_ERROR(EINVAL)); } + drc->drc_vp = vp; + drc->drc_voff = *voffp; + drc->drc_featureflags = + DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo); + + uint32_t payloadlen = 
drc->drc_drr_begin->drr_payloadlen; + void *payload = NULL; + if (payloadlen != 0) + payload = kmem_alloc(payloadlen, KM_SLEEP); + + err = receive_read_payload_and_next_header(drc, payloadlen, + payload); + if (err != 0) { + kmem_free(payload, payloadlen); + return (err); + } + if (payloadlen != 0) { + err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl, + KM_SLEEP); + kmem_free(payload, payloadlen); + if (err != 0) { + kmem_free(drc->drc_next_rrd, + sizeof (*drc->drc_next_rrd)); + return (err); + } + } + if (drc->drc_drrb->drr_flags & DRR_FLAG_SPILL_BLOCK) drc->drc_spill = B_TRUE; @@ -787,13 +1161,11 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, drba.drba_cookie = drc; drba.drba_cred = CRED(); - if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) & - DMU_BACKUP_FEATURE_RESUMING) { - return (dsl_sync_task(tofs, + if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) { + err = dsl_sync_task(tofs, dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync, - &drba, 5, ZFS_SPACE_CHECK_NORMAL)); - } else { - int err; + &drba, 5, ZFS_SPACE_CHECK_NORMAL); + } else { /* * For non-raw, non-incremental, non-resuming receives the @@ -810,110 +1182,23 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin, origin == NULL && drc->drc_drrb->drr_fromguid == 0) { err = dsl_crypto_params_create_nvlist(DCP_CMD_NONE, localprops, hidden_args, &drba.drba_dcp); - if (err != 0) - return (err); } - err = dsl_sync_task(tofs, - dmu_recv_begin_check, dmu_recv_begin_sync, - &drba, 5, ZFS_SPACE_CHECK_NORMAL); - dsl_crypto_params_free(drba.drba_dcp, !!err); + if (err == 0) { + err = dsl_sync_task(tofs, + dmu_recv_begin_check, dmu_recv_begin_sync, + &drba, 5, ZFS_SPACE_CHECK_NORMAL); + dsl_crypto_params_free(drba.drba_dcp, !!err); + } + } - return (err); + if (err != 0) { + kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd)); + nvlist_free(drc->drc_begin_nvl); } + return (err); } -struct receive_record_arg { - dmu_replay_record_t header; - void *payload; /* Pointer to a buffer containing the payload */ - /* - * If the record is a write, pointer to the arc_buf_t containing the - * payload. - */ - arc_buf_t *arc_buf; - int payload_size; - uint64_t bytes_read; /* bytes read from stream when record created */ - boolean_t eos_marker; /* Marks the end of the stream */ - bqueue_node_t node; -}; - -struct receive_writer_arg { - objset_t *os; - boolean_t byteswap; - bqueue_t q; - - /* - * These three args are used to signal to the main thread that we're - * done. - */ - kmutex_t mutex; - kcondvar_t cv; - boolean_t done; - - int err; - /* A map from guid to dataset to help handle dedup'd streams. */ - avl_tree_t *guid_to_ds_map; - boolean_t resumable; - boolean_t raw; /* DMU_BACKUP_FEATURE_RAW set */ - boolean_t spill; /* DRR_FLAG_SPILL_BLOCK set */ - uint64_t last_object; - uint64_t last_offset; - uint64_t max_object; /* highest object ID referenced in stream */ - uint64_t bytes_read; /* bytes read when current record created */ - - /* Encryption parameters for the last received DRR_OBJECT_RANGE */ - boolean_t or_crypt_params_present; - uint64_t or_firstobj; - uint64_t or_numslots; - uint8_t or_salt[ZIO_DATA_SALT_LEN]; - uint8_t or_iv[ZIO_DATA_IV_LEN]; - uint8_t or_mac[ZIO_DATA_MAC_LEN]; - boolean_t or_byteorder; -}; - -struct objlist { - list_t list; /* List of struct receive_objnode. */ - /* - * Last object looked up. Used to assert that objects are being looked - * up in ascending order. 
- */ - uint64_t last_lookup; -}; - -struct receive_objnode { - list_node_t node; - uint64_t object; -}; - -struct receive_arg { - objset_t *os; - vnode_t *vp; /* The vnode to read the stream from */ - uint64_t voff; /* The current offset in the stream */ - uint64_t bytes_read; - /* - * A record that has had its payload read in, but hasn't yet been handed - * off to the worker thread. - */ - struct receive_record_arg *rrd; - /* A record that has had its header read in, but not its payload. */ - struct receive_record_arg *next_rrd; - zio_cksum_t cksum; - zio_cksum_t prev_cksum; - int err; - boolean_t byteswap; - boolean_t raw; - uint64_t featureflags; - /* Sorted list of objects not to issue prefetches for. */ - struct objlist ignore_objlist; -}; - -typedef struct guid_map_entry { - uint64_t guid; - boolean_t raw; - dsl_dataset_t *gme_ds; - avl_node_t avlnode; -} guid_map_entry_t; - static int guid_compare(const void *arg1, const void *arg2) { @@ -946,7 +1231,7 @@ free_guid_map_onexit(void *arg) } static int -receive_read(struct receive_arg *ra, int len, void *buf) +receive_read(dmu_recv_cookie_t *drc, int len, void *buf) { int done = 0; @@ -955,14 +1240,14 @@ receive_read(struct receive_arg *ra, int len, void *buf) * comment in dump_bytes. */ ASSERT(len % 8 == 0 || - (ra->featureflags & DMU_BACKUP_FEATURE_RAW) != 0); + (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0); while (done < len) { ssize_t resid; - ra->err = vn_rdwr(UIO_READ, ra->vp, + drc->drc_err = vn_rdwr(UIO_READ, drc->drc_vp, (char *)buf + done, len - done, - ra->voff, UIO_SYSSPACE, FAPPEND, + drc->drc_voff, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid); if (resid == len - done) { @@ -970,117 +1255,20 @@ receive_read(struct receive_arg *ra, int len, void *buf) * Note: ECKSUM indicates that the receive * was interrupted and can potentially be resumed. 
*/ - ra->err = SET_ERROR(ECKSUM); + drc->drc_err = SET_ERROR(ECKSUM); } - ra->voff += len - done - resid; + drc->drc_voff += len - done - resid; done = len - resid; - if (ra->err != 0) - return (ra->err); + if (drc->drc_err != 0) + return (drc->drc_err); } - ra->bytes_read += len; + drc->drc_bytes_read += len; ASSERT3U(done, ==, len); return (0); } -noinline static void -byteswap_record(dmu_replay_record_t *drr) -{ -#define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X)) -#define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X)) - drr->drr_type = BSWAP_32(drr->drr_type); - drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen); - - switch (drr->drr_type) { - case DRR_BEGIN: - DO64(drr_begin.drr_magic); - DO64(drr_begin.drr_versioninfo); - DO64(drr_begin.drr_creation_time); - DO32(drr_begin.drr_type); - DO32(drr_begin.drr_flags); - DO64(drr_begin.drr_toguid); - DO64(drr_begin.drr_fromguid); - break; - case DRR_OBJECT: - DO64(drr_object.drr_object); - DO32(drr_object.drr_type); - DO32(drr_object.drr_bonustype); - DO32(drr_object.drr_blksz); - DO32(drr_object.drr_bonuslen); - DO32(drr_object.drr_raw_bonuslen); - DO64(drr_object.drr_toguid); - DO64(drr_object.drr_maxblkid); - break; - case DRR_FREEOBJECTS: - DO64(drr_freeobjects.drr_firstobj); - DO64(drr_freeobjects.drr_numobjs); - DO64(drr_freeobjects.drr_toguid); - break; - case DRR_WRITE: - DO64(drr_write.drr_object); - DO32(drr_write.drr_type); - DO64(drr_write.drr_offset); - DO64(drr_write.drr_logical_size); - DO64(drr_write.drr_toguid); - ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum); - DO64(drr_write.drr_key.ddk_prop); - DO64(drr_write.drr_compressed_size); - break; - case DRR_WRITE_BYREF: - DO64(drr_write_byref.drr_object); - DO64(drr_write_byref.drr_offset); - DO64(drr_write_byref.drr_length); - DO64(drr_write_byref.drr_toguid); - DO64(drr_write_byref.drr_refguid); - DO64(drr_write_byref.drr_refobject); - DO64(drr_write_byref.drr_refoffset); - ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref. 
- drr_key.ddk_cksum); - DO64(drr_write_byref.drr_key.ddk_prop); - break; - case DRR_WRITE_EMBEDDED: - DO64(drr_write_embedded.drr_object); - DO64(drr_write_embedded.drr_offset); - DO64(drr_write_embedded.drr_length); - DO64(drr_write_embedded.drr_toguid); - DO32(drr_write_embedded.drr_lsize); - DO32(drr_write_embedded.drr_psize); - break; - case DRR_FREE: - DO64(drr_free.drr_object); - DO64(drr_free.drr_offset); - DO64(drr_free.drr_length); - DO64(drr_free.drr_toguid); - break; - case DRR_SPILL: - DO64(drr_spill.drr_object); - DO64(drr_spill.drr_length); - DO64(drr_spill.drr_toguid); - DO64(drr_spill.drr_compressed_size); - DO32(drr_spill.drr_type); - break; - case DRR_OBJECT_RANGE: - DO64(drr_object_range.drr_firstobj); - DO64(drr_object_range.drr_numslots); - DO64(drr_object_range.drr_toguid); - break; - case DRR_END: - DO64(drr_end.drr_toguid); - ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum); - break; - default: - break; - } - - if (drr->drr_type != DRR_BEGIN) { - ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum); - } - -#undef DO64 -#undef DO32 -} - static inline uint8_t deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size) { @@ -1152,7 +1340,7 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, drro->drr_bonuslen > DN_BONUS_SIZE(spa_maxdnodesize(dmu_objset_spa(rwa->os))) || dn_slots > - (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT)) { + (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT)) { return (SET_ERROR(EINVAL)); } @@ -1187,6 +1375,7 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, } err = dmu_object_info(rwa->os, drro->drr_object, &doi); + if (err != 0 && err != ENOENT && err != EEXIST) return (SET_ERROR(EINVAL)); @@ -1231,8 +1420,8 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, (rwa->raw && (indblksz != doi.doi_metadata_block_size || drro->drr_nlevels < doi.doi_indirection))) { - err = dmu_free_long_range(rwa->os, - drro->drr_object, 0, DMU_OBJECT_END); + err = dmu_free_long_range(rwa->os, drro->drr_object, + 0, DMU_OBJECT_END); if (err != 0) return (SET_ERROR(EINVAL)); else @@ -1477,7 +1666,8 @@ receive_freeobjects(struct receive_writer_arg *rwa, return (SET_ERROR(EINVAL)); for (obj = drrfo->drr_firstobj == 0 ? 1 : drrfo->drr_firstobj; - obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0; + obj < drrfo->drr_firstobj + drrfo->drr_numobjs && + obj < DN_MAX_OBJECT && next_err == 0; next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) { dmu_object_info_t doi; int err; @@ -1492,9 +1682,6 @@ receive_freeobjects(struct receive_writer_arg *rwa, if (err != 0) return (err); - - if (obj > rwa->max_object) - rwa->max_object = obj; } if (next_err != ESRCH) return (next_err); @@ -1548,6 +1735,7 @@ receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, DRR_WRITE_PAYLOAD_SIZE(drrw)); } + /* use the bonus buf to look up the dnode in dmu_assign_arcbuf */ VERIFY0(dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn)); err = dmu_assign_arcbuf_by_dnode(dn, drrw->drr_offset, abuf, tx); if (err != 0) { @@ -1576,7 +1764,7 @@ receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, * finds the earlier copy of the data, and uses that copy instead of * data from the stream to fulfill this write. 
*/ -static int +noinline static int receive_write_byref(struct receive_writer_arg *rwa, struct drr_write_byref *drrwbr) { @@ -1696,7 +1884,6 @@ receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs, dmu_tx_t *tx; dmu_buf_t *db, *db_spill; int err; - uint32_t flags = 0; if (drrs->drr_length < SPA_MINBLOCKSIZE || drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os))) @@ -1718,8 +1905,6 @@ receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs, drrs->drr_compressiontype >= ZIO_COMPRESS_FUNCTIONS || drrs->drr_compressed_size == 0) return (SET_ERROR(EINVAL)); - - flags |= DMU_READ_NO_DECRYPT; } if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0) @@ -1781,7 +1966,7 @@ receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf) { int err; - if (drrf->drr_length != DMU_OBJECT_END && + if (drrf->drr_length != -1ULL && drrf->drr_offset + drrf->drr_length < drrf->drr_offset) return (SET_ERROR(EINVAL)); @@ -1846,6 +2031,22 @@ receive_object_range(struct receive_writer_arg *rwa, return (0); } +/* + * Until we have the ability to redact large ranges of data efficiently, we + * process these records as frees. + */ +/* ARGSUSED */ +noinline static int +receive_redact(struct receive_writer_arg *rwa, struct drr_redact *drrr) +{ + struct drr_free drrf = {0}; + drrf.drr_length = drrr->drr_length; + drrf.drr_object = drrr->drr_object; + drrf.drr_offset = drrr->drr_offset; + drrf.drr_toguid = drrr->drr_toguid; + return (receive_free(rwa, &drrf)); +} + /* used to destroy the drc_ds on error */ static void dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc) @@ -1877,61 +2078,60 @@ dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc) } static void -receive_cksum(struct receive_arg *ra, int len, void *buf) +receive_cksum(dmu_recv_cookie_t *drc, int len, void *buf) { - if (ra->byteswap) { - (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum); + if (drc->drc_byteswap) { + (void) fletcher_4_incremental_byteswap(buf, len, + &drc->drc_cksum); } else { - (void) fletcher_4_incremental_native(buf, len, &ra->cksum); + (void) fletcher_4_incremental_native(buf, len, &drc->drc_cksum); } } /* * Read the payload into a buffer of size len, and update the current record's * payload field. - * Allocate ra->next_rrd and read the next record's header into - * ra->next_rrd->header. + * Allocate drc->drc_next_rrd and read the next record's header into + * drc->drc_next_rrd->header. * Verify checksum of payload and next record. 
*/ static int -receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf) +receive_read_payload_and_next_header(dmu_recv_cookie_t *drc, int len, void *buf) { int err; - zio_cksum_t cksum_orig; - zio_cksum_t *cksump; if (len != 0) { ASSERT3U(len, <=, SPA_MAXBLOCKSIZE); - err = receive_read(ra, len, buf); + err = receive_read(drc, len, buf); if (err != 0) return (err); - receive_cksum(ra, len, buf); + receive_cksum(drc, len, buf); /* note: rrd is NULL when reading the begin record's payload */ - if (ra->rrd != NULL) { - ra->rrd->payload = buf; - ra->rrd->payload_size = len; - ra->rrd->bytes_read = ra->bytes_read; + if (drc->drc_rrd != NULL) { + drc->drc_rrd->payload = buf; + drc->drc_rrd->payload_size = len; + drc->drc_rrd->bytes_read = drc->drc_bytes_read; } } else { ASSERT3P(buf, ==, NULL); } - ra->prev_cksum = ra->cksum; + drc->drc_prev_cksum = drc->drc_cksum; - ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP); - err = receive_read(ra, sizeof (ra->next_rrd->header), - &ra->next_rrd->header); - ra->next_rrd->bytes_read = ra->bytes_read; + drc->drc_next_rrd = kmem_zalloc(sizeof (*drc->drc_next_rrd), KM_SLEEP); + err = receive_read(drc, sizeof (drc->drc_next_rrd->header), + &drc->drc_next_rrd->header); + drc->drc_next_rrd->bytes_read = drc->drc_bytes_read; if (err != 0) { - kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); - ra->next_rrd = NULL; + kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd)); + drc->drc_next_rrd = NULL; return (err); } - if (ra->next_rrd->header.drr_type == DRR_BEGIN) { - kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); - ra->next_rrd = NULL; + if (drc->drc_next_rrd->header.drr_type == DRR_BEGIN) { + kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd)); + drc->drc_next_rrd = NULL; return (SET_ERROR(EINVAL)); } @@ -1941,90 +2141,30 @@ receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf) */ ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t)); - receive_cksum(ra, + receive_cksum(drc, offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), - &ra->next_rrd->header); + &drc->drc_next_rrd->header); - cksum_orig = ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; - cksump = &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; + zio_cksum_t cksum_orig = + drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum; + zio_cksum_t *cksump = + &drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum; - if (ra->byteswap) - byteswap_record(&ra->next_rrd->header); + if (drc->drc_byteswap) + byteswap_record(&drc->drc_next_rrd->header); if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) && - !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) { - kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); - ra->next_rrd = NULL; + !ZIO_CHECKSUM_EQUAL(drc->drc_cksum, *cksump)) { + kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd)); + drc->drc_next_rrd = NULL; return (SET_ERROR(ECKSUM)); } - receive_cksum(ra, sizeof (cksum_orig), &cksum_orig); + receive_cksum(drc, sizeof (cksum_orig), &cksum_orig); return (0); } -static void -objlist_create(struct objlist *list) -{ - list_create(&list->list, sizeof (struct receive_objnode), - offsetof(struct receive_objnode, node)); - list->last_lookup = 0; -} - -static void -objlist_destroy(struct objlist *list) -{ - for (struct receive_objnode *n = list_remove_head(&list->list); - n != NULL; n = list_remove_head(&list->list)) { - kmem_free(n, sizeof (*n)); - } - list_destroy(&list->list); -} - -/* - * This function looks 
through the objlist to see if the specified object number - * is contained in the objlist. In the process, it will remove all object - * numbers in the list that are smaller than the specified object number. Thus, - * any lookup of an object number smaller than a previously looked up object - * number will always return false; therefore, all lookups should be done in - * ascending order. - */ -static boolean_t -objlist_exists(struct objlist *list, uint64_t object) -{ - struct receive_objnode *node = list_head(&list->list); - ASSERT3U(object, >=, list->last_lookup); - list->last_lookup = object; - while (node != NULL && node->object < object) { - VERIFY3P(node, ==, list_remove_head(&list->list)); - kmem_free(node, sizeof (*node)); - node = list_head(&list->list); - } - return (node != NULL && node->object == object); -} - -/* - * The objlist is a list of object numbers stored in ascending order. However, - * the insertion of new object numbers does not seek out the correct location to - * store a new object number; instead, it appends it to the list for simplicity. - * Thus, any users must take care to only insert new object numbers in ascending - * order. - */ -static void -objlist_insert(struct objlist *list, uint64_t object) -{ - struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP); - node->object = object; -#ifdef ZFS_DEBUG - { - struct receive_objnode *last_object = list_tail(&list->list); - uint64_t last_objnum = (last_object != NULL ? last_object->object : 0); - ASSERT3U(node->object, >, last_objnum); - } -#endif - list_insert_tail(&list->list, node); -} - /* * Issue the prefetch reads for any necessary indirect blocks. * @@ -2044,11 +2184,11 @@ objlist_insert(struct objlist *list, uint64_t object) */ /* ARGSUSED */ static void -receive_read_prefetch(struct receive_arg *ra, - uint64_t object, uint64_t offset, uint64_t length) +receive_read_prefetch(dmu_recv_cookie_t *drc, uint64_t object, uint64_t offset, + uint64_t length) { - if (!objlist_exists(&ra->ignore_objlist, object)) { - dmu_prefetch(ra->os, object, 1, offset, length, + if (!objlist_exists(drc->drc_ignore_objlist, object)) { + dmu_prefetch(drc->drc_os, object, 1, offset, length, ZIO_PRIORITY_SYNC_READ); } } @@ -2057,14 +2197,15 @@ receive_read_prefetch(struct receive_arg *ra, * Read records off the stream, issuing any necessary prefetches. */ static int -receive_read_record(struct receive_arg *ra) +receive_read_record(dmu_recv_cookie_t *drc) { int err; - switch (ra->rrd->header.drr_type) { + switch (drc->drc_rrd->header.drr_type) { case DRR_OBJECT: { - struct drr_object *drro = &ra->rrd->header.drr_u.drr_object; + struct drr_object *drro = + &drc->drc_rrd->header.drr_u.drr_object; uint32_t size = DRR_OBJECT_PAYLOAD_SIZE(drro); void *buf = NULL; dmu_object_info_t doi; @@ -2072,40 +2213,41 @@ receive_read_record(struct receive_arg *ra) if (size != 0) buf = kmem_zalloc(size, KM_SLEEP); - err = receive_read_payload_and_next_header(ra, size, buf); + err = receive_read_payload_and_next_header(drc, size, buf); if (err != 0) { kmem_free(buf, size); return (err); } - err = dmu_object_info(ra->os, drro->drr_object, &doi); + err = dmu_object_info(drc->drc_os, drro->drr_object, &doi); /* * See receive_read_prefetch for an explanation why we're * storing this object in the ignore_obj_list. 
*/ if (err == ENOENT || err == EEXIST || (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) { - objlist_insert(&ra->ignore_objlist, drro->drr_object); + objlist_insert(drc->drc_ignore_objlist, + drro->drr_object); err = 0; } return (err); } case DRR_FREEOBJECTS: { - err = receive_read_payload_and_next_header(ra, 0, NULL); + err = receive_read_payload_and_next_header(drc, 0, NULL); return (err); } case DRR_WRITE: { - struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write; + struct drr_write *drrw = &drc->drc_rrd->header.drr_u.drr_write; arc_buf_t *abuf; boolean_t is_meta = DMU_OT_IS_METADATA(drrw->drr_type); - if (ra->raw) { + if (drc->drc_raw) { boolean_t byteorder = ZFS_HOST_BYTEORDER ^ !!DRR_IS_RAW_BYTESWAPPED(drrw->drr_flags) ^ - ra->byteswap; + drc->drc_byteswap; - abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os), + abuf = arc_loan_raw_buf(dmu_objset_spa(drc->drc_os), drrw->drr_object, byteorder, drrw->drr_salt, drrw->drr_iv, drrw->drr_mac, drrw->drr_type, drrw->drr_compressed_size, drrw->drr_logical_size, @@ -2116,109 +2258,110 @@ receive_read_record(struct receive_arg *ra) drrw->drr_compressed_size); ASSERT(!is_meta); abuf = arc_loan_compressed_buf( - dmu_objset_spa(ra->os), + dmu_objset_spa(drc->drc_os), drrw->drr_compressed_size, drrw->drr_logical_size, drrw->drr_compressiontype); } else { - abuf = arc_loan_buf(dmu_objset_spa(ra->os), + abuf = arc_loan_buf(dmu_objset_spa(drc->drc_os), is_meta, drrw->drr_logical_size); } - err = receive_read_payload_and_next_header(ra, + err = receive_read_payload_and_next_header(drc, DRR_WRITE_PAYLOAD_SIZE(drrw), abuf->b_data); if (err != 0) { dmu_return_arcbuf(abuf); return (err); } - ra->rrd->arc_buf = abuf; - receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset, + drc->drc_rrd->arc_buf = abuf; + receive_read_prefetch(drc, drrw->drr_object, drrw->drr_offset, drrw->drr_logical_size); return (err); } case DRR_WRITE_BYREF: { struct drr_write_byref *drrwb = - &ra->rrd->header.drr_u.drr_write_byref; - err = receive_read_payload_and_next_header(ra, 0, NULL); - receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset, + &drc->drc_rrd->header.drr_u.drr_write_byref; + err = receive_read_payload_and_next_header(drc, 0, NULL); + receive_read_prefetch(drc, drrwb->drr_object, drrwb->drr_offset, drrwb->drr_length); return (err); } case DRR_WRITE_EMBEDDED: { struct drr_write_embedded *drrwe = - &ra->rrd->header.drr_u.drr_write_embedded; + &drc->drc_rrd->header.drr_u.drr_write_embedded; uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8); void *buf = kmem_zalloc(size, KM_SLEEP); - err = receive_read_payload_and_next_header(ra, size, buf); + err = receive_read_payload_and_next_header(drc, size, buf); if (err != 0) { kmem_free(buf, size); return (err); } - receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset, + receive_read_prefetch(drc, drrwe->drr_object, drrwe->drr_offset, drrwe->drr_length); return (err); } case DRR_FREE: + case DRR_REDACT: { /* * It might be beneficial to prefetch indirect blocks here, but * we don't really have the data to decide for sure. 
*/ - err = receive_read_payload_and_next_header(ra, 0, NULL); + err = receive_read_payload_and_next_header(drc, 0, NULL); return (err); } case DRR_END: { - struct drr_end *drre = &ra->rrd->header.drr_u.drr_end; - if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum)) + struct drr_end *drre = &drc->drc_rrd->header.drr_u.drr_end; + if (!ZIO_CHECKSUM_EQUAL(drc->drc_prev_cksum, + drre->drr_checksum)) return (SET_ERROR(ECKSUM)); return (0); } case DRR_SPILL: { - struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill; + struct drr_spill *drrs = &drc->drc_rrd->header.drr_u.drr_spill; arc_buf_t *abuf; - int len = DRR_SPILL_PAYLOAD_SIZE(drrs); - /* DRR_SPILL records are either raw or uncompressed */ - if (ra->raw) { + if (drc->drc_raw) { boolean_t byteorder = ZFS_HOST_BYTEORDER ^ !!DRR_IS_RAW_BYTESWAPPED(drrs->drr_flags) ^ - ra->byteswap; + drc->drc_byteswap; - abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os), - dmu_objset_id(ra->os), byteorder, drrs->drr_salt, + abuf = arc_loan_raw_buf(dmu_objset_spa(drc->drc_os), + drrs->drr_object, byteorder, drrs->drr_salt, drrs->drr_iv, drrs->drr_mac, drrs->drr_type, drrs->drr_compressed_size, drrs->drr_length, drrs->drr_compressiontype); } else { - abuf = arc_loan_buf(dmu_objset_spa(ra->os), + abuf = arc_loan_buf(dmu_objset_spa(drc->drc_os), DMU_OT_IS_METADATA(drrs->drr_type), drrs->drr_length); } - - err = receive_read_payload_and_next_header(ra, len, - abuf->b_data); - if (err != 0) { + err = receive_read_payload_and_next_header(drc, + DRR_SPILL_PAYLOAD_SIZE(drrs), abuf->b_data); + if (err != 0) dmu_return_arcbuf(abuf); - return (err); - } - ra->rrd->arc_buf = abuf; + else + drc->drc_rrd->arc_buf = abuf; return (err); } case DRR_OBJECT_RANGE: { - err = receive_read_payload_and_next_header(ra, 0, NULL); + err = receive_read_payload_and_next_header(drc, 0, NULL); return (err); + } default: return (SET_ERROR(EINVAL)); } } + + static void dprintf_drr(struct receive_record_arg *rrd, int err) { @@ -2382,7 +2525,6 @@ receive_process_record(struct receive_writer_arg *rwa, { struct drr_spill *drrs = &rrd->header.drr_u.drr_spill; err = receive_spill(rwa, drrs, rrd->arc_buf); - /* if receive_spill() is successful, it consumes the arc_buf */ if (err != 0) dmu_return_arcbuf(rrd->arc_buf); rrd->arc_buf = NULL; @@ -2396,6 +2538,12 @@ receive_process_record(struct receive_writer_arg *rwa, err = receive_object_range(rwa, drror); break; } + case DRR_REDACT: + { + struct drr_redact *drrr = &rrd->header.drr_u.drr_redact; + err = receive_redact(rwa, drrr); + break; + } default: err = (SET_ERROR(EINVAL)); } @@ -2446,11 +2594,11 @@ receive_writer_thread(void *arg) } static int -resume_check(struct receive_arg *ra, nvlist_t *begin_nvl) +resume_check(dmu_recv_cookie_t *drc, nvlist_t *begin_nvl) { uint64_t val; - objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset; - uint64_t dsobj = dmu_objset_id(ra->os); + objset_t *mos = dmu_objset_pool(drc->drc_os)->dp_meta_objset; + uint64_t dsobj = dmu_objset_id(drc->drc_os); uint64_t resume_obj, resume_off; if (nvlist_lookup_uint64(begin_nvl, @@ -2484,33 +2632,21 @@ resume_check(struct receive_arg *ra, nvlist_t *begin_nvl) * NB: callers *must* call dmu_recv_end() if this succeeds. 
*/ int -dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp, - int cleanup_fd, uint64_t *action_handlep) +dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd, + uint64_t *action_handlep, offset_t *voffp) { int err = 0; - struct receive_arg *ra; - struct receive_writer_arg *rwa; - int featureflags; - uint32_t payloadlen; - void *payload; - nvlist_t *begin_nvl = NULL; - - ra = kmem_zalloc(sizeof (*ra), KM_SLEEP); - rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP); - - ra->byteswap = drc->drc_byteswap; - ra->raw = drc->drc_raw; - ra->cksum = drc->drc_cksum; - ra->vp = vp; - ra->voff = *voffp; + struct receive_writer_arg *rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP); if (dsl_dataset_is_zapified(drc->drc_ds)) { + uint64_t bytes; (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset, drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES, - sizeof (ra->bytes_read), 1, &ra->bytes_read); + sizeof (bytes), 1, &bytes); + drc->drc_bytes_read += bytes; } - objlist_create(&ra->ignore_objlist); + drc->drc_ignore_objlist = objlist_create(); /* these were verified in dmu_recv_begin */ ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==, @@ -2520,18 +2656,13 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp, /* * Open the objset we are modifying. */ - VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra->os)); - + VERIFY0(dmu_objset_from_ds(drc->drc_ds, &drc->drc_os)); ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT); - - featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo); - ra->featureflags = featureflags; - - ASSERT0(ra->os->os_encrypted && - (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)); + ASSERT0(drc->drc_os->os_encrypted && + (drc->drc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)); /* if this stream is dedup'ed, set up the avl tree for guid mapping */ - if (featureflags & DMU_BACKUP_FEATURE_DEDUP) { + if (drc->drc_featureflags & DMU_BACKUP_FEATURE_DEDUP) { minor_t minor; if (cleanup_fd == -1) { @@ -2565,32 +2696,15 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp, drc->drc_guid_to_ds_map = rwa->guid_to_ds_map; } - payloadlen = drc->drc_drr_begin->drr_payloadlen; - payload = NULL; - if (payloadlen != 0) - payload = kmem_alloc(payloadlen, KM_SLEEP); - - err = receive_read_payload_and_next_header(ra, payloadlen, payload); - if (err != 0) { - if (payloadlen != 0) - kmem_free(payload, payloadlen); - goto out; - } - if (payloadlen != 0) { - err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP); - kmem_free(payload, payloadlen); - if (err != 0) - goto out; - } - /* handle DSL encryption key payload */ - if (featureflags & DMU_BACKUP_FEATURE_RAW) { + if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) { nvlist_t *keynvl = NULL; - ASSERT(ra->os->os_encrypted); + ASSERT(drc->drc_os->os_encrypted); ASSERT(drc->drc_raw); - err = nvlist_lookup_nvlist(begin_nvl, "crypt_keydata", &keynvl); + err = nvlist_lookup_nvlist(drc->drc_begin_nvl, "crypt_keydata", + &keynvl); if (err != 0) goto out; @@ -2600,7 +2714,7 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp, * are sure the rest of the receive succeeded so we stash * the keynvl away until then. 
*/ - err = dsl_crypto_recv_raw(spa_name(ra->os->os_spa), + err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa), drc->drc_ds->ds_object, drc->drc_fromsnapobj, drc->drc_drrb->drr_type, keynvl, drc->drc_newfs); if (err != 0) @@ -2615,18 +2729,18 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp, drc->drc_keynvl = fnvlist_dup(keynvl); } - if (featureflags & DMU_BACKUP_FEATURE_RESUMING) { - err = resume_check(ra, begin_nvl); + if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) { + err = resume_check(drc, drc->drc_begin_nvl); if (err != 0) goto out; } - (void) bqueue_init(&rwa->q, + (void) bqueue_init(&rwa->q, zfs_recv_queue_ff, MAX(zfs_recv_queue_length, 2 * zfs_max_recordsize), offsetof(struct receive_record_arg, node)); cv_init(&rwa->cv, NULL, CV_DEFAULT, NULL); mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL); - rwa->os = ra->os; + rwa->os = drc->drc_os; rwa->byteswap = drc->drc_byteswap; rwa->resumable = drc->drc_resumable; rwa->raw = drc->drc_raw; @@ -2645,10 +2759,10 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp, * We can leave this loop in 3 ways: First, if rwa->err is * non-zero. In that case, the writer thread will free the rrd we just * pushed. Second, if we're interrupted; in that case, either it's the - * first loop and ra->rrd was never allocated, or it's later and ra->rrd - * has been handed off to the writer thread who will free it. Finally, - * if receive_read_record fails or we're at the end of the stream, then - * we free ra->rrd and exit. + * first loop and drc->drc_rrd was never allocated, or it's later, and + * drc->drc_rrd has been handed off to the writer thread who will free + * it. Finally, if receive_read_record fails or we're at the end of the + * stream, then we free drc->drc_rrd and exit. */ while (rwa->err == 0) { if (issig(JUSTLOOKING) && issig(FORREAL)) { @@ -2656,30 +2770,36 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp, break; } - ASSERT3P(ra->rrd, ==, NULL); - ra->rrd = ra->next_rrd; - ra->next_rrd = NULL; - /* Allocates and loads header into ra->next_rrd */ - err = receive_read_record(ra); + ASSERT3P(drc->drc_rrd, ==, NULL); + drc->drc_rrd = drc->drc_next_rrd; + drc->drc_next_rrd = NULL; + /* Allocates and loads header into drc->drc_next_rrd */ + err = receive_read_record(drc); - if (ra->rrd->header.drr_type == DRR_END || err != 0) { - kmem_free(ra->rrd, sizeof (*ra->rrd)); - ra->rrd = NULL; + if (drc->drc_rrd->header.drr_type == DRR_END || err != 0) { + kmem_free(drc->drc_rrd, sizeof (*drc->drc_rrd)); + drc->drc_rrd = NULL; break; } - bqueue_enqueue(&rwa->q, ra->rrd, - sizeof (struct receive_record_arg) + ra->rrd->payload_size); - ra->rrd = NULL; + bqueue_enqueue(&rwa->q, drc->drc_rrd, + sizeof (struct receive_record_arg) + + drc->drc_rrd->payload_size); + drc->drc_rrd = NULL; } - ASSERT3P(ra->rrd, ==, NULL); - ra->rrd = kmem_zalloc(sizeof (*ra->rrd), KM_SLEEP); - ra->rrd->eos_marker = B_TRUE; - bqueue_enqueue(&rwa->q, ra->rrd, 1); + + ASSERT3P(drc->drc_rrd, ==, NULL); + drc->drc_rrd = kmem_zalloc(sizeof (*drc->drc_rrd), KM_SLEEP); + drc->drc_rrd->eos_marker = B_TRUE; + bqueue_enqueue_flush(&rwa->q, drc->drc_rrd, 1); mutex_enter(&rwa->mutex); while (!rwa->done) { - cv_wait(&rwa->cv, &rwa->mutex); + /* + * We need to use cv_wait_sig() so that any process that may + * be sleeping here can still fork. 
+ */ + (void) cv_wait_sig(&rwa->cv, &rwa->mutex); + } mutex_exit(&rwa->mutex); @@ -2721,11 +2841,13 @@ out: * we need to clean up the next_rrd we create by processing the * DRR_BEGIN record. */ - if (ra->next_rrd != NULL) - kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); + if (drc->drc_next_rrd != NULL) + kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd)); - nvlist_free(begin_nvl); - if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1)) + kmem_free(rwa, sizeof (*rwa)); + nvlist_free(drc->drc_begin_nvl); + if ((drc->drc_featureflags & DMU_BACKUP_FEATURE_DEDUP) && + (cleanup_fd != -1)) zfs_onexit_fd_rele(cleanup_fd); if (err != 0) { @@ -2738,10 +2860,9 @@ out: nvlist_free(drc->drc_keynvl); } - *voffp = ra->voff; - objlist_destroy(&ra->ignore_objlist); - kmem_free(ra, sizeof (*ra)); - kmem_free(rwa, sizeof (*rwa)); + objlist_destroy(drc->drc_ignore_objlist); + drc->drc_ignore_objlist = NULL; + *voffp = drc->drc_voff; return (err); } @@ -2866,7 +2987,8 @@ dmu_recv_end_sync(void *arg, dmu_tx_t *tx) drc->drc_keynvl = NULL; } - VERIFY3P(drc->drc_ds->ds_prev, ==, origin_head->ds_prev); + VERIFY3P(drc->drc_ds->ds_prev, ==, + origin_head->ds_prev); dsl_dataset_clone_swap_sync_impl(drc->drc_ds, origin_head, tx); @@ -2923,6 +3045,8 @@ dmu_recv_end_sync(void *arg, dmu_tx_t *tx) DS_FIELD_RESUME_TOGUID, tx); (void) zap_remove(dp->dp_meta_objset, ds->ds_object, DS_FIELD_RESUME_TONAME, tx); + (void) zap_remove(dp->dp_meta_objset, ds->ds_object, + DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS, tx); } drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj; @@ -2982,6 +3106,7 @@ add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj, return (err); gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP); err = dsl_dataset_own_obj(dp, snapobj, dsflags, gmep, &snapds); + if (err == 0) { /* * If this is a deduplicated raw send stream, we need @@ -3075,4 +3200,7 @@ dmu_objset_is_receiving(objset_t *os) #if defined(_KERNEL) module_param(zfs_recv_queue_length, int, 0644); MODULE_PARM_DESC(zfs_recv_queue_length, "Maximum receive queue length"); + +module_param(zfs_recv_queue_ff, int, 0644); +MODULE_PARM_DESC(zfs_recv_queue_ff, "Receive queue fill fraction"); #endif
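The core of the new begin-time validation above is a pure subset test: an incremental stream may only be redacted with respect to a subset of the snapshots its origin (or the existing dataset) was redacted with respect to. Below is a minimal userspace sketch of that rule, mirroring redact_snaps_contains() and compatible_redact_snaps() from the patch; it is an illustration only, not code from the commit, and the GUID values in main() are made-up placeholders rather than real snapshot GUIDs.

/* Standalone demo of the redaction subset check; compile with cc -o demo demo.c */
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

/* Linear membership test, analogous to redact_snaps_contains() in the patch. */
static bool
redact_snaps_contains(const uint64_t *snaps, uint64_t num_snaps, uint64_t guid)
{
	for (uint64_t i = 0; i < num_snaps; i++) {
		if (snaps[i] == guid)
			return (true);
	}
	return (false);
}

/* Subset test, analogous to compatible_redact_snaps() in the patch. */
static bool
compatible_redact_snaps(const uint64_t *origin_snaps, uint64_t origin_num_snaps,
    const uint64_t *redact_snaps, uint64_t num_redact_snaps)
{
	/* More redaction snapshots than the origin has can never be a subset. */
	if (num_redact_snaps > origin_num_snaps)
		return (false);

	/* Every redaction snapshot in the stream must appear in the origin's set. */
	for (uint64_t i = 0; i < num_redact_snaps; i++) {
		if (!redact_snaps_contains(origin_snaps, origin_num_snaps,
		    redact_snaps[i]))
			return (false);
	}
	return (true);
}

int
main(void)
{
	/* Placeholder GUIDs for illustration only. */
	uint64_t origin[] = { 0x1111, 0x2222, 0x3333 };
	uint64_t subset[] = { 0x2222 };
	uint64_t not_subset[] = { 0x2222, 0x4444 };

	printf("subset ok: %d\n",
	    compatible_redact_snaps(origin, 3, subset, 1));	/* prints 1 */
	printf("rejected:  %d\n",
	    compatible_redact_snaps(origin, 3, not_subset, 2));	/* prints 0 */
	return (0);
}

Separately, because the patch registers the new zfs_recv_queue_ff tunable with module_param(..., 0644), it should be adjustable at runtime through the usual Linux module-parameter path, e.g. /sys/module/zfs/parameters/zfs_recv_queue_ff, alongside the existing zfs_recv_queue_length.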