diff options
author | Matthew Ahrens <[email protected]> | 2020-03-16 11:51:56 -0700 |
---|---|---|
committer | GitHub <[email protected]> | 2020-03-16 11:51:56 -0700 |
commit | 7261fc2e8136f99b89851fa853f7862db45437c3 (patch) | |
tree | 3f0cc43fa090ad8ca1fbdadb1306f0d80ed2c2b8 | |
parent | c94fb10917c404ecdba53a46dd7d497cdeadbc30 (diff) |
Improve zfs receive performance by batching writes
For each WRITE record in the stream, `zfs receive` creates a DMU
transaction (`dmu_tx_create()`) and writes this block's data into the
object. If per-block overheads (as opposed to per-byte overheads)
dominate performance (as is often the case with small recordsize), the
per-dmu-transaction overheads can be significant. For example, in some
workloads the `receieve_writer` thread is 100% on CPU, and more than
half of its CPU time is in these per-tx routines (e.g.
dmu_tx_hold_write, dmu_tx_assign, dmu_tx_commit).
To improve performance of `zfs receive`, this commit batches WRITE
records which are to nearby offsets of the same object, and uses one DMU
transaction to write them all. By default the batch size is 1MB, which
for recordsize=8K reduces the number of DMU transactions by 128x for
full send streams (incrementals will depend on how "clumpy" the changed
blocks are).
This commit improves the performance of `dd if=stream | zfs recv`
from 78,800 blocks/sec to 98,100 blocks/sec (25% improvement).
Reviewed-by: Paul Dagnelie <[email protected]>
Reviewed-by: Brian Behlendorf <[email protected]>
Signed-off-by: Matthew Ahrens <[email protected]>
Closes #10099
-rw-r--r-- | man/man5/zfs-module-parameters.5 | 14 | ||||
-rw-r--r-- | module/zfs/dmu_recv.c | 219 |
2 files changed, 182 insertions, 51 deletions
diff --git a/man/man5/zfs-module-parameters.5 b/man/man5/zfs-module-parameters.5 index 311d898ff..3c61ac1a9 100644 --- a/man/man5/zfs-module-parameters.5 +++ b/man/man5/zfs-module-parameters.5 @@ -2998,6 +2998,20 @@ Default value: \fB16,777,216\fR. .sp .ne 2 .na +\fBzfs_recv_write_batch_size\fR (int) +.ad +.RS 12n +The maximum amount of data (in bytes) that \fBzfs receive\fR will write in +one DMU transaction. This is the uncompressed size, even when receiving a +compressed send stream. This setting will not reduce the write size below +a single block. Capped at a maximum of 32MB +.sp +Default value: \fB1MB\fR. +.RE + +.sp +.ne 2 +.na \fBzfs_override_estimate_recordsize\fR (ulong) .ad .RS 12n diff --git a/module/zfs/dmu_recv.c b/module/zfs/dmu_recv.c index 0fa0e015b..62881753f 100644 --- a/module/zfs/dmu_recv.c +++ b/module/zfs/dmu_recv.c @@ -65,6 +65,7 @@ int zfs_recv_queue_length = SPA_MAXBLOCKSIZE; int zfs_recv_queue_ff = 20; +int zfs_recv_write_batch_size = 1024 * 1024; static char *dmu_recv_tag = "dmu_recv_tag"; const char *recv_clone_name = "%recv"; @@ -110,6 +111,8 @@ struct receive_writer_arg { uint64_t max_object; /* highest object ID referenced in stream */ uint64_t bytes_read; /* bytes read when current record created */ + list_t write_batch; + /* Encryption parameters for the last received DRR_OBJECT_RANGE */ boolean_t or_crypt_params_present; uint64_t or_firstobj; @@ -1698,13 +1701,108 @@ receive_freeobjects(struct receive_writer_arg *rwa, return (0); } -noinline static int -receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, - arc_buf_t *abuf) +/* + * Note: if this fails, the caller will clean up any records left on the + * rwa->write_batch list. + */ +static int +flush_write_batch_impl(struct receive_writer_arg *rwa) { - int err; - dmu_tx_t *tx; dnode_t *dn; + int err; + + if (dnode_hold(rwa->os, rwa->last_object, FTAG, &dn) != 0) + return (SET_ERROR(EINVAL)); + + struct receive_record_arg *last_rrd = list_tail(&rwa->write_batch); + struct drr_write *last_drrw = &last_rrd->header.drr_u.drr_write; + + struct receive_record_arg *first_rrd = list_head(&rwa->write_batch); + struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write; + + ASSERT3U(rwa->last_object, ==, last_drrw->drr_object); + ASSERT3U(rwa->last_offset, ==, last_drrw->drr_offset); + + dmu_tx_t *tx = dmu_tx_create(rwa->os); + dmu_tx_hold_write_by_dnode(tx, dn, first_drrw->drr_offset, + last_drrw->drr_offset - first_drrw->drr_offset + + last_drrw->drr_logical_size); + err = dmu_tx_assign(tx, TXG_WAIT); + if (err != 0) { + dmu_tx_abort(tx); + dnode_rele(dn, FTAG); + return (err); + } + + struct receive_record_arg *rrd; + while ((rrd = list_head(&rwa->write_batch)) != NULL) { + struct drr_write *drrw = &rrd->header.drr_u.drr_write; + arc_buf_t *abuf = rrd->arc_buf; + + ASSERT3U(drrw->drr_object, ==, rwa->last_object); + + if (rwa->byteswap && !arc_is_encrypted(abuf) && + arc_get_compression(abuf) == ZIO_COMPRESS_OFF) { + dmu_object_byteswap_t byteswap = + DMU_OT_BYTESWAP(drrw->drr_type); + dmu_ot_byteswap[byteswap].ob_func(abuf->b_data, + DRR_WRITE_PAYLOAD_SIZE(drrw)); + } + + err = dmu_assign_arcbuf_by_dnode(dn, + drrw->drr_offset, abuf, tx); + if (err != 0) { + /* + * This rrd is left on the list, so the caller will + * free it (and the arc_buf). + */ + break; + } + + /* + * Note: If the receive fails, we want the resume stream to + * start with the same record that we last successfully + * received (as opposed to the next record), so that we can + * verify that we are resuming from the correct location. + */ + save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx); + + list_remove(&rwa->write_batch, rrd); + kmem_free(rrd, sizeof (*rrd)); + } + + dmu_tx_commit(tx); + dnode_rele(dn, FTAG); + return (err); +} + +noinline static int +flush_write_batch(struct receive_writer_arg *rwa) +{ + if (list_is_empty(&rwa->write_batch)) + return (0); + int err = rwa->err; + if (err == 0) + err = flush_write_batch_impl(rwa); + if (err != 0) { + struct receive_record_arg *rrd; + while ((rrd = list_remove_head(&rwa->write_batch)) != NULL) { + dmu_return_arcbuf(rrd->arc_buf); + kmem_free(rrd, sizeof (*rrd)); + } + } + ASSERT(list_is_empty(&rwa->write_batch)); + return (err); +} + +noinline static int +receive_process_write_record(struct receive_writer_arg *rwa, + struct receive_record_arg *rrd) +{ + int err = 0; + + ASSERT3U(rrd->header.drr_type, ==, DRR_WRITE); + struct drr_write *drrw = &rrd->header.drr_u.drr_write; if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset || !DMU_OT_IS_VALID(drrw->drr_type)) @@ -1719,52 +1817,31 @@ receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, drrw->drr_offset < rwa->last_offset)) { return (SET_ERROR(EINVAL)); } + + struct receive_record_arg *first_rrd = list_head(&rwa->write_batch); + struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write; + uint64_t batch_size = + MIN(zfs_recv_write_batch_size, DMU_MAX_ACCESS / 2); + if (first_rrd != NULL && + (drrw->drr_object != first_drrw->drr_object || + drrw->drr_offset >= first_drrw->drr_offset + batch_size)) { + err = flush_write_batch(rwa); + if (err != 0) + return (err); + } + rwa->last_object = drrw->drr_object; rwa->last_offset = drrw->drr_offset; if (rwa->last_object > rwa->max_object) rwa->max_object = rwa->last_object; - if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0) - return (SET_ERROR(EINVAL)); - - tx = dmu_tx_create(rwa->os); - dmu_tx_hold_write(tx, drrw->drr_object, - drrw->drr_offset, drrw->drr_logical_size); - err = dmu_tx_assign(tx, TXG_WAIT); - if (err != 0) { - dmu_tx_abort(tx); - return (err); - } - - if (rwa->byteswap && !arc_is_encrypted(abuf) && - arc_get_compression(abuf) == ZIO_COMPRESS_OFF) { - dmu_object_byteswap_t byteswap = - DMU_OT_BYTESWAP(drrw->drr_type); - dmu_ot_byteswap[byteswap].ob_func(abuf->b_data, - DRR_WRITE_PAYLOAD_SIZE(drrw)); - } - - /* use the bonus buf to look up the dnode in dmu_assign_arcbuf */ - VERIFY0(dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn)); - err = dmu_assign_arcbuf_by_dnode(dn, drrw->drr_offset, abuf, tx); - if (err != 0) { - dnode_rele(dn, FTAG); - dmu_tx_commit(tx); - return (err); - } - dnode_rele(dn, FTAG); - + list_insert_tail(&rwa->write_batch, rrd); /* - * Note: If the receive fails, we want the resume stream to start - * with the same record that we last successfully received (as opposed - * to the next record), so that we can verify that we are - * resuming from the correct location. + * Return EAGAIN to indicate that we will use this rrd again, + * so the caller should not free it */ - save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx); - dmu_tx_commit(tx); - - return (0); + return (EAGAIN); } /* @@ -2482,6 +2559,22 @@ receive_process_record(struct receive_writer_arg *rwa, ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read); rwa->bytes_read = rrd->bytes_read; + if (rrd->header.drr_type != DRR_WRITE) { + err = flush_write_batch(rwa); + if (err != 0) { + if (rrd->arc_buf != NULL) { + dmu_return_arcbuf(rrd->arc_buf); + rrd->arc_buf = NULL; + rrd->payload = NULL; + } else if (rrd->payload != NULL) { + kmem_free(rrd->payload, rrd->payload_size); + rrd->payload = NULL; + } + + return (err); + } + } + switch (rrd->header.drr_type) { case DRR_OBJECT: { @@ -2500,13 +2593,17 @@ receive_process_record(struct receive_writer_arg *rwa, } case DRR_WRITE: { - struct drr_write *drrw = &rrd->header.drr_u.drr_write; - err = receive_write(rwa, drrw, rrd->arc_buf); - /* if receive_write() is successful, it consumes the arc_buf */ - if (err != 0) + err = receive_process_write_record(rwa, rrd); + if (err != EAGAIN) { + /* + * On success, receive_process_write_record() returns + * EAGAIN to indicate that we do not want to free + * the rrd or arc_buf. + */ + ASSERT(err != 0); dmu_return_arcbuf(rrd->arc_buf); - rrd->arc_buf = NULL; - rrd->payload = NULL; + rrd->arc_buf = NULL; + } break; } case DRR_WRITE_BYREF: @@ -2582,8 +2679,9 @@ receive_writer_thread(void *arg) * on the queue, but we need to clear everything in it before we * can exit. */ + int err = 0; if (rwa->err == 0) { - rwa->err = receive_process_record(rwa, rrd); + err = receive_process_record(rwa, rrd); } else if (rrd->arc_buf != NULL) { dmu_return_arcbuf(rrd->arc_buf); rrd->arc_buf = NULL; @@ -2592,9 +2690,22 @@ receive_writer_thread(void *arg) kmem_free(rrd->payload, rrd->payload_size); rrd->payload = NULL; } - kmem_free(rrd, sizeof (*rrd)); + /* + * EAGAIN indicates that this record has been saved (on + * raw->write_batch), and will be used again, so we don't + * free it. + */ + if (err != EAGAIN) { + rwa->err = err; + kmem_free(rrd, sizeof (*rrd)); + } } kmem_free(rrd, sizeof (*rrd)); + + int err = flush_write_batch(rwa); + if (rwa->err == 0) + rwa->err = err; + mutex_enter(&rwa->mutex); rwa->done = B_TRUE; cv_signal(&rwa->cv); @@ -2759,6 +2870,8 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd, rwa->raw = drc->drc_raw; rwa->spill = drc->drc_spill; rwa->os->os_raw_receive = drc->drc_raw; + list_create(&rwa->write_batch, sizeof (struct receive_record_arg), + offsetof(struct receive_record_arg, node.bqn_node)); (void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc, TS_RUN, minclsyspri); @@ -2845,6 +2958,7 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd, cv_destroy(&rwa->cv); mutex_destroy(&rwa->mutex); bqueue_destroy(&rwa->q); + list_destroy(&rwa->write_batch); if (err == 0) err = rwa->err; @@ -3236,4 +3350,7 @@ ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, INT, ZMOD_RW, ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, INT, ZMOD_RW, "Receive queue fill fraction"); + +ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, INT, ZMOD_RW, + "Maximum amount of writes to batch into one transaction"); /* END CSTYLED */ |