aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/sys/spa_impl.h3
-rw-r--r--include/sys/zfs_context.h1
-rw-r--r--lib/libzpool/taskq.c6
-rw-r--r--module/zfs/dmu_send.c40
-rw-r--r--module/zfs/spa.c25
5 files changed, 69 insertions, 6 deletions
diff --git a/include/sys/spa_impl.h b/include/sys/spa_impl.h
index 1b12b4e3a..47dfe432e 100644
--- a/include/sys/spa_impl.h
+++ b/include/sys/spa_impl.h
@@ -250,6 +250,9 @@ extern char *spa_config_path;
extern void spa_taskq_dispatch_ent(spa_t *spa, zio_type_t t, zio_taskq_type_t q,
task_func_t *func, void *arg, uint_t flags, taskq_ent_t *ent);
+extern void spa_taskq_dispatch_sync(spa_t *, zio_type_t t, zio_taskq_type_t q,
+ task_func_t *func, void *arg, uint_t flags);
+
#ifdef __cplusplus
}
diff --git a/include/sys/zfs_context.h b/include/sys/zfs_context.h
index a23bfdcf8..0b24216b5 100644
--- a/include/sys/zfs_context.h
+++ b/include/sys/zfs_context.h
@@ -409,6 +409,7 @@ extern int taskq_empty_ent(taskq_ent_t *);
extern void taskq_init_ent(taskq_ent_t *);
extern void taskq_destroy(taskq_t *);
extern void taskq_wait(taskq_t *);
+extern void taskq_wait_id(taskq_t *, taskqid_t);
extern int taskq_member(taskq_t *, kthread_t *);
extern int taskq_cancel_id(taskq_t *, taskqid_t);
extern void system_taskq_init(void);
diff --git a/lib/libzpool/taskq.c b/lib/libzpool/taskq.c
index 64e214205..96c0d5c2b 100644
--- a/lib/libzpool/taskq.c
+++ b/lib/libzpool/taskq.c
@@ -211,6 +211,12 @@ taskq_wait(taskq_t *tq)
mutex_exit(&tq->tq_lock);
}
+void
+taskq_wait_id(taskq_t *tq, taskqid_t id)
+{
+ taskq_wait(tq);
+}
+
static void
taskq_thread(void *arg)
{
diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c
index 921c3d76f..b2c6bfe2b 100644
--- a/module/zfs/dmu_send.c
+++ b/module/zfs/dmu_send.c
@@ -39,6 +39,7 @@
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
+#include <sys/spa_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
@@ -53,21 +54,48 @@ int zfs_send_corrupt_data = B_FALSE;
static char *dmu_recv_tag = "dmu_recv_tag";
-static int
-dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
+typedef struct dump_bytes_io {
+ dmu_sendarg_t *dbi_dsp;
+ void *dbi_buf;
+ int dbi_len;
+} dump_bytes_io_t;
+
+static void
+dump_bytes_strategy(void *arg)
{
+ dump_bytes_io_t *dbi = (dump_bytes_io_t *)arg;
+ dmu_sendarg_t *dsp = dbi->dbi_dsp;
dsl_dataset_t *ds = dsp->dsa_os->os_dsl_dataset;
ssize_t resid; /* have to get resid to get detailed errno */
- ASSERT3U(len % 8, ==, 0);
+ ASSERT3U(dbi->dbi_len % 8, ==, 0);
- fletcher_4_incremental_native(buf, len, &dsp->dsa_zc);
+ fletcher_4_incremental_native(dbi->dbi_buf, dbi->dbi_len, &dsp->dsa_zc);
dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
- (caddr_t)buf, len,
+ (caddr_t)dbi->dbi_buf, dbi->dbi_len,
0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
mutex_enter(&ds->ds_sendstream_lock);
- *dsp->dsa_off += len;
+ *dsp->dsa_off += dbi->dbi_len;
mutex_exit(&ds->ds_sendstream_lock);
+}
+
+static int
+dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
+{
+ dump_bytes_io_t dbi;
+
+ dbi.dbi_dsp = dsp;
+ dbi.dbi_buf = buf;
+ dbi.dbi_len = len;
+
+ /*
+ * The vn_rdwr() call is performed in a taskq to ensure that there is
+ * always enough stack space to write safely to the target filesystem.
+ * The ZIO_TYPE_FREE threads are used because there can be a lot of
+ * them and they are used in vdev_file.c for a similar purpose.
+ */
+ spa_taskq_dispatch_sync(dmu_objset_spa(dsp->dsa_os), ZIO_TYPE_FREE,
+ ZIO_TASKQ_ISSUE, dump_bytes_strategy, &dbi, TQ_SLEEP);
return (dsp->dsa_err);
}
diff --git a/module/zfs/spa.c b/module/zfs/spa.c
index e022c3258..82ee445ab 100644
--- a/module/zfs/spa.c
+++ b/module/zfs/spa.c
@@ -926,6 +926,31 @@ spa_taskq_dispatch_ent(spa_t *spa, zio_type_t t, zio_taskq_type_t q,
taskq_dispatch_ent(tq, func, arg, flags, ent);
}
+/*
+ * Same as spa_taskq_dispatch_ent() but block on the task until completion.
+ */
+void
+spa_taskq_dispatch_sync(spa_t *spa, zio_type_t t, zio_taskq_type_t q,
+ task_func_t *func, void *arg, uint_t flags)
+{
+ spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
+ taskq_t *tq;
+ taskqid_t id;
+
+ ASSERT3P(tqs->stqs_taskq, !=, NULL);
+ ASSERT3U(tqs->stqs_count, !=, 0);
+
+ if (tqs->stqs_count == 1) {
+ tq = tqs->stqs_taskq[0];
+ } else {
+ tq = tqs->stqs_taskq[gethrtime() % tqs->stqs_count];
+ }
+
+ id = taskq_dispatch(tq, func, arg, flags);
+ if (id)
+ taskq_wait_id(tq, id);
+}
+
static void
spa_create_zio_taskqs(spa_t *spa)
{