aboutsummaryrefslogtreecommitdiffstats
path: root/cmd/zstream
diff options
context:
space:
mode:
authorRob Norris <[email protected]>2024-07-05 15:01:57 +1000
committerTony Hutter <[email protected]>2024-08-22 16:22:24 -0700
commitb4d81b1a6a8c1254910f7e8b48e2f58fe77b769a (patch)
tree1d057595f6decb4b0e0daed38940e31307f1f14c /cmd/zstream
parent5eede0d5fde556107321fae6b41d6f83eeaf28a1 (diff)
zstream: use zio_compress calls for compression
This is updating zstream to use the zio_compress calls rather than using its own dispatch. Since that was fairly entangled, some refactoring included. Sponsored-by: Klara, Inc. Sponsored-by: Wasabi Technology, Inc. Signed-off-by: Rob Norris <[email protected]>
Diffstat (limited to 'cmd/zstream')
-rw-r--r--cmd/zstream/zstream_decompress.c123
-rw-r--r--cmd/zstream/zstream_recompress.c93
2 files changed, 98 insertions, 118 deletions
diff --git a/cmd/zstream/zstream_decompress.c b/cmd/zstream/zstream_decompress.c
index 0cef36c04..f5f66080d 100644
--- a/cmd/zstream/zstream_decompress.c
+++ b/cmd/zstream/zstream_decompress.c
@@ -22,6 +22,8 @@
/*
* Copyright 2022 Axcient. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2024, Klara, Inc.
*/
#include <err.h>
@@ -257,83 +259,64 @@ zstream_do_decompress(int argc, char *argv[])
ENTRY e = {.key = key};
p = hsearch(e, FIND);
- if (p != NULL) {
- zio_decompress_func_t *xfunc = NULL;
- switch ((enum zio_compress)(intptr_t)p->data) {
- case ZIO_COMPRESS_OFF:
- xfunc = NULL;
- break;
- case ZIO_COMPRESS_LZJB:
- xfunc = lzjb_decompress;
- break;
- case ZIO_COMPRESS_GZIP_1:
- xfunc = gzip_decompress;
- break;
- case ZIO_COMPRESS_ZLE:
- xfunc = zle_decompress;
- break;
- case ZIO_COMPRESS_LZ4:
- xfunc = lz4_decompress_zfs;
- break;
- case ZIO_COMPRESS_ZSTD:
- xfunc = zfs_zstd_decompress;
- break;
- default:
- assert(B_FALSE);
- }
-
-
+ if (p == NULL) {
/*
- * Read and decompress the block
+ * Read the contents of the block unaltered
*/
- char *lzbuf = safe_calloc(payload_size);
- (void) sfread(lzbuf, payload_size, stdin);
- if (xfunc == NULL) {
- memcpy(buf, lzbuf, payload_size);
- drrw->drr_compressiontype =
- ZIO_COMPRESS_OFF;
- if (verbose)
- fprintf(stderr, "Resetting "
- "compression type to off "
- "for ino %llu offset "
- "%llu\n",
- (u_longlong_t)
- drrw->drr_object,
- (u_longlong_t)
- drrw->drr_offset);
- } else if (0 != xfunc(lzbuf, buf,
- payload_size, payload_size, 0)) {
- /*
- * The block must not be compressed,
- * at least not with this compression
- * type, possibly because it gets
- * written multiple times in this
- * stream.
- */
- warnx("decompression failed for "
- "ino %llu offset %llu",
- (u_longlong_t)drrw->drr_object,
- (u_longlong_t)drrw->drr_offset);
- memcpy(buf, lzbuf, payload_size);
- } else if (verbose) {
- drrw->drr_compressiontype =
- ZIO_COMPRESS_OFF;
- fprintf(stderr, "successfully "
- "decompressed ino %llu "
- "offset %llu\n",
+ (void) sfread(buf, payload_size, stdin);
+ break;
+ }
+
+ /*
+ * Read and decompress the block
+ */
+ enum zio_compress c =
+ (enum zio_compress)(intptr_t)p->data;
+
+ if (c == ZIO_COMPRESS_OFF) {
+ (void) sfread(buf, payload_size, stdin);
+ drrw->drr_compressiontype = ZIO_COMPRESS_OFF;
+ if (verbose)
+ fprintf(stderr,
+ "Resetting compression type to "
+ "off for ino %llu offset %llu\n",
(u_longlong_t)drrw->drr_object,
(u_longlong_t)drrw->drr_offset);
- } else {
- drrw->drr_compressiontype =
- ZIO_COMPRESS_OFF;
- }
- free(lzbuf);
- } else {
+ break;
+ }
+
+ char *lzbuf = safe_calloc(payload_size);
+ (void) sfread(lzbuf, payload_size, stdin);
+
+ abd_t sabd;
+ abd_get_from_buf_struct(&sabd, lzbuf, payload_size);
+ int err = zio_decompress_data(c, &sabd, buf,
+ payload_size, payload_size, NULL);
+ abd_free(&sabd);
+
+ if (err != 0) {
/*
- * Read the contents of the block unaltered
+ * The block must not be compressed, at least
+ * not with this compression type, possibly
+ * because it gets written multiple times in
+ * this stream.
*/
- (void) sfread(buf, payload_size, stdin);
+ warnx("decompression failed for "
+ "ino %llu offset %llu",
+ (u_longlong_t)drrw->drr_object,
+ (u_longlong_t)drrw->drr_offset);
+ memcpy(buf, lzbuf, payload_size);
+ } else if (verbose) {
+ drrw->drr_compressiontype = ZIO_COMPRESS_OFF;
+ fprintf(stderr, "successfully decompressed "
+ "ino %llu offset %llu\n",
+ (u_longlong_t)drrw->drr_object,
+ (u_longlong_t)drrw->drr_offset);
+ } else {
+ drrw->drr_compressiontype = ZIO_COMPRESS_OFF;
}
+
+ free(lzbuf);
break;
}
diff --git a/cmd/zstream/zstream_recompress.c b/cmd/zstream/zstream_recompress.c
index f9e01d1aa..0e5cc9cd8 100644
--- a/cmd/zstream/zstream_recompress.c
+++ b/cmd/zstream/zstream_recompress.c
@@ -22,10 +22,9 @@
/*
* Copyright 2022 Axcient. All rights reserved.
* Use is subject to license terms.
- */
-
-/*
+ *
* Copyright (c) 2022 by Delphix. All rights reserved.
+ * Copyright (c) 2024, Klara, Inc.
*/
#include <err.h>
@@ -72,7 +71,7 @@ zstream_do_recompress(int argc, char *argv[])
dmu_replay_record_t *drr = &thedrr;
zio_cksum_t stream_cksum;
int c;
- int level = -1;
+ int level = 0;
while ((c = getopt(argc, argv, "l:")) != -1) {
switch (c) {
@@ -97,34 +96,22 @@ zstream_do_recompress(int argc, char *argv[])
if (argc != 1)
zstream_usage();
- int type = 0;
- zio_compress_info_t *cinfo = NULL;
- if (0 == strcmp(argv[0], "off")) {
- type = ZIO_COMPRESS_OFF;
- cinfo = &zio_compress_table[type];
- } else if (0 == strcmp(argv[0], "inherit") ||
- 0 == strcmp(argv[0], "empty") ||
- 0 == strcmp(argv[0], "on")) {
- // Fall through to invalid compression type case
+
+ enum zio_compress ctype;
+ if (strcmp(argv[0], "off") == 0) {
+ ctype = ZIO_COMPRESS_OFF;
} else {
- for (int i = 0; i < ZIO_COMPRESS_FUNCTIONS; i++) {
- if (0 == strcmp(zio_compress_table[i].ci_name,
- argv[0])) {
- cinfo = &zio_compress_table[i];
- type = i;
+ for (ctype = 0; ctype < ZIO_COMPRESS_FUNCTIONS; ctype++) {
+ if (strcmp(argv[0],
+ zio_compress_table[ctype].ci_name) == 0)
break;
- }
}
- }
- if (cinfo == NULL) {
- fprintf(stderr, "Invalid compression type %s.\n",
- argv[0]);
- exit(2);
- }
-
- if (cinfo->ci_compress == NULL) {
- type = 0;
- cinfo = &zio_compress_table[0];
+ if (ctype == ZIO_COMPRESS_FUNCTIONS ||
+ zio_compress_table[ctype].ci_compress == NULL) {
+ fprintf(stderr, "Invalid compression type %s.\n",
+ argv[0]);
+ exit(2);
+ }
}
if (isatty(STDIN_FILENO)) {
@@ -135,6 +122,7 @@ zstream_do_recompress(int argc, char *argv[])
exit(1);
}
+ abd_init();
fletcher_4_init();
zio_init();
zstd_init();
@@ -247,53 +235,60 @@ zstream_do_recompress(int argc, char *argv[])
(void) sfread(buf, payload_size, stdin);
break;
}
- if (drrw->drr_compressiontype >=
- ZIO_COMPRESS_FUNCTIONS) {
+ enum zio_compress dtype = drrw->drr_compressiontype;
+ if (dtype >= ZIO_COMPRESS_FUNCTIONS) {
fprintf(stderr, "Invalid compression type in "
- "stream: %d\n", drrw->drr_compressiontype);
+ "stream: %d\n", dtype);
exit(3);
}
- zio_compress_info_t *dinfo =
- &zio_compress_table[drrw->drr_compressiontype];
+ if (zio_compress_table[dtype].ci_decompress == NULL)
+ dtype = ZIO_COMPRESS_OFF;
/* Set up buffers to minimize memcpys */
char *cbuf, *dbuf;
- if (cinfo->ci_compress == NULL)
+ if (ctype == ZIO_COMPRESS_OFF)
dbuf = buf;
else
dbuf = safe_calloc(bufsz);
- if (dinfo->ci_decompress == NULL)
+ if (dtype == ZIO_COMPRESS_OFF)
cbuf = dbuf;
else
cbuf = safe_calloc(payload_size);
/* Read and decompress the payload */
(void) sfread(cbuf, payload_size, stdin);
- if (dinfo->ci_decompress != NULL) {
- if (0 != dinfo->ci_decompress(cbuf, dbuf,
- payload_size, MIN(bufsz,
- drrw->drr_logical_size), dinfo->ci_level)) {
+ if (dtype != ZIO_COMPRESS_OFF) {
+ abd_t cabd;
+ abd_get_from_buf_struct(&cabd,
+ cbuf, payload_size);
+ if (zio_decompress_data(dtype, &cabd, dbuf,
+ payload_size,
+ MIN(bufsz, drrw->drr_logical_size),
+ NULL) != 0) {
warnx("decompression type %d failed "
"for ino %llu offset %llu",
- type,
+ dtype,
(u_longlong_t)drrw->drr_object,
(u_longlong_t)drrw->drr_offset);
exit(4);
}
payload_size = drrw->drr_logical_size;
+ abd_free(&cabd);
free(cbuf);
}
/* Recompress the payload */
- if (cinfo->ci_compress != NULL) {
- payload_size = P2ROUNDUP(cinfo->ci_compress(
- dbuf, buf, drrw->drr_logical_size,
- MIN(payload_size, bufsz), (level == -1 ?
- cinfo->ci_level : level)),
+ if (ctype != ZIO_COMPRESS_OFF) {
+ abd_t dabd;
+ abd_get_from_buf_struct(&dabd,
+ dbuf, drrw->drr_logical_size);
+ payload_size = P2ROUNDUP(zio_compress_data(
+ ctype, &dabd, (void **)&buf,
+ drrw->drr_logical_size, level),
SPA_MINBLOCKSIZE);
if (payload_size != drrw->drr_logical_size) {
- drrw->drr_compressiontype = type;
+ drrw->drr_compressiontype = ctype;
drrw->drr_compressed_size =
payload_size;
} else {
@@ -301,9 +296,10 @@ zstream_do_recompress(int argc, char *argv[])
drrw->drr_compressiontype = 0;
drrw->drr_compressed_size = 0;
}
+ abd_free(&dabd);
free(dbuf);
} else {
- drrw->drr_compressiontype = type;
+ drrw->drr_compressiontype = ctype;
drrw->drr_compressed_size = 0;
}
break;
@@ -371,6 +367,7 @@ zstream_do_recompress(int argc, char *argv[])
fletcher_4_fini();
zio_fini();
zstd_fini();
+ abd_fini();
return (0);
}