From 93cf20764a1be64a603020f54b45200e37b3877e Mon Sep 17 00:00:00 2001
From: George Wilson <george.wilson@delphix.com>
Date: Tue, 1 Oct 2013 13:25:53 -0800
Subject: Illumos #4101, #4102, #4103, #4105, #4106

4101 metaslab_debug should allow for fine-grained control
4102 space_maps should store more information about themselves
4103 space map object blocksize should be increased
4105 removing a mirrored log device results in a leaked object
4106 asynchronously load metaslab
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Adam Leventhal <ahl@delphix.com>
Reviewed by: Sebastien Roy <seb@delphix.com>
Approved by: Garrett D'Amore <garrett@damore.org>

Prior to this patch, space_maps were preferred solely based on the
amount of free space left in each. Unfortunately, this heuristic didn't
contain any information about the make-up of that free space, which
meant we could keep preferring and loading a highly fragmented space map
that wouldn't actually have enough contiguous space to satisfy the
allocation; then unloading that space_map and repeating the process.

This change modifies the space_map's to store additional information
about the contiguous space in the space_map, so that we can use this
information to make a better decision about which space_map to load.
This requires reallocating all space_map objects to increase their
bonus buffer size sizes enough to fit the new metadata.

The above feature can be enabled via a new feature flag introduced by
this change: com.delphix:spacemap_histogram

In addition to the above, this patch allows the space_map block size to
be increase. Currently the block size is set to be 4K in size, which has
certain implications including the following:

    * 4K sector devices will not see any compression benefit
    * large space_maps require more metadata on-disk
    * large space_maps require more time to load (typically random reads)

Now the space_map block size can adjust as needed up to the maximum size
set via the space_map_max_blksz variable.

A bug was fixed which resulted in potentially leaking an object when
removing a mirrored log device. The previous logic for vdev_remove() did
not deal with removing top-level vdevs that are interior vdevs (i.e.
mirror) correctly. The problem would occur when removing a mirrored log
device, and result in the DTL space map object being leaked; because
top-level vdevs don't have DTL space map objects associated with them.

References:
  https://www.illumos.org/issues/4101
  https://www.illumos.org/issues/4102
  https://www.illumos.org/issues/4103
  https://www.illumos.org/issues/4105
  https://www.illumos.org/issues/4106
  https://github.com/illumos/illumos-gate/commit/0713e23

Porting notes:

A handful of kmem_alloc() calls were converted to kmem_zalloc(). Also,
the KM_PUSHPAGE and TQ_PUSHPAGE flags were used as necessary.

Ported-by: Tim Chase <tim@chase2k.com>
Signed-off-by: Prakash Surya <surya1@llnl.gov>
Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Closes #2488
---
 module/zfs/Makefile.in       |    2 +
 module/zfs/dnode.c           |    2 +-
 module/zfs/metaslab.c        | 1264 ++++++++++++++++++++++++------------------
 module/zfs/range_tree.c      |  391 +++++++++++++
 module/zfs/spa.c             |   45 +-
 module/zfs/spa_misc.c        |    6 +-
 module/zfs/space_map.c       |  874 ++++++++++++++---------------
 module/zfs/space_reftree.c   |  159 ++++++
 module/zfs/vdev.c            |  293 +++++-----
 module/zfs/vdev_label.c      |    5 +-
 module/zfs/zfeature.c        |   34 +-
 module/zfs/zfeature_common.c |    5 +-
 12 files changed, 1916 insertions(+), 1164 deletions(-)
 create mode 100644 module/zfs/range_tree.c
 create mode 100644 module/zfs/space_reftree.c

(limited to 'module')

diff --git a/module/zfs/Makefile.in b/module/zfs/Makefile.in
index 5552436ad..9701ff2bb 100644
--- a/module/zfs/Makefile.in
+++ b/module/zfs/Makefile.in
@@ -35,6 +35,7 @@ $(MODULE)-objs += @top_srcdir@/module/zfs/gzip.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/lzjb.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/lz4.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/metaslab.o
+$(MODULE)-objs += @top_srcdir@/module/zfs/range_tree.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/refcount.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/rrwlock.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/sa.o
@@ -47,6 +48,7 @@ $(MODULE)-objs += @top_srcdir@/module/zfs/spa_history.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/spa_misc.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/spa_stats.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/space_map.o
+$(MODULE)-objs += @top_srcdir@/module/zfs/space_reftree.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/txg.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/uberblock.o
 $(MODULE)-objs += @top_srcdir@/module/zfs/unique.o
diff --git a/module/zfs/dnode.c b/module/zfs/dnode.c
index 5cb5fcc18..f95066ddd 100644
--- a/module/zfs/dnode.c
+++ b/module/zfs/dnode.c
@@ -1335,7 +1335,7 @@ dnode_set_blksz(dnode_t *dn, uint64_t size, int ibs, dmu_tx_t *tx)
 	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
 
 	/* Check for any allocated blocks beyond the first */
-	if (dn->dn_phys->dn_maxblkid != 0)
+	if (dn->dn_maxblkid != 0)
 		goto fail;
 
 	mutex_enter(&dn->dn_dbufs_mtx);
diff --git a/module/zfs/metaslab.c b/module/zfs/metaslab.c
index fe2428ac0..b4e390c98 100644
--- a/module/zfs/metaslab.c
+++ b/module/zfs/metaslab.c
@@ -31,6 +31,7 @@
 #include <sys/metaslab_impl.h>
 #include <sys/vdev_impl.h>
 #include <sys/zio.h>
+#include <sys/spa_impl.h>
 
 #define	WITH_DF_BLOCK_ALLOCATOR
 
@@ -46,6 +47,11 @@
 	(!((flags) & (METASLAB_GANG_CHILD | METASLAB_GANG_HEADER | \
 	METASLAB_GANG_AVOID)))
 
+#define	METASLAB_WEIGHT_PRIMARY		(1ULL << 63)
+#define	METASLAB_WEIGHT_SECONDARY	(1ULL << 62)
+#define	METASLAB_ACTIVE_MASK		\
+	(METASLAB_WEIGHT_PRIMARY | METASLAB_WEIGHT_SECONDARY)
+
 uint64_t metaslab_aliquot = 512ULL << 10;
 uint64_t metaslab_gang_bang = SPA_MAXBLOCKSIZE + 1;	/* force gang blocks */
 
@@ -113,27 +119,45 @@ int metaslab_df_free_pct = 4;
 uint64_t metaslab_min_alloc_size = DMU_MAX_ACCESS;
 
 /*
- * Max number of space_maps to prefetch.
+ * Percentage of all cpus that can be used by the metaslab taskq.
  */
-int metaslab_prefetch_limit = SPA_DVAS_PER_BP;
+int metaslab_load_pct = 50;
 
 /*
- * Percentage bonus multiplier for metaslabs that are in the bonus area.
+ * Determines how many txgs a metaslab may remain loaded without having any
+ * allocations from it. As long as a metaslab continues to be used we will
+ * keep it loaded.
  */
-int metaslab_smo_bonus_pct = 150;
+int metaslab_unload_delay = TXG_SIZE * 2;
 
 /*
  * Should we be willing to write data to degraded vdevs?
  */
 boolean_t zfs_write_to_degraded = B_FALSE;
 
+/*
+ * Max number of metaslabs per group to preload.
+ */
+int metaslab_preload_limit = SPA_DVAS_PER_BP;
+
+/*
+ * Enable/disable preloading of metaslab.
+ */
+boolean_t metaslab_preload_enabled = B_TRUE;
+
+/*
+ * Enable/disable additional weight factor for each metaslab.
+ */
+boolean_t metaslab_weight_factor_enable = B_FALSE;
+
+
 /*
  * ==========================================================================
  * Metaslab classes
  * ==========================================================================
  */
 metaslab_class_t *
-metaslab_class_create(spa_t *spa, space_map_ops_t *ops)
+metaslab_class_create(spa_t *spa, metaslab_ops_t *ops)
 {
 	metaslab_class_t *mc;
 
@@ -239,9 +263,9 @@ metaslab_compare(const void *x1, const void *x2)
 	/*
 	 * If the weights are identical, use the offset to force uniqueness.
 	 */
-	if (m1->ms_map->sm_start < m2->ms_map->sm_start)
+	if (m1->ms_start < m2->ms_start)
 		return (-1);
-	if (m1->ms_map->sm_start > m2->ms_map->sm_start)
+	if (m1->ms_start > m2->ms_start)
 		return (1);
 
 	ASSERT3P(m1, ==, m2);
@@ -309,6 +333,9 @@ metaslab_group_create(metaslab_class_t *mc, vdev_t *vd)
 	mg->mg_class = mc;
 	mg->mg_activation_count = 0;
 
+	mg->mg_taskq = taskq_create("metaslab_group_tasksq", metaslab_load_pct,
+	    minclsyspri, 10, INT_MAX, TASKQ_THREADS_CPU_PCT);
+
 	return (mg);
 }
 
@@ -377,6 +404,8 @@ metaslab_group_passivate(metaslab_group_t *mg)
 		return;
 	}
 
+	taskq_wait(mg->mg_taskq);
+
 	mgprev = mg->mg_prev;
 	mgnext = mg->mg_next;
 
@@ -456,135 +485,205 @@ metaslab_group_allocatable(metaslab_group_t *mg)
 
 /*
  * ==========================================================================
- * Common allocator routines
+ * Range tree callbacks
  * ==========================================================================
  */
+
+/*
+ * Comparison function for the private size-ordered tree. Tree is sorted
+ * by size, larger sizes at the end of the tree.
+ */
 static int
-metaslab_segsize_compare(const void *x1, const void *x2)
+metaslab_rangesize_compare(const void *x1, const void *x2)
 {
-	const space_seg_t *s1 = x1;
-	const space_seg_t *s2 = x2;
-	uint64_t ss_size1 = s1->ss_end - s1->ss_start;
-	uint64_t ss_size2 = s2->ss_end - s2->ss_start;
+	const range_seg_t *r1 = x1;
+	const range_seg_t *r2 = x2;
+	uint64_t rs_size1 = r1->rs_end - r1->rs_start;
+	uint64_t rs_size2 = r2->rs_end - r2->rs_start;
 
-	if (ss_size1 < ss_size2)
+	if (rs_size1 < rs_size2)
 		return (-1);
-	if (ss_size1 > ss_size2)
+	if (rs_size1 > rs_size2)
 		return (1);
 
-	if (s1->ss_start < s2->ss_start)
+	if (r1->rs_start < r2->rs_start)
 		return (-1);
-	if (s1->ss_start > s2->ss_start)
+
+	if (r1->rs_start > r2->rs_start)
 		return (1);
 
 	return (0);
 }
 
-#if defined(WITH_FF_BLOCK_ALLOCATOR) || \
-    defined(WITH_DF_BLOCK_ALLOCATOR) || \
-    defined(WITH_CDF_BLOCK_ALLOCATOR)
 /*
- * This is a helper function that can be used by the allocator to find
- * a suitable block to allocate. This will search the specified AVL
- * tree looking for a block that matches the specified criteria.
+ * Create any block allocator specific components. The current allocators
+ * rely on using both a size-ordered range_tree_t and an array of uint64_t's.
  */
-static uint64_t
-metaslab_block_picker(avl_tree_t *t, uint64_t *cursor, uint64_t size,
-    uint64_t align)
+static void
+metaslab_rt_create(range_tree_t *rt, void *arg)
 {
-	space_seg_t *ss, ssearch;
-	avl_index_t where;
-
-	ssearch.ss_start = *cursor;
-	ssearch.ss_end = *cursor + size;
+	metaslab_t *msp = arg;
 
-	ss = avl_find(t, &ssearch, &where);
-	if (ss == NULL)
-		ss = avl_nearest(t, where, AVL_AFTER);
+	ASSERT3P(rt->rt_arg, ==, msp);
+	ASSERT(msp->ms_tree == NULL);
 
-	while (ss != NULL) {
-		uint64_t offset = P2ROUNDUP(ss->ss_start, align);
-
-		if (offset + size <= ss->ss_end) {
-			*cursor = offset + size;
-			return (offset);
-		}
-		ss = AVL_NEXT(t, ss);
-	}
-
-	/*
-	 * If we know we've searched the whole map (*cursor == 0), give up.
-	 * Otherwise, reset the cursor to the beginning and try again.
-	 */
-	if (*cursor == 0)
-		return (-1ULL);
-
-	*cursor = 0;
-	return (metaslab_block_picker(t, cursor, size, align));
+	avl_create(&msp->ms_size_tree, metaslab_rangesize_compare,
+	    sizeof (range_seg_t), offsetof(range_seg_t, rs_pp_node));
 }
-#endif /* WITH_FF/DF/CDF_BLOCK_ALLOCATOR */
 
+/*
+ * Destroy the block allocator specific components.
+ */
 static void
-metaslab_pp_load(space_map_t *sm)
+metaslab_rt_destroy(range_tree_t *rt, void *arg)
 {
-	space_seg_t *ss;
-
-	ASSERT(sm->sm_ppd == NULL);
-	sm->sm_ppd = kmem_zalloc(64 * sizeof (uint64_t), KM_PUSHPAGE);
+	metaslab_t *msp = arg;
 
-	sm->sm_pp_root = kmem_alloc(sizeof (avl_tree_t), KM_PUSHPAGE);
-	avl_create(sm->sm_pp_root, metaslab_segsize_compare,
-	    sizeof (space_seg_t), offsetof(struct space_seg, ss_pp_node));
+	ASSERT3P(rt->rt_arg, ==, msp);
+	ASSERT3P(msp->ms_tree, ==, rt);
+	ASSERT0(avl_numnodes(&msp->ms_size_tree));
 
-	for (ss = avl_first(&sm->sm_root); ss; ss = AVL_NEXT(&sm->sm_root, ss))
-		avl_add(sm->sm_pp_root, ss);
+	avl_destroy(&msp->ms_size_tree);
 }
 
 static void
-metaslab_pp_unload(space_map_t *sm)
+metaslab_rt_add(range_tree_t *rt, range_seg_t *rs, void *arg)
 {
-	void *cookie = NULL;
-
-	kmem_free(sm->sm_ppd, 64 * sizeof (uint64_t));
-	sm->sm_ppd = NULL;
-
-	while (avl_destroy_nodes(sm->sm_pp_root, &cookie) != NULL) {
-		/* tear down the tree */
-	}
+	metaslab_t *msp = arg;
 
-	avl_destroy(sm->sm_pp_root);
-	kmem_free(sm->sm_pp_root, sizeof (avl_tree_t));
-	sm->sm_pp_root = NULL;
+	ASSERT3P(rt->rt_arg, ==, msp);
+	ASSERT3P(msp->ms_tree, ==, rt);
+	VERIFY(!msp->ms_condensing);
+	avl_add(&msp->ms_size_tree, rs);
 }
 
-/* ARGSUSED */
 static void
-metaslab_pp_claim(space_map_t *sm, uint64_t start, uint64_t size)
+metaslab_rt_remove(range_tree_t *rt, range_seg_t *rs, void *arg)
 {
-	/* No need to update cursor */
+	metaslab_t *msp = arg;
+
+	ASSERT3P(rt->rt_arg, ==, msp);
+	ASSERT3P(msp->ms_tree, ==, rt);
+	VERIFY(!msp->ms_condensing);
+	avl_remove(&msp->ms_size_tree, rs);
 }
 
-/* ARGSUSED */
 static void
-metaslab_pp_free(space_map_t *sm, uint64_t start, uint64_t size)
+metaslab_rt_vacate(range_tree_t *rt, void *arg)
 {
-	/* No need to update cursor */
+	metaslab_t *msp = arg;
+
+	ASSERT3P(rt->rt_arg, ==, msp);
+	ASSERT3P(msp->ms_tree, ==, rt);
+
+	/*
+	 * Normally one would walk the tree freeing nodes along the way.
+	 * Since the nodes are shared with the range trees we can avoid
+	 * walking all nodes and just reinitialize the avl tree. The nodes
+	 * will be freed by the range tree, so we don't want to free them here.
+	 */
+	avl_create(&msp->ms_size_tree, metaslab_rangesize_compare,
+	    sizeof (range_seg_t), offsetof(range_seg_t, rs_pp_node));
 }
 
+static range_tree_ops_t metaslab_rt_ops = {
+	metaslab_rt_create,
+	metaslab_rt_destroy,
+	metaslab_rt_add,
+	metaslab_rt_remove,
+	metaslab_rt_vacate
+};
+
+/*
+ * ==========================================================================
+ * Metaslab block operations
+ * ==========================================================================
+ */
+
 /*
  * Return the maximum contiguous segment within the metaslab.
  */
 uint64_t
-metaslab_pp_maxsize(space_map_t *sm)
+metaslab_block_maxsize(metaslab_t *msp)
 {
-	avl_tree_t *t = sm->sm_pp_root;
-	space_seg_t *ss;
+	avl_tree_t *t = &msp->ms_size_tree;
+	range_seg_t *rs;
 
-	if (t == NULL || (ss = avl_last(t)) == NULL)
+	if (t == NULL || (rs = avl_last(t)) == NULL)
 		return (0ULL);
 
-	return (ss->ss_end - ss->ss_start);
+	return (rs->rs_end - rs->rs_start);
+}
+
+uint64_t
+metaslab_block_alloc(metaslab_t *msp, uint64_t size)
+{
+	uint64_t start;
+	range_tree_t *rt = msp->ms_tree;
+
+	VERIFY(!msp->ms_condensing);
+
+	start = msp->ms_ops->msop_alloc(msp, size);
+	if (start != -1ULL) {
+		vdev_t *vd = msp->ms_group->mg_vd;
+
+		VERIFY0(P2PHASE(start, 1ULL << vd->vdev_ashift));
+		VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
+		VERIFY3U(range_tree_space(rt) - size, <=, msp->ms_size);
+		range_tree_remove(rt, start, size);
+	}
+	return (start);
+}
+
+/*
+ * ==========================================================================
+ * Common allocator routines
+ * ==========================================================================
+ */
+
+#if defined(WITH_FF_BLOCK_ALLOCATOR) || \
+    defined(WITH_DF_BLOCK_ALLOCATOR) || \
+    defined(WITH_CF_BLOCK_ALLOCATOR)
+/*
+ * This is a helper function that can be used by the allocator to find
+ * a suitable block to allocate. This will search the specified AVL
+ * tree looking for a block that matches the specified criteria.
+ */
+static uint64_t
+metaslab_block_picker(avl_tree_t *t, uint64_t *cursor, uint64_t size,
+    uint64_t align)
+{
+	range_seg_t *rs, rsearch;
+	avl_index_t where;
+
+	rsearch.rs_start = *cursor;
+	rsearch.rs_end = *cursor + size;
+
+	rs = avl_find(t, &rsearch, &where);
+	if (rs == NULL)
+		rs = avl_nearest(t, where, AVL_AFTER);
+
+	while (rs != NULL) {
+		uint64_t offset = P2ROUNDUP(rs->rs_start, align);
+
+		if (offset + size <= rs->rs_end) {
+			*cursor = offset + size;
+			return (offset);
+		}
+		rs = AVL_NEXT(t, rs);
+	}
+
+	/*
+	 * If we know we've searched the whole map (*cursor == 0), give up.
+	 * Otherwise, reset the cursor to the beginning and try again.
+	 */
+	if (*cursor == 0)
+		return (-1ULL);
+
+	*cursor = 0;
+	return (metaslab_block_picker(t, cursor, size, align));
 }
+#endif /* WITH_FF/DF/CF_BLOCK_ALLOCATOR */
 
 #if defined(WITH_FF_BLOCK_ALLOCATOR)
 /*
@@ -593,33 +692,35 @@ metaslab_pp_maxsize(space_map_t *sm)
  * ==========================================================================
  */
 static uint64_t
-metaslab_ff_alloc(space_map_t *sm, uint64_t size)
+metaslab_ff_alloc(metaslab_t *msp, uint64_t size)
 {
-	avl_tree_t *t = &sm->sm_root;
+	/*
+	 * Find the largest power of 2 block size that evenly divides the
+	 * requested size. This is used to try to allocate blocks with similar
+	 * alignment from the same area of the metaslab (i.e. same cursor
+	 * bucket) but it does not guarantee that other allocations sizes
+	 * may exist in the same region.
+	 */
 	uint64_t align = size & -size;
-	uint64_t *cursor = (uint64_t *)sm->sm_ppd + highbit(align) - 1;
+	uint64_t *cursor = &msp->ms_lbas[highbit(align) - 1];
+	avl_tree_t *t = &msp->ms_tree->rt_root;
 
 	return (metaslab_block_picker(t, cursor, size, align));
 }
 
 /* ARGSUSED */
-boolean_t
-metaslab_ff_fragmented(space_map_t *sm)
+static boolean_t
+metaslab_ff_fragmented(metaslab_t *msp)
 {
 	return (B_TRUE);
 }
 
-static space_map_ops_t metaslab_ff_ops = {
-	metaslab_pp_load,
-	metaslab_pp_unload,
+static metaslab_ops_t metaslab_ff_ops = {
 	metaslab_ff_alloc,
-	metaslab_pp_claim,
-	metaslab_pp_free,
-	metaslab_pp_maxsize,
 	metaslab_ff_fragmented
 };
 
-space_map_ops_t *zfs_metaslab_ops = &metaslab_ff_ops;
+metaslab_ops_t *zfs_metaslab_ops = &metaslab_ff_ops;
 #endif /* WITH_FF_BLOCK_ALLOCATOR */
 
 #if defined(WITH_DF_BLOCK_ALLOCATOR)
@@ -632,16 +733,24 @@ space_map_ops_t *zfs_metaslab_ops = &metaslab_ff_ops;
  * ==========================================================================
  */
 static uint64_t
-metaslab_df_alloc(space_map_t *sm, uint64_t size)
+metaslab_df_alloc(metaslab_t *msp, uint64_t size)
 {
-	avl_tree_t *t = &sm->sm_root;
+	/*
+	 * Find the largest power of 2 block size that evenly divides the
+	 * requested size. This is used to try to allocate blocks with similar
+	 * alignment from the same area of the metaslab (i.e. same cursor
+	 * bucket) but it does not guarantee that other allocations sizes
+	 * may exist in the same region.
+	 */
 	uint64_t align = size & -size;
-	uint64_t *cursor = (uint64_t *)sm->sm_ppd + highbit(align) - 1;
-	uint64_t max_size = metaslab_pp_maxsize(sm);
-	int free_pct = sm->sm_space * 100 / sm->sm_size;
+	uint64_t *cursor = &msp->ms_lbas[highbit(align) - 1];
+	range_tree_t *rt = msp->ms_tree;
+	avl_tree_t *t = &rt->rt_root;
+	uint64_t max_size = metaslab_block_maxsize(msp);
+	int free_pct = range_tree_space(rt) * 100 / msp->ms_size;
 
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-	ASSERT3U(avl_numnodes(&sm->sm_root), ==, avl_numnodes(sm->sm_pp_root));
+	ASSERT(MUTEX_HELD(&msp->ms_lock));
+	ASSERT3U(avl_numnodes(t), ==, avl_numnodes(&msp->ms_size_tree));
 
 	if (max_size < size)
 		return (-1ULL);
@@ -652,7 +761,7 @@ metaslab_df_alloc(space_map_t *sm, uint64_t size)
 	 */
 	if (max_size < metaslab_df_alloc_threshold ||
 	    free_pct < metaslab_df_free_pct) {
-		t = sm->sm_pp_root;
+		t = &msp->ms_size_tree;
 		*cursor = 0;
 	}
 
@@ -660,203 +769,253 @@ metaslab_df_alloc(space_map_t *sm, uint64_t size)
 }
 
 static boolean_t
-metaslab_df_fragmented(space_map_t *sm)
+metaslab_df_fragmented(metaslab_t *msp)
 {
-	uint64_t max_size = metaslab_pp_maxsize(sm);
-	int free_pct = sm->sm_space * 100 / sm->sm_size;
+	range_tree_t *rt = msp->ms_tree;
+	uint64_t max_size = metaslab_block_maxsize(msp);
+	int free_pct = range_tree_space(rt) * 100 / msp->ms_size;
 
 	if (max_size >= metaslab_df_alloc_threshold &&
 	    free_pct >= metaslab_df_free_pct)
 		return (B_FALSE);
 
+
 	return (B_TRUE);
 }
 
-static space_map_ops_t metaslab_df_ops = {
-	metaslab_pp_load,
-	metaslab_pp_unload,
+static metaslab_ops_t metaslab_df_ops = {
 	metaslab_df_alloc,
-	metaslab_pp_claim,
-	metaslab_pp_free,
-	metaslab_pp_maxsize,
 	metaslab_df_fragmented
 };
 
-space_map_ops_t *zfs_metaslab_ops = &metaslab_df_ops;
+metaslab_ops_t *zfs_metaslab_ops = &metaslab_df_ops;
 #endif /* WITH_DF_BLOCK_ALLOCATOR */
 
+#if defined(WITH_CF_BLOCK_ALLOCATOR)
 /*
  * ==========================================================================
- * Other experimental allocators
+ * Cursor fit block allocator -
+ * Select the largest region in the metaslab, set the cursor to the beginning
+ * of the range and the cursor_end to the end of the range. As allocations
+ * are made advance the cursor. Continue allocating from the cursor until
+ * the range is exhausted and then find a new range.
  * ==========================================================================
  */
-#if defined(WITH_CDF_BLOCK_ALLOCATOR)
 static uint64_t
-metaslab_cdf_alloc(space_map_t *sm, uint64_t size)
+metaslab_cf_alloc(metaslab_t *msp, uint64_t size)
 {
-	avl_tree_t *t = &sm->sm_root;
-	uint64_t *cursor = (uint64_t *)sm->sm_ppd;
-	uint64_t *extent_end = (uint64_t *)sm->sm_ppd + 1;
-	uint64_t max_size = metaslab_pp_maxsize(sm);
-	uint64_t rsize = size;
+	range_tree_t *rt = msp->ms_tree;
+	avl_tree_t *t = &msp->ms_size_tree;
+	uint64_t *cursor = &msp->ms_lbas[0];
+	uint64_t *cursor_end = &msp->ms_lbas[1];
 	uint64_t offset = 0;
 
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-	ASSERT3U(avl_numnodes(&sm->sm_root), ==, avl_numnodes(sm->sm_pp_root));
+	ASSERT(MUTEX_HELD(&msp->ms_lock));
+	ASSERT3U(avl_numnodes(t), ==, avl_numnodes(&rt->rt_root));
 
-	if (max_size < size)
-		return (-1ULL);
+	ASSERT3U(*cursor_end, >=, *cursor);
 
-	ASSERT3U(*extent_end, >=, *cursor);
+	if ((*cursor + size) > *cursor_end) {
+		range_seg_t *rs;
 
-	/*
-	 * If we're running low on space switch to using the size
-	 * sorted AVL tree (best-fit).
-	 */
-	if ((*cursor + size) > *extent_end) {
-
-		t = sm->sm_pp_root;
-		*cursor = *extent_end = 0;
+		rs = avl_last(&msp->ms_size_tree);
+		if (rs == NULL || (rs->rs_end - rs->rs_start) < size)
+			return (-1ULL);
 
-		if (max_size > 2 * SPA_MAXBLOCKSIZE)
-			rsize = MIN(metaslab_min_alloc_size, max_size);
-		offset = metaslab_block_picker(t, extent_end, rsize, 1ULL);
-		if (offset != -1)
-			*cursor = offset + size;
-	} else {
-		offset = metaslab_block_picker(t, cursor, rsize, 1ULL);
+		*cursor = rs->rs_start;
+		*cursor_end = rs->rs_end;
 	}
-	ASSERT3U(*cursor, <=, *extent_end);
+
+	offset = *cursor;
+	*cursor += size;
+
 	return (offset);
 }
 
 static boolean_t
-metaslab_cdf_fragmented(space_map_t *sm)
+metaslab_cf_fragmented(metaslab_t *msp)
 {
-	uint64_t max_size = metaslab_pp_maxsize(sm);
-
-	if (max_size > (metaslab_min_alloc_size * 10))
-		return (B_FALSE);
-	return (B_TRUE);
+	return (metaslab_block_maxsize(msp) < metaslab_min_alloc_size);
 }
 
-static space_map_ops_t metaslab_cdf_ops = {
-	metaslab_pp_load,
-	metaslab_pp_unload,
-	metaslab_cdf_alloc,
-	metaslab_pp_claim,
-	metaslab_pp_free,
-	metaslab_pp_maxsize,
-	metaslab_cdf_fragmented
+static metaslab_ops_t metaslab_cf_ops = {
+	metaslab_cf_alloc,
+	metaslab_cf_fragmented
 };
 
-space_map_ops_t *zfs_metaslab_ops = &metaslab_cdf_ops;
-#endif /* WITH_CDF_BLOCK_ALLOCATOR */
+metaslab_ops_t *zfs_metaslab_ops = &metaslab_cf_ops;
+#endif /* WITH_CF_BLOCK_ALLOCATOR */
 
 #if defined(WITH_NDF_BLOCK_ALLOCATOR)
+/*
+ * ==========================================================================
+ * New dynamic fit allocator -
+ * Select a region that is large enough to allocate 2^metaslab_ndf_clump_shift
+ * contiguous blocks. If no region is found then just use the largest segment
+ * that remains.
+ * ==========================================================================
+ */
+
+/*
+ * Determines desired number of contiguous blocks (2^metaslab_ndf_clump_shift)
+ * to request from the allocator.
+ */
 uint64_t metaslab_ndf_clump_shift = 4;
 
 static uint64_t
-metaslab_ndf_alloc(space_map_t *sm, uint64_t size)
+metaslab_ndf_alloc(metaslab_t *msp, uint64_t size)
 {
-	avl_tree_t *t = &sm->sm_root;
+	avl_tree_t *t = &msp->ms_tree->rt_root;
 	avl_index_t where;
-	space_seg_t *ss, ssearch;
+	range_seg_t *rs, rsearch;
 	uint64_t hbit = highbit(size);
-	uint64_t *cursor = (uint64_t *)sm->sm_ppd + hbit - 1;
-	uint64_t max_size = metaslab_pp_maxsize(sm);
+	uint64_t *cursor = &msp->ms_lbas[hbit - 1];
+	uint64_t max_size = metaslab_block_maxsize(msp);
 
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-	ASSERT3U(avl_numnodes(&sm->sm_root), ==, avl_numnodes(sm->sm_pp_root));
+	ASSERT(MUTEX_HELD(&msp->ms_lock));
+	ASSERT3U(avl_numnodes(t), ==, avl_numnodes(&msp->ms_size_tree));
 
 	if (max_size < size)
 		return (-1ULL);
 
-	ssearch.ss_start = *cursor;
-	ssearch.ss_end = *cursor + size;
+	rsearch.rs_start = *cursor;
+	rsearch.rs_end = *cursor + size;
 
-	ss = avl_find(t, &ssearch, &where);
-	if (ss == NULL || (ss->ss_start + size > ss->ss_end)) {
-		t = sm->sm_pp_root;
+	rs = avl_find(t, &rsearch, &where);
+	if (rs == NULL || (rs->rs_end - rs->rs_start) < size) {
+		t = &msp->ms_size_tree;
 
-		ssearch.ss_start = 0;
-		ssearch.ss_end = MIN(max_size,
+		rsearch.rs_start = 0;
+		rsearch.rs_end = MIN(max_size,
 		    1ULL << (hbit + metaslab_ndf_clump_shift));
-		ss = avl_find(t, &ssearch, &where);
-		if (ss == NULL)
-			ss = avl_nearest(t, where, AVL_AFTER);
-		ASSERT(ss != NULL);
+		rs = avl_find(t, &rsearch, &where);
+		if (rs == NULL)
+			rs = avl_nearest(t, where, AVL_AFTER);
+		ASSERT(rs != NULL);
 	}
 
-	if (ss != NULL) {
-		if (ss->ss_start + size <= ss->ss_end) {
-			*cursor = ss->ss_start + size;
-			return (ss->ss_start);
-		}
+	if ((rs->rs_end - rs->rs_start) >= size) {
+		*cursor = rs->rs_start + size;
+		return (rs->rs_start);
 	}
 	return (-1ULL);
 }
 
 static boolean_t
-metaslab_ndf_fragmented(space_map_t *sm)
+metaslab_ndf_fragmented(metaslab_t *msp)
 {
-	uint64_t max_size = metaslab_pp_maxsize(sm);
-
-	if (max_size > (metaslab_min_alloc_size << metaslab_ndf_clump_shift))
-		return (B_FALSE);
-	return (B_TRUE);
+	return (metaslab_block_maxsize(msp) <=
+	    (metaslab_min_alloc_size << metaslab_ndf_clump_shift));
 }
 
-
-static space_map_ops_t metaslab_ndf_ops = {
-	metaslab_pp_load,
-	metaslab_pp_unload,
+static metaslab_ops_t metaslab_ndf_ops = {
 	metaslab_ndf_alloc,
-	metaslab_pp_claim,
-	metaslab_pp_free,
-	metaslab_pp_maxsize,
 	metaslab_ndf_fragmented
 };
 
-space_map_ops_t *zfs_metaslab_ops = &metaslab_ndf_ops;
+metaslab_ops_t *zfs_metaslab_ops = &metaslab_ndf_ops;
 #endif /* WITH_NDF_BLOCK_ALLOCATOR */
 
+
 /*
  * ==========================================================================
  * Metaslabs
  * ==========================================================================
  */
+
+/*
+ * Wait for any in-progress metaslab loads to complete.
+ */
+void
+metaslab_load_wait(metaslab_t *msp)
+{
+	ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+	while (msp->ms_loading) {
+		ASSERT(!msp->ms_loaded);
+		cv_wait(&msp->ms_load_cv, &msp->ms_lock);
+	}
+}
+
+int
+metaslab_load(metaslab_t *msp)
+{
+	int error = 0;
+	int t;
+
+	ASSERT(MUTEX_HELD(&msp->ms_lock));
+	ASSERT(!msp->ms_loaded);
+	ASSERT(!msp->ms_loading);
+
+	msp->ms_loading = B_TRUE;
+
+	/*
+	 * If the space map has not been allocated yet, then treat
+	 * all the space in the metaslab as free and add it to the
+	 * ms_tree.
+	 */
+	if (msp->ms_sm != NULL)
+		error = space_map_load(msp->ms_sm, msp->ms_tree, SM_FREE);
+	else
+		range_tree_add(msp->ms_tree, msp->ms_start, msp->ms_size);
+
+	msp->ms_loaded = (error == 0);
+	msp->ms_loading = B_FALSE;
+
+	if (msp->ms_loaded) {
+		for (t = 0; t < TXG_DEFER_SIZE; t++) {
+			range_tree_walk(msp->ms_defertree[t],
+			    range_tree_remove, msp->ms_tree);
+		}
+	}
+	cv_broadcast(&msp->ms_load_cv);
+	return (error);
+}
+
+void
+metaslab_unload(metaslab_t *msp)
+{
+	ASSERT(MUTEX_HELD(&msp->ms_lock));
+	range_tree_vacate(msp->ms_tree, NULL, NULL);
+	msp->ms_loaded = B_FALSE;
+	msp->ms_weight &= ~METASLAB_ACTIVE_MASK;
+}
+
 metaslab_t *
-metaslab_init(metaslab_group_t *mg, space_map_obj_t *smo,
-	uint64_t start, uint64_t size, uint64_t txg)
+metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg)
 {
 	vdev_t *vd = mg->mg_vd;
+	objset_t *mos = vd->vdev_spa->spa_meta_objset;
 	metaslab_t *msp;
 
 	msp = kmem_zalloc(sizeof (metaslab_t), KM_PUSHPAGE);
 	mutex_init(&msp->ms_lock, NULL, MUTEX_DEFAULT, NULL);
+	cv_init(&msp->ms_load_cv, NULL, CV_DEFAULT, NULL);
+	msp->ms_id = id;
+	msp->ms_start = id << vd->vdev_ms_shift;
+	msp->ms_size = 1ULL << vd->vdev_ms_shift;
 
-	msp->ms_smo_syncing = *smo;
+	/*
+	 * We only open space map objects that already exist. All others
+	 * will be opened when we finally allocate an object for it.
+	 */
+	if (object != 0) {
+		VERIFY0(space_map_open(&msp->ms_sm, mos, object, msp->ms_start,
+		    msp->ms_size, vd->vdev_ashift, &msp->ms_lock));
+		ASSERT(msp->ms_sm != NULL);
+	}
 
 	/*
-	 * We create the main space map here, but we don't create the
-	 * allocmaps and freemaps until metaslab_sync_done().  This serves
+	 * We create the main range tree here, but we don't create the
+	 * alloctree and freetree until metaslab_sync_done().  This serves
 	 * two purposes: it allows metaslab_sync_done() to detect the
 	 * addition of new space; and for debugging, it ensures that we'd
 	 * data fault on any attempt to use this metaslab before it's ready.
 	 */
-	msp->ms_map = kmem_zalloc(sizeof (space_map_t), KM_PUSHPAGE);
-	space_map_create(msp->ms_map, start, size,
-	    vd->vdev_ashift, &msp->ms_lock);
-
+	msp->ms_tree = range_tree_create(&metaslab_rt_ops, msp, &msp->ms_lock);
 	metaslab_group_add(mg, msp);
 
-	if (metaslab_debug_load && smo->smo_object != 0) {
-		mutex_enter(&msp->ms_lock);
-		VERIFY(space_map_load(msp->ms_map, mg->mg_class->mc_ops,
-		    SM_FREE, smo, spa_meta_objset(vd->vdev_spa)) == 0);
-		mutex_exit(&msp->ms_lock);
-	}
+	msp->ms_ops = mg->mg_class->mc_ops;
 
 	/*
 	 * If we're opening an existing pool (txg == 0) or creating
@@ -867,6 +1026,17 @@ metaslab_init(metaslab_group_t *mg, space_map_obj_t *smo,
 	if (txg <= TXG_INITIAL)
 		metaslab_sync_done(msp, 0);
 
+	/*
+	 * If metaslab_debug_load is set and we're initializing a metaslab
+	 * that has an allocated space_map object then load the its space
+	 * map so that can verify frees.
+	 */
+	if (metaslab_debug_load && msp->ms_sm != NULL) {
+		mutex_enter(&msp->ms_lock);
+		VERIFY0(metaslab_load(msp));
+		mutex_exit(&msp->ms_lock);
+	}
+
 	if (txg != 0) {
 		vdev_dirty(vd, 0, NULL, txg);
 		vdev_dirty(vd, VDD_METASLAB, msp, txg);
@@ -878,51 +1048,107 @@ metaslab_init(metaslab_group_t *mg, space_map_obj_t *smo,
 void
 metaslab_fini(metaslab_t *msp)
 {
-	metaslab_group_t *mg = msp->ms_group;
 	int t;
 
-	vdev_space_update(mg->mg_vd,
-	    -msp->ms_smo.smo_alloc, 0, -msp->ms_map->sm_size);
+	metaslab_group_t *mg = msp->ms_group;
 
 	metaslab_group_remove(mg, msp);
 
 	mutex_enter(&msp->ms_lock);
 
-	space_map_unload(msp->ms_map);
-	space_map_destroy(msp->ms_map);
-	kmem_free(msp->ms_map, sizeof (*msp->ms_map));
+	VERIFY(msp->ms_group == NULL);
+	vdev_space_update(mg->mg_vd, -space_map_allocated(msp->ms_sm),
+	    0, -msp->ms_size);
+	space_map_close(msp->ms_sm);
+
+	metaslab_unload(msp);
+	range_tree_destroy(msp->ms_tree);
 
 	for (t = 0; t < TXG_SIZE; t++) {
-		space_map_destroy(msp->ms_allocmap[t]);
-		space_map_destroy(msp->ms_freemap[t]);
-		kmem_free(msp->ms_allocmap[t], sizeof (*msp->ms_allocmap[t]));
-		kmem_free(msp->ms_freemap[t], sizeof (*msp->ms_freemap[t]));
+		range_tree_destroy(msp->ms_alloctree[t]);
+		range_tree_destroy(msp->ms_freetree[t]);
 	}
 
 	for (t = 0; t < TXG_DEFER_SIZE; t++) {
-		space_map_destroy(msp->ms_defermap[t]);
-		kmem_free(msp->ms_defermap[t], sizeof (*msp->ms_defermap[t]));
+		range_tree_destroy(msp->ms_defertree[t]);
 	}
 
 	ASSERT0(msp->ms_deferspace);
 
 	mutex_exit(&msp->ms_lock);
+	cv_destroy(&msp->ms_load_cv);
 	mutex_destroy(&msp->ms_lock);
 
 	kmem_free(msp, sizeof (metaslab_t));
 }
 
-#define	METASLAB_WEIGHT_PRIMARY		(1ULL << 63)
-#define	METASLAB_WEIGHT_SECONDARY	(1ULL << 62)
-#define	METASLAB_ACTIVE_MASK		\
-	(METASLAB_WEIGHT_PRIMARY | METASLAB_WEIGHT_SECONDARY)
+/*
+ * Apply a weighting factor based on the histogram information for this
+ * metaslab. The current weighting factor is somewhat arbitrary and requires
+ * additional investigation. The implementation provides a measure of
+ * "weighted" free space and gives a higher weighting for larger contiguous
+ * regions. The weighting factor is determined by counting the number of
+ * sm_shift sectors that exist in each region represented by the histogram.
+ * That value is then multiplied by the power of 2 exponent and the sm_shift
+ * value.
+ *
+ * For example, assume the 2^21 histogram bucket has 4 2MB regions and the
+ * metaslab has an sm_shift value of 9 (512B):
+ *
+ * 1) calculate the number of sm_shift sectors in the region:
+ *	2^21 / 2^9 = 2^12 = 4096 * 4 (number of regions) = 16384
+ * 2) multiply by the power of 2 exponent and the sm_shift value:
+ *	16384 * 21 * 9 = 3096576
+ * This value will be added to the weighting of the metaslab.
+ */
+static uint64_t
+metaslab_weight_factor(metaslab_t *msp)
+{
+	uint64_t factor = 0;
+	uint64_t sectors;
+	int i;
+
+	/*
+	 * A null space map means that the entire metaslab is free,
+	 * calculate a weight factor that spans the entire size of the
+	 * metaslab.
+	 */
+	if (msp->ms_sm == NULL) {
+		vdev_t *vd = msp->ms_group->mg_vd;
+
+		i = highbit(msp->ms_size) - 1;
+		sectors = msp->ms_size >> vd->vdev_ashift;
+		return (sectors * i * vd->vdev_ashift);
+	}
+
+	if (msp->ms_sm->sm_dbuf->db_size != sizeof (space_map_phys_t))
+		return (0);
+
+	for (i = 0; i < SPACE_MAP_HISTOGRAM_SIZE(msp->ms_sm); i++) {
+		if (msp->ms_sm->sm_phys->smp_histogram[i] == 0)
+			continue;
+
+		/*
+		 * Determine the number of sm_shift sectors in the region
+		 * indicated by the histogram. For example, given an
+		 * sm_shift value of 9 (512 bytes) and i = 4 then we know
+		 * that we're looking at an 8K region in the histogram
+		 * (i.e. 9 + 4 = 13, 2^13 = 8192). To figure out the
+		 * number of sm_shift sectors (512 bytes in this example),
+		 * we would take 8192 / 512 = 16. Since the histogram
+		 * is offset by sm_shift we can simply use the value of
+		 * of i to calculate this (i.e. 2^i = 16 where i = 4).
+		 */
+		sectors = msp->ms_sm->sm_phys->smp_histogram[i] << i;
+		factor += (i + msp->ms_sm->sm_shift) * sectors;
+	}
+	return (factor * msp->ms_sm->sm_shift);
+}
 
 static uint64_t
 metaslab_weight(metaslab_t *msp)
 {
 	metaslab_group_t *mg = msp->ms_group;
-	space_map_t *sm = msp->ms_map;
-	space_map_obj_t *smo = &msp->ms_smo;
 	vdev_t *vd = mg->mg_vd;
 	uint64_t weight, space;
 
@@ -933,7 +1159,7 @@ metaslab_weight(metaslab_t *msp)
 	 * for us to do here.
 	 */
 	if (vd->vdev_removing) {
-		ASSERT0(smo->smo_alloc);
+		ASSERT0(space_map_allocated(msp->ms_sm));
 		ASSERT0(vd->vdev_ms_shift);
 		return (0);
 	}
@@ -941,7 +1167,7 @@ metaslab_weight(metaslab_t *msp)
 	/*
 	 * The baseline weight is the metaslab's free space.
 	 */
-	space = sm->sm_size - smo->smo_alloc;
+	space = msp->ms_size - space_map_allocated(msp->ms_sm);
 	weight = space;
 
 	/*
@@ -953,20 +1179,14 @@ metaslab_weight(metaslab_t *msp)
 	 * In effect, this means that we'll select the metaslab with the most
 	 * free bandwidth rather than simply the one with the most free space.
 	 */
-	weight = 2 * weight -
-	    ((sm->sm_start >> vd->vdev_ms_shift) * weight) / vd->vdev_ms_count;
+	weight = 2 * weight - (msp->ms_id * weight) / vd->vdev_ms_count;
 	ASSERT(weight >= space && weight <= 2 * space);
 
-	/*
-	 * For locality, assign higher weight to metaslabs which have
-	 * a lower offset than what we've already activated.
-	 */
-	if (sm->sm_start <= mg->mg_bonus_area)
-		weight *= (metaslab_smo_bonus_pct / 100);
-	ASSERT(weight >= space &&
-	    weight <= 2 * (metaslab_smo_bonus_pct / 100) * space);
+	msp->ms_factor = metaslab_weight_factor(msp);
+	if (metaslab_weight_factor_enable)
+		weight += msp->ms_factor;
 
-	if (sm->sm_loaded && !sm->sm_ops->smop_fragmented(sm)) {
+	if (msp->ms_loaded && !msp->ms_ops->msop_fragmented(msp)) {
 		/*
 		 * If this metaslab is one we're actively using, adjust its
 		 * weight to make it preferable to any inactive metaslab so
@@ -974,80 +1194,29 @@ metaslab_weight(metaslab_t *msp)
 		 */
 		weight |= (msp->ms_weight & METASLAB_ACTIVE_MASK);
 	}
-	return (weight);
-}
-
-static void
-metaslab_prefetch(metaslab_group_t *mg)
-{
-	spa_t *spa = mg->mg_vd->vdev_spa;
-	metaslab_t *msp;
-	avl_tree_t *t = &mg->mg_metaslab_tree;
-	int m;
-
-	mutex_enter(&mg->mg_lock);
 
-	/*
-	 * Prefetch the next potential metaslabs
-	 */
-	for (msp = avl_first(t), m = 0; msp; msp = AVL_NEXT(t, msp), m++) {
-		space_map_t *sm = msp->ms_map;
-		space_map_obj_t *smo = &msp->ms_smo;
-
-		/* If we have reached our prefetch limit then we're done */
-		if (m >= metaslab_prefetch_limit)
-			break;
-
-		if (!sm->sm_loaded && smo->smo_object != 0) {
-			mutex_exit(&mg->mg_lock);
-			dmu_prefetch(spa_meta_objset(spa), smo->smo_object,
-			    0ULL, smo->smo_objsize);
-			mutex_enter(&mg->mg_lock);
-		}
-	}
-	mutex_exit(&mg->mg_lock);
+	return (weight);
 }
 
 static int
 metaslab_activate(metaslab_t *msp, uint64_t activation_weight)
 {
-	metaslab_group_t *mg = msp->ms_group;
-	space_map_t *sm = msp->ms_map;
-	space_map_ops_t *sm_ops = msp->ms_group->mg_class->mc_ops;
-	int t;
-
 	ASSERT(MUTEX_HELD(&msp->ms_lock));
 
 	if ((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0) {
-		space_map_load_wait(sm);
-		if (!sm->sm_loaded) {
-			space_map_obj_t *smo = &msp->ms_smo;
-
-			int error = space_map_load(sm, sm_ops, SM_FREE, smo,
-			    spa_meta_objset(msp->ms_group->mg_vd->vdev_spa));
-			if (error)  {
+		metaslab_load_wait(msp);
+		if (!msp->ms_loaded) {
+			int error = metaslab_load(msp);
+			if (error) {
 				metaslab_group_sort(msp->ms_group, msp, 0);
 				return (error);
 			}
-			for (t = 0; t < TXG_DEFER_SIZE; t++)
-				space_map_walk(msp->ms_defermap[t],
-				    space_map_claim, sm);
-
-		}
-
-		/*
-		 * Track the bonus area as we activate new metaslabs.
-		 */
-		if (sm->sm_start > mg->mg_bonus_area) {
-			mutex_enter(&mg->mg_lock);
-			mg->mg_bonus_area = sm->sm_start;
-			mutex_exit(&mg->mg_lock);
 		}
 
 		metaslab_group_sort(msp->ms_group, msp,
 		    msp->ms_weight | activation_weight);
 	}
-	ASSERT(sm->sm_loaded);
+	ASSERT(msp->ms_loaded);
 	ASSERT(msp->ms_weight & METASLAB_ACTIVE_MASK);
 
 	return (0);
@@ -1061,26 +1230,74 @@ metaslab_passivate(metaslab_t *msp, uint64_t size)
 	 * this metaslab again.  In that case, it had better be empty,
 	 * or we would be leaving space on the table.
 	 */
-	ASSERT(size >= SPA_MINBLOCKSIZE || msp->ms_map->sm_space == 0);
+	ASSERT(size >= SPA_MINBLOCKSIZE || range_tree_space(msp->ms_tree) == 0);
 	metaslab_group_sort(msp->ms_group, msp, MIN(msp->ms_weight, size));
 	ASSERT((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0);
 }
 
+static void
+metaslab_preload(void *arg)
+{
+	metaslab_t *msp = arg;
+	spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+
+	mutex_enter(&msp->ms_lock);
+	metaslab_load_wait(msp);
+	if (!msp->ms_loaded)
+		(void) metaslab_load(msp);
+
+	/*
+	 * Set the ms_access_txg value so that we don't unload it right away.
+	 */
+	msp->ms_access_txg = spa_syncing_txg(spa) + metaslab_unload_delay + 1;
+	mutex_exit(&msp->ms_lock);
+}
+
+static void
+metaslab_group_preload(metaslab_group_t *mg)
+{
+	spa_t *spa = mg->mg_vd->vdev_spa;
+	metaslab_t *msp;
+	avl_tree_t *t = &mg->mg_metaslab_tree;
+	int m = 0;
+
+	if (spa_shutting_down(spa) || !metaslab_preload_enabled) {
+		taskq_wait(mg->mg_taskq);
+		return;
+	}
+	mutex_enter(&mg->mg_lock);
+
+	/*
+	 * Prefetch the next potential metaslabs
+	 */
+	for (msp = avl_first(t); msp != NULL; msp = AVL_NEXT(t, msp)) {
+
+		/* If we have reached our preload limit then we're done */
+		if (++m > metaslab_preload_limit)
+			break;
+
+		VERIFY(taskq_dispatch(mg->mg_taskq, metaslab_preload,
+		    msp, TQ_PUSHPAGE) != 0);
+	}
+	mutex_exit(&mg->mg_lock);
+}
+
 /*
- * Determine if the in-core space map representation can be condensed on-disk.
- * We would like to use the following criteria to make our decision:
+ * Determine if the space map's on-disk footprint is past our tolerance
+ * for inefficiency. We would like to use the following criteria to make
+ * our decision:
  *
  * 1. The size of the space map object should not dramatically increase as a
- * result of writing out our in-core free map.
+ * result of writing out the free space range tree.
  *
  * 2. The minimal on-disk space map representation is zfs_condense_pct/100
- * times the size than the in-core representation (i.e. zfs_condense_pct = 110
- * and in-core = 1MB, minimal = 1.1.MB).
+ * times the size than the free space range tree representation
+ * (i.e. zfs_condense_pct = 110 and in-core = 1MB, minimal = 1.1.MB).
  *
  * Checking the first condition is tricky since we don't want to walk
  * the entire AVL tree calculating the estimated on-disk size. Instead we
- * use the size-ordered AVL tree in the space map and calculate the
- * size required for the largest segment in our in-core free map. If the
+ * use the size-ordered range tree in the metaslab and calculate the
+ * size required to write out the largest segment in our free tree. If the
  * size required to represent that segment on disk is larger than the space
  * map object then we avoid condensing this map.
  *
@@ -1091,21 +1308,20 @@ metaslab_passivate(metaslab_t *msp, uint64_t size)
 static boolean_t
 metaslab_should_condense(metaslab_t *msp)
 {
-	space_map_t *sm = msp->ms_map;
-	space_map_obj_t *smo = &msp->ms_smo_syncing;
-	space_seg_t *ss;
+	space_map_t *sm = msp->ms_sm;
+	range_seg_t *rs;
 	uint64_t size, entries, segsz;
 
 	ASSERT(MUTEX_HELD(&msp->ms_lock));
-	ASSERT(sm->sm_loaded);
+	ASSERT(msp->ms_loaded);
 
 	/*
-	 * Use the sm_pp_root AVL tree, which is ordered by size, to obtain
-	 * the largest segment in the in-core free map. If the tree is
-	 * empty then we should condense the map.
+	 * Use the ms_size_tree range tree, which is ordered by size, to
+	 * obtain the largest segment in the free tree. If the tree is empty
+	 * then we should condense the map.
 	 */
-	ss = avl_last(sm->sm_pp_root);
-	if (ss == NULL)
+	rs = avl_last(&msp->ms_size_tree);
+	if (rs == NULL)
 		return (B_TRUE);
 
 	/*
@@ -1114,103 +1330,95 @@ metaslab_should_condense(metaslab_t *msp)
 	 * larger on-disk than the entire current on-disk structure, then
 	 * clearly condensing will increase the on-disk structure size.
 	 */
-	size = (ss->ss_end - ss->ss_start) >> sm->sm_shift;
+	size = (rs->rs_end - rs->rs_start) >> sm->sm_shift;
 	entries = size / (MIN(size, SM_RUN_MAX));
 	segsz = entries * sizeof (uint64_t);
 
-	return (segsz <= smo->smo_objsize &&
-	    smo->smo_objsize >= (zfs_condense_pct *
-	    sizeof (uint64_t) * avl_numnodes(&sm->sm_root)) / 100);
+	return (segsz <= space_map_length(msp->ms_sm) &&
+	    space_map_length(msp->ms_sm) >= (zfs_condense_pct *
+	    sizeof (uint64_t) * avl_numnodes(&msp->ms_tree->rt_root)) / 100);
 }
 
 /*
  * Condense the on-disk space map representation to its minimized form.
  * The minimized form consists of a small number of allocations followed by
- * the in-core free map.
+ * the entries of the free range tree.
  */
 static void
 metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
 {
 	spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
-	space_map_t *freemap = msp->ms_freemap[txg & TXG_MASK];
-	space_map_t condense_map;
-	space_map_t *sm = msp->ms_map;
-	objset_t *mos = spa_meta_objset(spa);
-	space_map_obj_t *smo = &msp->ms_smo_syncing;
+	range_tree_t *freetree = msp->ms_freetree[txg & TXG_MASK];
+	range_tree_t *condense_tree;
+	space_map_t *sm = msp->ms_sm;
 	int t;
 
 	ASSERT(MUTEX_HELD(&msp->ms_lock));
 	ASSERT3U(spa_sync_pass(spa), ==, 1);
-	ASSERT(sm->sm_loaded);
+	ASSERT(msp->ms_loaded);
 
 	spa_dbgmsg(spa, "condensing: txg %llu, msp[%llu] %p, "
-	    "smo size %llu, segments %lu", txg,
-	    (msp->ms_map->sm_start / msp->ms_map->sm_size), msp,
-	    smo->smo_objsize, avl_numnodes(&sm->sm_root));
+	    "smp size %llu, segments %lu", txg, msp->ms_id, msp,
+	    space_map_length(msp->ms_sm), avl_numnodes(&msp->ms_tree->rt_root));
 
 	/*
-	 * Create an map that is a 100% allocated map. We remove segments
+	 * Create an range tree that is 100% allocated. We remove segments
 	 * that have been freed in this txg, any deferred frees that exist,
 	 * and any allocation in the future. Removing segments should be
-	 * a relatively inexpensive operation since we expect these maps to
-	 * a small number of nodes.
+	 * a relatively inexpensive operation since we expect these trees to
+	 * have a small number of nodes.
 	 */
-	space_map_create(&condense_map, sm->sm_start, sm->sm_size,
-	    sm->sm_shift, sm->sm_lock);
-	space_map_add(&condense_map, condense_map.sm_start,
-	    condense_map.sm_size);
+	condense_tree = range_tree_create(NULL, NULL, &msp->ms_lock);
+	range_tree_add(condense_tree, msp->ms_start, msp->ms_size);
 
 	/*
-	 * Remove what's been freed in this txg from the condense_map.
+	 * Remove what's been freed in this txg from the condense_tree.
 	 * Since we're in sync_pass 1, we know that all the frees from
-	 * this txg are in the freemap.
+	 * this txg are in the freetree.
 	 */
-	space_map_walk(freemap, space_map_remove, &condense_map);
+	range_tree_walk(freetree, range_tree_remove, condense_tree);
 
-	for (t = 0; t < TXG_DEFER_SIZE; t++)
-		space_map_walk(msp->ms_defermap[t],
-		    space_map_remove, &condense_map);
+	for (t = 0; t < TXG_DEFER_SIZE; t++) {
+		range_tree_walk(msp->ms_defertree[t],
+		    range_tree_remove, condense_tree);
+	}
 
-	for (t = 1; t < TXG_CONCURRENT_STATES; t++)
-		space_map_walk(msp->ms_allocmap[(txg + t) & TXG_MASK],
-		    space_map_remove, &condense_map);
+	for (t = 1; t < TXG_CONCURRENT_STATES; t++) {
+		range_tree_walk(msp->ms_alloctree[(txg + t) & TXG_MASK],
+		    range_tree_remove, condense_tree);
+	}
 
 	/*
 	 * We're about to drop the metaslab's lock thus allowing
 	 * other consumers to change it's content. Set the
-	 * space_map's sm_condensing flag to ensure that
+	 * metaslab's ms_condensing flag to ensure that
 	 * allocations on this metaslab do not occur while we're
 	 * in the middle of committing it to disk. This is only critical
-	 * for the ms_map as all other space_maps use per txg
+	 * for the ms_tree as all other range trees use per txg
 	 * views of their content.
 	 */
-	sm->sm_condensing = B_TRUE;
+	msp->ms_condensing = B_TRUE;
 
 	mutex_exit(&msp->ms_lock);
-	space_map_truncate(smo, mos, tx);
+	space_map_truncate(sm, tx);
 	mutex_enter(&msp->ms_lock);
 
 	/*
 	 * While we would ideally like to create a space_map representation
 	 * that consists only of allocation records, doing so can be
-	 * prohibitively expensive because the in-core free map can be
+	 * prohibitively expensive because the in-core free tree can be
 	 * large, and therefore computationally expensive to subtract
-	 * from the condense_map. Instead we sync out two maps, a cheap
-	 * allocation only map followed by the in-core free map. While not
+	 * from the condense_tree. Instead we sync out two trees, a cheap
+	 * allocation only tree followed by the in-core free tree. While not
 	 * optimal, this is typically close to optimal, and much cheaper to
 	 * compute.
 	 */
-	space_map_sync(&condense_map, SM_ALLOC, smo, mos, tx);
-	space_map_vacate(&condense_map, NULL, NULL);
-	space_map_destroy(&condense_map);
-
-	space_map_sync(sm, SM_FREE, smo, mos, tx);
-	sm->sm_condensing = B_FALSE;
+	space_map_write(sm, condense_tree, SM_ALLOC, tx);
+	range_tree_vacate(condense_tree, NULL, NULL);
+	range_tree_destroy(condense_tree);
 
-	spa_dbgmsg(spa, "condensed: txg %llu, msp[%llu] %p, "
-	    "smo size %llu", txg,
-	    (msp->ms_map->sm_start / msp->ms_map->sm_size), msp,
-	    smo->smo_objsize);
+	space_map_write(sm, msp->ms_tree, SM_FREE, tx);
+	msp->ms_condensing = B_FALSE;
 }
 
 /*
@@ -1219,94 +1427,113 @@ metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
 void
 metaslab_sync(metaslab_t *msp, uint64_t txg)
 {
-	vdev_t *vd = msp->ms_group->mg_vd;
+	metaslab_group_t *mg = msp->ms_group;
+	vdev_t *vd = mg->mg_vd;
 	spa_t *spa = vd->vdev_spa;
 	objset_t *mos = spa_meta_objset(spa);
-	space_map_t *allocmap = msp->ms_allocmap[txg & TXG_MASK];
-	space_map_t **freemap = &msp->ms_freemap[txg & TXG_MASK];
-	space_map_t **freed_map = &msp->ms_freemap[TXG_CLEAN(txg) & TXG_MASK];
-	space_map_t *sm = msp->ms_map;
-	space_map_obj_t *smo = &msp->ms_smo_syncing;
-	dmu_buf_t *db;
+	range_tree_t *alloctree = msp->ms_alloctree[txg & TXG_MASK];
+	range_tree_t **freetree = &msp->ms_freetree[txg & TXG_MASK];
+	range_tree_t **freed_tree =
+	    &msp->ms_freetree[TXG_CLEAN(txg) & TXG_MASK];
 	dmu_tx_t *tx;
+	uint64_t object = space_map_object(msp->ms_sm);
 
 	ASSERT(!vd->vdev_ishole);
 
 	/*
 	 * This metaslab has just been added so there's no work to do now.
 	 */
-	if (*freemap == NULL) {
-		ASSERT3P(allocmap, ==, NULL);
+	if (*freetree == NULL) {
+		ASSERT3P(alloctree, ==, NULL);
 		return;
 	}
 
-	ASSERT3P(allocmap, !=, NULL);
-	ASSERT3P(*freemap, !=, NULL);
-	ASSERT3P(*freed_map, !=, NULL);
+	ASSERT3P(alloctree, !=, NULL);
+	ASSERT3P(*freetree, !=, NULL);
+	ASSERT3P(*freed_tree, !=, NULL);
 
-	if (allocmap->sm_space == 0 && (*freemap)->sm_space == 0)
+	if (range_tree_space(alloctree) == 0 &&
+	    range_tree_space(*freetree) == 0)
 		return;
 
 	/*
 	 * The only state that can actually be changing concurrently with
-	 * metaslab_sync() is the metaslab's ms_map.  No other thread can
-	 * be modifying this txg's allocmap, freemap, freed_map, or smo.
-	 * Therefore, we only hold ms_lock to satify space_map ASSERTs.
-	 * We drop it whenever we call into the DMU, because the DMU
-	 * can call down to us (e.g. via zio_free()) at any time.
+	 * metaslab_sync() is the metaslab's ms_tree.  No other thread can
+	 * be modifying this txg's alloctree, freetree, freed_tree, or
+	 * space_map_phys_t. Therefore, we only hold ms_lock to satify
+	 * space_map ASSERTs. We drop it whenever we call into the DMU,
+	 * because the DMU can call down to us (e.g. via zio_free()) at
+	 * any time.
 	 */
 
 	tx = dmu_tx_create_assigned(spa_get_dsl(spa), txg);
 
-	if (smo->smo_object == 0) {
-		ASSERT(smo->smo_objsize == 0);
-		ASSERT(smo->smo_alloc == 0);
-		smo->smo_object = dmu_object_alloc(mos,
-		    DMU_OT_SPACE_MAP, 1 << SPACE_MAP_BLOCKSHIFT,
-		    DMU_OT_SPACE_MAP_HEADER, sizeof (*smo), tx);
-		ASSERT(smo->smo_object != 0);
-		dmu_write(mos, vd->vdev_ms_array, sizeof (uint64_t) *
-		    (sm->sm_start >> vd->vdev_ms_shift),
-		    sizeof (uint64_t), &smo->smo_object, tx);
+	if (msp->ms_sm == NULL) {
+		uint64_t new_object;
+
+		new_object = space_map_alloc(mos, tx);
+		VERIFY3U(new_object, !=, 0);
+
+		VERIFY0(space_map_open(&msp->ms_sm, mos, new_object,
+		    msp->ms_start, msp->ms_size, vd->vdev_ashift,
+		    &msp->ms_lock));
+		ASSERT(msp->ms_sm != NULL);
 	}
 
 	mutex_enter(&msp->ms_lock);
 
-	if (sm->sm_loaded && spa_sync_pass(spa) == 1 &&
+	if (msp->ms_loaded && spa_sync_pass(spa) == 1 &&
 	    metaslab_should_condense(msp)) {
 		metaslab_condense(msp, txg, tx);
 	} else {
-		space_map_sync(allocmap, SM_ALLOC, smo, mos, tx);
-		space_map_sync(*freemap, SM_FREE, smo, mos, tx);
+		space_map_write(msp->ms_sm, alloctree, SM_ALLOC, tx);
+		space_map_write(msp->ms_sm, *freetree, SM_FREE, tx);
 	}
 
-	space_map_vacate(allocmap, NULL, NULL);
+	range_tree_vacate(alloctree, NULL, NULL);
+
+	if (msp->ms_loaded) {
+		/*
+		 * When the space map is loaded, we have an accruate
+		 * histogram in the range tree. This gives us an opportunity
+		 * to bring the space map's histogram up-to-date so we clear
+		 * it first before updating it.
+		 */
+		space_map_histogram_clear(msp->ms_sm);
+		space_map_histogram_add(msp->ms_sm, msp->ms_tree, tx);
+	} else {
+		/*
+		 * Since the space map is not loaded we simply update the
+		 * exisiting histogram with what was freed in this txg. This
+		 * means that the on-disk histogram may not have an accurate
+		 * view of the free space but it's close enough to allow
+		 * us to make allocation decisions.
+		 */
+		space_map_histogram_add(msp->ms_sm, *freetree, tx);
+	}
 
 	/*
-	 * For sync pass 1, we avoid walking the entire space map and
-	 * instead will just swap the pointers for freemap and
-	 * freed_map. We can safely do this since the freed_map is
+	 * For sync pass 1, we avoid traversing this txg's free range tree
+	 * and instead will just swap the pointers for freetree and
+	 * freed_tree. We can safely do this since the freed_tree is
 	 * guaranteed to be empty on the initial pass.
 	 */
 	if (spa_sync_pass(spa) == 1) {
-		ASSERT0((*freed_map)->sm_space);
-		ASSERT0(avl_numnodes(&(*freed_map)->sm_root));
-		space_map_swap(freemap, freed_map);
+		range_tree_swap(freetree, freed_tree);
 	} else {
-		space_map_vacate(*freemap, space_map_add, *freed_map);
+		range_tree_vacate(*freetree, range_tree_add, *freed_tree);
 	}
 
-	ASSERT0(msp->ms_allocmap[txg & TXG_MASK]->sm_space);
-	ASSERT0(msp->ms_freemap[txg & TXG_MASK]->sm_space);
+	ASSERT0(range_tree_space(msp->ms_alloctree[txg & TXG_MASK]));
+	ASSERT0(range_tree_space(msp->ms_freetree[txg & TXG_MASK]));
 
 	mutex_exit(&msp->ms_lock);
 
-	VERIFY0(dmu_bonus_hold(mos, smo->smo_object, FTAG, &db));
-	dmu_buf_will_dirty(db, tx);
-	ASSERT3U(db->db_size, >=, sizeof (*smo));
-	bcopy(smo, db->db_data, sizeof (*smo));
-	dmu_buf_rele(db, FTAG);
-
+	if (object != space_map_object(msp->ms_sm)) {
+		object = space_map_object(msp->ms_sm);
+		dmu_write(mos, vd->vdev_ms_array, sizeof (uint64_t) *
+		    msp->ms_id, sizeof (uint64_t), &object, tx);
+	}
 	dmu_tx_commit(tx);
 }
 
@@ -1317,13 +1544,10 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
 void
 metaslab_sync_done(metaslab_t *msp, uint64_t txg)
 {
-	space_map_obj_t *smo = &msp->ms_smo;
-	space_map_obj_t *smosync = &msp->ms_smo_syncing;
-	space_map_t *sm = msp->ms_map;
-	space_map_t **freed_map = &msp->ms_freemap[TXG_CLEAN(txg) & TXG_MASK];
-	space_map_t **defer_map = &msp->ms_defermap[txg % TXG_DEFER_SIZE];
 	metaslab_group_t *mg = msp->ms_group;
 	vdev_t *vd = mg->mg_vd;
+	range_tree_t **freed_tree;
+	range_tree_t **defer_tree;
 	int64_t alloc_delta, defer_delta;
 	int t;
 
@@ -1333,63 +1557,63 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
 
 	/*
 	 * If this metaslab is just becoming available, initialize its
-	 * allocmaps, freemaps, and defermap and add its capacity to the vdev.
+	 * alloctrees, freetrees, and defertree and add its capacity to
+	 * the vdev.
 	 */
-	if (*freed_map == NULL) {
-		ASSERT(*defer_map == NULL);
+	if (msp->ms_freetree[TXG_CLEAN(txg) & TXG_MASK] == NULL) {
 		for (t = 0; t < TXG_SIZE; t++) {
-			msp->ms_allocmap[t] = kmem_zalloc(sizeof (space_map_t),
-			    KM_PUSHPAGE);
-			space_map_create(msp->ms_allocmap[t], sm->sm_start,
-			    sm->sm_size, sm->sm_shift, sm->sm_lock);
-			msp->ms_freemap[t] = kmem_zalloc(sizeof (space_map_t),
-			    KM_PUSHPAGE);
-			space_map_create(msp->ms_freemap[t], sm->sm_start,
-			    sm->sm_size, sm->sm_shift, sm->sm_lock);
+			ASSERT(msp->ms_alloctree[t] == NULL);
+			ASSERT(msp->ms_freetree[t] == NULL);
+
+			msp->ms_alloctree[t] = range_tree_create(NULL, msp,
+			    &msp->ms_lock);
+			msp->ms_freetree[t] = range_tree_create(NULL, msp,
+			    &msp->ms_lock);
 		}
 
 		for (t = 0; t < TXG_DEFER_SIZE; t++) {
-			msp->ms_defermap[t] = kmem_zalloc(sizeof (space_map_t),
-			    KM_PUSHPAGE);
-			space_map_create(msp->ms_defermap[t], sm->sm_start,
-			    sm->sm_size, sm->sm_shift, sm->sm_lock);
-		}
+			ASSERT(msp->ms_defertree[t] == NULL);
 
-		freed_map = &msp->ms_freemap[TXG_CLEAN(txg) & TXG_MASK];
-		defer_map = &msp->ms_defermap[txg % TXG_DEFER_SIZE];
+			msp->ms_defertree[t] = range_tree_create(NULL, msp,
+			    &msp->ms_lock);
+		}
 
-		vdev_space_update(vd, 0, 0, sm->sm_size);
+		vdev_space_update(vd, 0, 0, msp->ms_size);
 	}
 
-	alloc_delta = smosync->smo_alloc - smo->smo_alloc;
-	defer_delta = (*freed_map)->sm_space - (*defer_map)->sm_space;
+	freed_tree = &msp->ms_freetree[TXG_CLEAN(txg) & TXG_MASK];
+	defer_tree = &msp->ms_defertree[txg % TXG_DEFER_SIZE];
+
+	alloc_delta = space_map_alloc_delta(msp->ms_sm);
+	defer_delta = range_tree_space(*freed_tree) -
+	    range_tree_space(*defer_tree);
 
 	vdev_space_update(vd, alloc_delta + defer_delta, defer_delta, 0);
 
-	ASSERT(msp->ms_allocmap[txg & TXG_MASK]->sm_space == 0);
-	ASSERT(msp->ms_freemap[txg & TXG_MASK]->sm_space == 0);
+	ASSERT0(range_tree_space(msp->ms_alloctree[txg & TXG_MASK]));
+	ASSERT0(range_tree_space(msp->ms_freetree[txg & TXG_MASK]));
 
 	/*
-	 * If there's a space_map_load() in progress, wait for it to complete
+	 * If there's a metaslab_load() in progress, wait for it to complete
 	 * so that we have a consistent view of the in-core space map.
 	 */
-	space_map_load_wait(sm);
+	metaslab_load_wait(msp);
 
 	/*
-	 * Move the frees from the defer_map to this map (if it's loaded).
-	 * Swap the freed_map and the defer_map -- this is safe to do
-	 * because we've just emptied out the defer_map.
+	 * Move the frees from the defer_tree back to the free
+	 * range tree (if it's loaded). Swap the freed_tree and the
+	 * defer_tree -- this is safe to do because we've just emptied out
+	 * the defer_tree.
 	 */
-	space_map_vacate(*defer_map, sm->sm_loaded ? space_map_free : NULL, sm);
-	ASSERT0((*defer_map)->sm_space);
-	ASSERT0(avl_numnodes(&(*defer_map)->sm_root));
-	space_map_swap(freed_map, defer_map);
+	range_tree_vacate(*defer_tree,
+	    msp->ms_loaded ? range_tree_add : NULL, msp->ms_tree);
+	range_tree_swap(freed_tree, defer_tree);
 
-	*smo = *smosync;
+	space_map_update(msp->ms_sm);
 
 	msp->ms_deferspace += defer_delta;
 	ASSERT3S(msp->ms_deferspace, >=, 0);
-	ASSERT3S(msp->ms_deferspace, <=, sm->sm_size);
+	ASSERT3S(msp->ms_deferspace, <=, msp->ms_size);
 	if (msp->ms_deferspace != 0) {
 		/*
 		 * Keep syncing this metaslab until all deferred frees
@@ -1398,57 +1622,33 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
 		vdev_dirty(vd, VDD_METASLAB, msp, txg + 1);
 	}
 
-	/*
-	 * If the map is loaded but no longer active, evict it as soon as all
-	 * future allocations have synced.  (If we unloaded it now and then
-	 * loaded a moment later, the map wouldn't reflect those allocations.)
-	 */
-	if (sm->sm_loaded && (msp->ms_weight & METASLAB_ACTIVE_MASK) == 0) {
-		int evictable = 1;
-
-		for (t = 1; t < TXG_CONCURRENT_STATES; t++)
-			if (msp->ms_allocmap[(txg + t) & TXG_MASK]->sm_space)
-				evictable = 0;
+	if (msp->ms_loaded && msp->ms_access_txg < txg) {
+		for (t = 1; t < TXG_CONCURRENT_STATES; t++) {
+			VERIFY0(range_tree_space(
+			    msp->ms_alloctree[(txg + t) & TXG_MASK]));
+		}
 
-		if (evictable && !metaslab_debug_unload)
-			space_map_unload(sm);
+		if (!metaslab_debug_unload)
+			metaslab_unload(msp);
 	}
 
 	metaslab_group_sort(mg, msp, metaslab_weight(msp));
-
 	mutex_exit(&msp->ms_lock);
+
 }
 
 void
 metaslab_sync_reassess(metaslab_group_t *mg)
 {
-	vdev_t *vd = mg->mg_vd;
 	int64_t failures = mg->mg_alloc_failures;
-	int m;
 
 	metaslab_group_alloc_update(mg);
-
-	/*
-	 * Re-evaluate all metaslabs which have lower offsets than the
-	 * bonus area.
-	 */
-	for (m = 0; m < vd->vdev_ms_count; m++) {
-		metaslab_t *msp = vd->vdev_ms[m];
-
-		if (msp->ms_map->sm_start > mg->mg_bonus_area)
-			break;
-
-		mutex_enter(&msp->ms_lock);
-		metaslab_group_sort(mg, msp, metaslab_weight(msp));
-		mutex_exit(&msp->ms_lock);
-	}
-
 	atomic_add_64(&mg->mg_alloc_failures, -failures);
 
 	/*
-	 * Prefetch the next potential metaslabs
+	 * Preload the next potential metaslabs
 	 */
-	metaslab_prefetch(mg);
+	metaslab_group_preload(mg);
 }
 
 static uint64_t
@@ -1456,7 +1656,7 @@ metaslab_distance(metaslab_t *msp, dva_t *dva)
 {
 	uint64_t ms_shift = msp->ms_group->mg_vd->vdev_ms_shift;
 	uint64_t offset = DVA_GET_OFFSET(dva) >> ms_shift;
-	uint64_t start = msp->ms_map->sm_start >> ms_shift;
+	uint64_t start = msp->ms_id;
 
 	if (msp->ms_group->mg_vd->vdev_id != DVA_GET_VDEV(dva))
 		return (1ULL << 63);
@@ -1508,7 +1708,7 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
 			/*
 			 * If the selected metaslab is condensing, skip it.
 			 */
-			if (msp->ms_map->sm_condensing)
+			if (msp->ms_condensing)
 				continue;
 
 			was_active = msp->ms_weight & METASLAB_ACTIVE_MASK;
@@ -1516,7 +1716,8 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
 				break;
 
 			target_distance = min_distance +
-			    (msp->ms_smo.smo_alloc ? 0 : min_distance >> 1);
+			    (space_map_allocated(msp->ms_sm) != 0 ? 0 :
+			    min_distance >> 1);
 
 			for (i = 0; i < d; i++)
 				if (metaslab_distance(msp, &dva[i]) <
@@ -1543,9 +1744,10 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
 		    CAN_FASTGANG(flags) && psize > SPA_GANGBLOCKSIZE &&
 		    activation_weight == METASLAB_WEIGHT_PRIMARY) {
 			spa_dbgmsg(spa, "%s: skipping metaslab group: "
-			    "vdev %llu, txg %llu, mg %p, psize %llu, "
-			    "asize %llu, failures %llu", spa_name(spa),
-			    mg->mg_vd->vdev_id, txg, mg, psize, asize,
+			    "vdev %llu, txg %llu, mg %p, msp[%llu] %p, "
+			    "psize %llu, asize %llu, failures %llu",
+			    spa_name(spa), mg->mg_vd->vdev_id, txg, mg,
+			    msp->ms_id, msp, psize, asize,
 			    mg->mg_alloc_failures);
 			mutex_exit(&msp->ms_lock);
 			return (-1ULL);
@@ -1582,25 +1784,25 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
 		 * we can't manipulate this metaslab until it's committed
 		 * to disk.
 		 */
-		if (msp->ms_map->sm_condensing) {
+		if (msp->ms_condensing) {
 			mutex_exit(&msp->ms_lock);
 			continue;
 		}
 
-		if ((offset = space_map_alloc(msp->ms_map, asize)) != -1ULL)
+		if ((offset = metaslab_block_alloc(msp, asize)) != -1ULL)
 			break;
 
 		atomic_inc_64(&mg->mg_alloc_failures);
 
-		metaslab_passivate(msp, space_map_maxsize(msp->ms_map));
-
+		metaslab_passivate(msp, metaslab_block_maxsize(msp));
 		mutex_exit(&msp->ms_lock);
 	}
 
-	if (msp->ms_allocmap[txg & TXG_MASK]->sm_space == 0)
+	if (range_tree_space(msp->ms_alloctree[txg & TXG_MASK]) == 0)
 		vdev_dirty(mg->mg_vd, VDD_METASLAB, msp, txg);
 
-	space_map_add(msp->ms_allocmap[txg & TXG_MASK], offset, asize);
+	range_tree_add(msp->ms_alloctree[txg & TXG_MASK], offset, asize);
+	msp->ms_access_txg = txg + metaslab_unload_delay;
 
 	mutex_exit(&msp->ms_lock);
 
@@ -1869,13 +2071,22 @@ metaslab_free_dva(spa_t *spa, const dva_t *dva, uint64_t txg, boolean_t now)
 	mutex_enter(&msp->ms_lock);
 
 	if (now) {
-		space_map_remove(msp->ms_allocmap[txg & TXG_MASK],
+		range_tree_remove(msp->ms_alloctree[txg & TXG_MASK],
 		    offset, size);
-		space_map_free(msp->ms_map, offset, size);
+
+		VERIFY(!msp->ms_condensing);
+		VERIFY3U(offset, >=, msp->ms_start);
+		VERIFY3U(offset + size, <=, msp->ms_start + msp->ms_size);
+		VERIFY3U(range_tree_space(msp->ms_tree) + size, <=,
+		    msp->ms_size);
+		VERIFY0(P2PHASE(offset, 1ULL << vd->vdev_ashift));
+		VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
+		range_tree_add(msp->ms_tree, offset, size);
 	} else {
-		if (msp->ms_freemap[txg & TXG_MASK]->sm_space == 0)
+		if (range_tree_space(msp->ms_freetree[txg & TXG_MASK]) == 0)
 			vdev_dirty(vd, VDD_METASLAB, msp, txg);
-		space_map_add(msp->ms_freemap[txg & TXG_MASK], offset, size);
+		range_tree_add(msp->ms_freetree[txg & TXG_MASK],
+		    offset, size);
 	}
 
 	mutex_exit(&msp->ms_lock);
@@ -1910,10 +2121,10 @@ metaslab_claim_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
 
 	mutex_enter(&msp->ms_lock);
 
-	if ((txg != 0 && spa_writeable(spa)) || !msp->ms_map->sm_loaded)
+	if ((txg != 0 && spa_writeable(spa)) || !msp->ms_loaded)
 		error = metaslab_activate(msp, METASLAB_WEIGHT_SECONDARY);
 
-	if (error == 0 && !space_map_contains(msp->ms_map, offset, size))
+	if (error == 0 && !range_tree_contains(msp->ms_tree, offset, size))
 		error = SET_ERROR(ENOENT);
 
 	if (error || txg == 0) {	/* txg == 0 indicates dry run */
@@ -1921,12 +2132,16 @@ metaslab_claim_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
 		return (error);
 	}
 
-	space_map_claim(msp->ms_map, offset, size);
+	VERIFY(!msp->ms_condensing);
+	VERIFY0(P2PHASE(offset, 1ULL << vd->vdev_ashift));
+	VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
+	VERIFY3U(range_tree_space(msp->ms_tree) - size, <=, msp->ms_size);
+	range_tree_remove(msp->ms_tree, offset, size);
 
 	if (spa_writeable(spa)) {	/* don't dirty if we're zdb(1M) */
-		if (msp->ms_allocmap[txg & TXG_MASK]->sm_space == 0)
+		if (range_tree_space(msp->ms_alloctree[txg & TXG_MASK]) == 0)
 			vdev_dirty(vd, VDD_METASLAB, msp, txg);
-		space_map_add(msp->ms_allocmap[txg & TXG_MASK], offset, size);
+		range_tree_add(msp->ms_alloctree[txg & TXG_MASK], offset, size);
 	}
 
 	mutex_exit(&msp->ms_lock);
@@ -1959,7 +2174,7 @@ metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
 	for (d = 0; d < ndvas; d++) {
 		error = metaslab_alloc_dva(spa, mc, psize, dva, d, hintdva,
 		    txg, flags);
-		if (error) {
+		if (error != 0) {
 			for (d--; d >= 0; d--) {
 				metaslab_free_dva(spa, &dva[d], txg, B_TRUE);
 				bzero(&dva[d], sizeof (dva_t));
@@ -2073,19 +2288,6 @@ metaslab_fastwrite_unmark(spa_t *spa, const blkptr_t *bp)
 	spa_config_exit(spa, SCL_VDEV, FTAG);
 }
 
-static void
-checkmap(space_map_t *sm, uint64_t off, uint64_t size)
-{
-	space_seg_t *ss;
-	avl_index_t where;
-
-	mutex_enter(sm->sm_lock);
-	ss = space_map_find(sm, off, size, &where);
-	if (ss != NULL)
-		panic("freeing free block; ss=%p", (void *)ss);
-	mutex_exit(sm->sm_lock);
-}
-
 void
 metaslab_check_free(spa_t *spa, const blkptr_t *bp)
 {
@@ -2096,28 +2298,28 @@ metaslab_check_free(spa_t *spa, const blkptr_t *bp)
 
 	spa_config_enter(spa, SCL_VDEV, FTAG, RW_READER);
 	for (i = 0; i < BP_GET_NDVAS(bp); i++) {
-		uint64_t vdid = DVA_GET_VDEV(&bp->blk_dva[i]);
-		vdev_t *vd = vdev_lookup_top(spa, vdid);
-		uint64_t off = DVA_GET_OFFSET(&bp->blk_dva[i]);
+		uint64_t vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
+		vdev_t *vd = vdev_lookup_top(spa, vdev);
+		uint64_t offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
 		uint64_t size = DVA_GET_ASIZE(&bp->blk_dva[i]);
-		metaslab_t *ms = vd->vdev_ms[off >> vd->vdev_ms_shift];
+		metaslab_t *msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
 
-		if (ms->ms_map->sm_loaded)
-			checkmap(ms->ms_map, off, size);
+		if (msp->ms_loaded)
+			range_tree_verify(msp->ms_tree, offset, size);
 
 		for (j = 0; j < TXG_SIZE; j++)
-			checkmap(ms->ms_freemap[j], off, size);
+			range_tree_verify(msp->ms_freetree[j], offset, size);
 		for (j = 0; j < TXG_DEFER_SIZE; j++)
-			checkmap(ms->ms_defermap[j], off, size);
+			range_tree_verify(msp->ms_defertree[j], offset, size);
 	}
 	spa_config_exit(spa, SCL_VDEV, FTAG);
 }
 
 #if defined(_KERNEL) && defined(HAVE_SPL)
 module_param(metaslab_debug_load, int, 0644);
-MODULE_PARM_DESC(metaslab_debug_load, "load all metaslabs during pool import");
-
 module_param(metaslab_debug_unload, int, 0644);
+MODULE_PARM_DESC(metaslab_debug_load,
+	"load all metaslabs when pool is first opened");
 MODULE_PARM_DESC(metaslab_debug_unload,
 	"prevent metaslabs from being unloaded");
 
diff --git a/module/zfs/range_tree.c b/module/zfs/range_tree.c
new file mode 100644
index 000000000..cb4641078
--- /dev/null
+++ b/module/zfs/range_tree.c
@@ -0,0 +1,391 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+/*
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * Copyright (c) 2013 by Delphix. All rights reserved.
+ */
+
+#include <sys/zfs_context.h>
+#include <sys/spa.h>
+#include <sys/dmu.h>
+#include <sys/dnode.h>
+#include <sys/zio.h>
+#include <sys/range_tree.h>
+
+static kmem_cache_t *range_seg_cache;
+
+void
+range_tree_init(void)
+{
+	ASSERT(range_seg_cache == NULL);
+	range_seg_cache = kmem_cache_create("range_seg_cache",
+	    sizeof (range_seg_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
+}
+
+void
+range_tree_fini(void)
+{
+	kmem_cache_destroy(range_seg_cache);
+	range_seg_cache = NULL;
+}
+
+void
+range_tree_stat_verify(range_tree_t *rt)
+{
+	range_seg_t *rs;
+	uint64_t hist[RANGE_TREE_HISTOGRAM_SIZE] = { 0 };
+	int i;
+
+	for (rs = avl_first(&rt->rt_root); rs != NULL;
+	    rs = AVL_NEXT(&rt->rt_root, rs)) {
+		uint64_t size = rs->rs_end - rs->rs_start;
+		int idx	= highbit(size) - 1;
+
+		hist[idx]++;
+		ASSERT3U(hist[idx], !=, 0);
+	}
+
+	for (i = 0; i < RANGE_TREE_HISTOGRAM_SIZE; i++) {
+		if (hist[i] != rt->rt_histogram[i]) {
+			zfs_dbgmsg("i=%d, hist=%p, hist=%llu, rt_hist=%llu",
+			    i, hist, hist[i], rt->rt_histogram[i]);
+		}
+		VERIFY3U(hist[i], ==, rt->rt_histogram[i]);
+	}
+}
+
+static void
+range_tree_stat_incr(range_tree_t *rt, range_seg_t *rs)
+{
+	uint64_t size = rs->rs_end - rs->rs_start;
+	int idx = highbit(size) - 1;
+
+	ASSERT3U(idx, <,
+	    sizeof (rt->rt_histogram) / sizeof (*rt->rt_histogram));
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+	rt->rt_histogram[idx]++;
+	ASSERT3U(rt->rt_histogram[idx], !=, 0);
+}
+
+static void
+range_tree_stat_decr(range_tree_t *rt, range_seg_t *rs)
+{
+	uint64_t size = rs->rs_end - rs->rs_start;
+	int idx = highbit(size) - 1;
+
+	ASSERT3U(idx, <,
+	    sizeof (rt->rt_histogram) / sizeof (*rt->rt_histogram));
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+	ASSERT3U(rt->rt_histogram[idx], !=, 0);
+	rt->rt_histogram[idx]--;
+}
+
+/*
+ * NOTE: caller is responsible for all locking.
+ */
+static int
+range_tree_seg_compare(const void *x1, const void *x2)
+{
+	const range_seg_t *r1 = x1;
+	const range_seg_t *r2 = x2;
+
+	if (r1->rs_start < r2->rs_start) {
+		if (r1->rs_end > r2->rs_start)
+			return (0);
+		return (-1);
+	}
+	if (r1->rs_start > r2->rs_start) {
+		if (r1->rs_start < r2->rs_end)
+			return (0);
+		return (1);
+	}
+	return (0);
+}
+
+range_tree_t *
+range_tree_create(range_tree_ops_t *ops, void *arg, kmutex_t *lp)
+{
+	range_tree_t *rt;
+
+	rt = kmem_zalloc(sizeof (range_tree_t), KM_PUSHPAGE);
+
+	avl_create(&rt->rt_root, range_tree_seg_compare,
+	    sizeof (range_seg_t), offsetof(range_seg_t, rs_node));
+
+	rt->rt_lock = lp;
+	rt->rt_ops = ops;
+	rt->rt_arg = arg;
+
+	if (rt->rt_ops != NULL)
+		rt->rt_ops->rtop_create(rt, rt->rt_arg);
+
+	return (rt);
+}
+
+void
+range_tree_destroy(range_tree_t *rt)
+{
+	VERIFY0(rt->rt_space);
+
+	if (rt->rt_ops != NULL)
+		rt->rt_ops->rtop_destroy(rt, rt->rt_arg);
+
+	avl_destroy(&rt->rt_root);
+	kmem_free(rt, sizeof (*rt));
+}
+
+void
+range_tree_add(void *arg, uint64_t start, uint64_t size)
+{
+	range_tree_t *rt = arg;
+	avl_index_t where;
+	range_seg_t rsearch, *rs_before, *rs_after, *rs;
+	uint64_t end = start + size;
+	boolean_t merge_before, merge_after;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+	VERIFY(size != 0);
+
+	rsearch.rs_start = start;
+	rsearch.rs_end = end;
+	rs = avl_find(&rt->rt_root, &rsearch, &where);
+
+	if (rs != NULL && rs->rs_start <= start && rs->rs_end >= end) {
+		zfs_panic_recover("zfs: allocating allocated segment"
+		    "(offset=%llu size=%llu)\n",
+		    (longlong_t)start, (longlong_t)size);
+		return;
+	}
+
+	/* Make sure we don't overlap with either of our neighbors */
+	VERIFY(rs == NULL);
+
+	rs_before = avl_nearest(&rt->rt_root, where, AVL_BEFORE);
+	rs_after = avl_nearest(&rt->rt_root, where, AVL_AFTER);
+
+	merge_before = (rs_before != NULL && rs_before->rs_end == start);
+	merge_after = (rs_after != NULL && rs_after->rs_start == end);
+
+	if (merge_before && merge_after) {
+		avl_remove(&rt->rt_root, rs_before);
+		if (rt->rt_ops != NULL) {
+			rt->rt_ops->rtop_remove(rt, rs_before, rt->rt_arg);
+			rt->rt_ops->rtop_remove(rt, rs_after, rt->rt_arg);
+		}
+
+		range_tree_stat_decr(rt, rs_before);
+		range_tree_stat_decr(rt, rs_after);
+
+		rs_after->rs_start = rs_before->rs_start;
+		kmem_cache_free(range_seg_cache, rs_before);
+		rs = rs_after;
+	} else if (merge_before) {
+		if (rt->rt_ops != NULL)
+			rt->rt_ops->rtop_remove(rt, rs_before, rt->rt_arg);
+
+		range_tree_stat_decr(rt, rs_before);
+
+		rs_before->rs_end = end;
+		rs = rs_before;
+	} else if (merge_after) {
+		if (rt->rt_ops != NULL)
+			rt->rt_ops->rtop_remove(rt, rs_after, rt->rt_arg);
+
+		range_tree_stat_decr(rt, rs_after);
+
+		rs_after->rs_start = start;
+		rs = rs_after;
+	} else {
+		rs = kmem_cache_alloc(range_seg_cache, KM_PUSHPAGE);
+		rs->rs_start = start;
+		rs->rs_end = end;
+		avl_insert(&rt->rt_root, rs, where);
+	}
+
+	if (rt->rt_ops != NULL)
+		rt->rt_ops->rtop_add(rt, rs, rt->rt_arg);
+
+	range_tree_stat_incr(rt, rs);
+	rt->rt_space += size;
+}
+
+void
+range_tree_remove(void *arg, uint64_t start, uint64_t size)
+{
+	range_tree_t *rt = arg;
+	avl_index_t where;
+	range_seg_t rsearch, *rs, *newseg;
+	uint64_t end = start + size;
+	boolean_t left_over, right_over;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+	VERIFY3U(size, !=, 0);
+	VERIFY3U(size, <=, rt->rt_space);
+
+	rsearch.rs_start = start;
+	rsearch.rs_end = end;
+	rs = avl_find(&rt->rt_root, &rsearch, &where);
+
+	/* Make sure we completely overlap with someone */
+	if (rs == NULL) {
+		zfs_panic_recover("zfs: freeing free segment "
+		    "(offset=%llu size=%llu)",
+		    (longlong_t)start, (longlong_t)size);
+		return;
+	}
+	VERIFY3U(rs->rs_start, <=, start);
+	VERIFY3U(rs->rs_end, >=, end);
+
+	left_over = (rs->rs_start != start);
+	right_over = (rs->rs_end != end);
+
+	range_tree_stat_decr(rt, rs);
+
+	if (rt->rt_ops != NULL)
+		rt->rt_ops->rtop_remove(rt, rs, rt->rt_arg);
+
+	if (left_over && right_over) {
+		newseg = kmem_cache_alloc(range_seg_cache, KM_PUSHPAGE);
+		newseg->rs_start = end;
+		newseg->rs_end = rs->rs_end;
+		range_tree_stat_incr(rt, newseg);
+
+		rs->rs_end = start;
+
+		avl_insert_here(&rt->rt_root, newseg, rs, AVL_AFTER);
+		if (rt->rt_ops != NULL)
+			rt->rt_ops->rtop_add(rt, newseg, rt->rt_arg);
+	} else if (left_over) {
+		rs->rs_end = start;
+	} else if (right_over) {
+		rs->rs_start = end;
+	} else {
+		avl_remove(&rt->rt_root, rs);
+		kmem_cache_free(range_seg_cache, rs);
+		rs = NULL;
+	}
+
+	if (rs != NULL) {
+		range_tree_stat_incr(rt, rs);
+
+		if (rt->rt_ops != NULL)
+			rt->rt_ops->rtop_add(rt, rs, rt->rt_arg);
+	}
+
+	rt->rt_space -= size;
+}
+
+static range_seg_t *
+range_tree_find(range_tree_t *rt, uint64_t start, uint64_t size,
+    avl_index_t *wherep)
+{
+	range_seg_t rsearch, *rs;
+	uint64_t end = start + size;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+	VERIFY(size != 0);
+
+	rsearch.rs_start = start;
+	rsearch.rs_end = end;
+	rs = avl_find(&rt->rt_root, &rsearch, wherep);
+
+	if (rs != NULL && rs->rs_start <= start && rs->rs_end >= end)
+		return (rs);
+	return (NULL);
+}
+
+void
+range_tree_verify(range_tree_t *rt, uint64_t off, uint64_t size)
+{
+	range_seg_t *rs;
+	avl_index_t where;
+
+	mutex_enter(rt->rt_lock);
+	rs = range_tree_find(rt, off, size, &where);
+	if (rs != NULL)
+		panic("freeing free block; rs=%p", (void *)rs);
+	mutex_exit(rt->rt_lock);
+}
+
+boolean_t
+range_tree_contains(range_tree_t *rt, uint64_t start, uint64_t size)
+{
+	avl_index_t where;
+
+	return (range_tree_find(rt, start, size, &where) != NULL);
+}
+
+void
+range_tree_swap(range_tree_t **rtsrc, range_tree_t **rtdst)
+{
+	range_tree_t *rt;
+
+	ASSERT(MUTEX_HELD((*rtsrc)->rt_lock));
+	ASSERT0(range_tree_space(*rtdst));
+	ASSERT0(avl_numnodes(&(*rtdst)->rt_root));
+
+	rt = *rtsrc;
+	*rtsrc = *rtdst;
+	*rtdst = rt;
+}
+
+void
+range_tree_vacate(range_tree_t *rt, range_tree_func_t *func, void *arg)
+{
+	range_seg_t *rs;
+	void *cookie = NULL;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+
+	if (rt->rt_ops != NULL)
+		rt->rt_ops->rtop_vacate(rt, rt->rt_arg);
+
+	while ((rs = avl_destroy_nodes(&rt->rt_root, &cookie)) != NULL) {
+		if (func != NULL)
+			func(arg, rs->rs_start, rs->rs_end - rs->rs_start);
+		kmem_cache_free(range_seg_cache, rs);
+	}
+
+	bzero(rt->rt_histogram, sizeof (rt->rt_histogram));
+	rt->rt_space = 0;
+}
+
+void
+range_tree_walk(range_tree_t *rt, range_tree_func_t *func, void *arg)
+{
+	range_seg_t *rs;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+
+	for (rs = avl_first(&rt->rt_root); rs; rs = AVL_NEXT(&rt->rt_root, rs))
+		func(arg, rs->rs_start, rs->rs_end - rs->rs_start);
+}
+
+uint64_t
+range_tree_space(range_tree_t *rt)
+{
+	return (rt->rt_space);
+}
diff --git a/module/zfs/spa.c b/module/zfs/spa.c
index 0d0499c63..397e9e627 100644
--- a/module/zfs/spa.c
+++ b/module/zfs/spa.c
@@ -1259,6 +1259,15 @@ spa_unload(spa_t *spa)
 
 	bpobj_close(&spa->spa_deferred_bpobj);
 
+	spa_config_enter(spa, SCL_ALL, FTAG, RW_WRITER);
+
+	/*
+	 * Close all vdevs.
+	 */
+	if (spa->spa_root_vdev)
+		vdev_free(spa->spa_root_vdev);
+	ASSERT(spa->spa_root_vdev == NULL);
+
 	/*
 	 * Close the dsl pool.
 	 */
@@ -1270,20 +1279,12 @@ spa_unload(spa_t *spa)
 
 	ddt_unload(spa);
 
-	spa_config_enter(spa, SCL_ALL, FTAG, RW_WRITER);
 
 	/*
 	 * Drop and purge level 2 cache
 	 */
 	spa_l2cache_drop(spa);
 
-	/*
-	 * Close all vdevs.
-	 */
-	if (spa->spa_root_vdev)
-		vdev_free(spa->spa_root_vdev);
-	ASSERT(spa->spa_root_vdev == NULL);
-
 	for (i = 0; i < spa->spa_spares.sav_count; i++)
 		vdev_free(spa->spa_spares.sav_vdevs[i]);
 	if (spa->spa_spares.sav_vdevs) {
@@ -4568,7 +4569,9 @@ spa_vdev_attach(spa_t *spa, uint64_t guid, nvlist_t *nvroot, int replacing)
 	vdev_dirty(tvd, VDD_DTL, newvd, txg);
 
 	/*
-	 * Restart the resilver
+	 * Schedule the resilver to restart in the future. We do this to
+	 * ensure that dmu_sync-ed blocks have been stitched into the
+	 * respective datasets.
 	 */
 	dsl_resilver_restart(spa->spa_dsl_pool, dtl_max_txg);
 
@@ -5193,7 +5196,7 @@ spa_vdev_remove_evacuate(spa_t *spa, vdev_t *vd)
 	ASSERT0(vd->vdev_stat.vs_alloc);
 	txg = spa_vdev_config_enter(spa);
 	vd->vdev_removing = B_TRUE;
-	vdev_dirty(vd, 0, NULL, txg);
+	vdev_dirty_leaves(vd, VDD_DTL, txg);
 	vdev_config_dirty(vd);
 	spa_vdev_config_exit(spa, NULL, txg, 0, FTAG);
 
@@ -5965,7 +5968,7 @@ spa_sync_props(void *arg, dmu_tx_t *tx)
 			ASSERT(zpool_prop_feature(nvpair_name(elem)));
 
 			fname = strchr(nvpair_name(elem), '@') + 1;
-			VERIFY3U(0, ==, zfeature_lookup_name(fname, &feature));
+			VERIFY0(zfeature_lookup_name(fname, &feature));
 
 			spa_feature_enable(spa, feature, tx);
 			spa_history_log_internal(spa, "set", tx,
@@ -5973,7 +5976,7 @@ spa_sync_props(void *arg, dmu_tx_t *tx)
 			break;
 
 		case ZPOOL_PROP_VERSION:
-			VERIFY(nvpair_value_uint64(elem, &intval) == 0);
+			intval = fnvpair_value_uint64(elem);
 			/*
 			 * The version is synced seperatly before other
 			 * properties and should be correct by now.
@@ -5997,7 +6000,7 @@ spa_sync_props(void *arg, dmu_tx_t *tx)
 			 */
 			break;
 		case ZPOOL_PROP_COMMENT:
-			VERIFY(nvpair_value_string(elem, &strval) == 0);
+			strval = fnvpair_value_string(elem);
 			if (spa->spa_comment != NULL)
 				spa_strfree(spa->spa_comment);
 			spa->spa_comment = spa_strdup(strval);
@@ -6029,23 +6032,23 @@ spa_sync_props(void *arg, dmu_tx_t *tx)
 
 			if (nvpair_type(elem) == DATA_TYPE_STRING) {
 				ASSERT(proptype == PROP_TYPE_STRING);
-				VERIFY(nvpair_value_string(elem, &strval) == 0);
-				VERIFY(zap_update(mos,
+				strval = fnvpair_value_string(elem);
+				VERIFY0(zap_update(mos,
 				    spa->spa_pool_props_object, propname,
-				    1, strlen(strval) + 1, strval, tx) == 0);
+				    1, strlen(strval) + 1, strval, tx));
 				spa_history_log_internal(spa, "set", tx,
 				    "%s=%s", nvpair_name(elem), strval);
 			} else if (nvpair_type(elem) == DATA_TYPE_UINT64) {
-				VERIFY(nvpair_value_uint64(elem, &intval) == 0);
+				intval = fnvpair_value_uint64(elem);
 
 				if (proptype == PROP_TYPE_INDEX) {
 					const char *unused;
-					VERIFY(zpool_prop_index_to_string(
-					    prop, intval, &unused) == 0);
+					VERIFY0(zpool_prop_index_to_string(
+					    prop, intval, &unused));
 				}
-				VERIFY(zap_update(mos,
+				VERIFY0(zap_update(mos,
 				    spa->spa_pool_props_object, propname,
-				    8, 1, &intval, tx) == 0);
+				    8, 1, &intval, tx));
 				spa_history_log_internal(spa, "set", tx,
 				    "%s=%lld", nvpair_name(elem), intval);
 			} else {
diff --git a/module/zfs/spa_misc.c b/module/zfs/spa_misc.c
index 935a61796..ef59d2255 100644
--- a/module/zfs/spa_misc.c
+++ b/module/zfs/spa_misc.c
@@ -986,7 +986,7 @@ spa_vdev_config_exit(spa_t *spa, vdev_t *vd, uint64_t txg, int error, char *tag)
 		txg_wait_synced(spa->spa_dsl_pool, txg);
 
 	if (vd != NULL) {
-		ASSERT(!vd->vdev_detached || vd->vdev_dtl_smo.smo_object == 0);
+		ASSERT(!vd->vdev_detached || vd->vdev_dtl_sm == NULL);
 		spa_config_enter(spa, SCL_ALL, spa, RW_WRITER);
 		vdev_free(vd);
 		spa_config_exit(spa, SCL_ALL, spa);
@@ -1655,7 +1655,7 @@ spa_init(int mode)
 	fm_init();
 	refcount_init();
 	unique_init();
-	space_map_init();
+	range_tree_init();
 	ddt_init();
 	zio_init();
 	dmu_init();
@@ -1682,7 +1682,7 @@ spa_fini(void)
 	dmu_fini();
 	zio_fini();
 	ddt_fini();
-	space_map_fini();
+	range_tree_fini();
 	unique_fini();
 	refcount_fini();
 	fm_fini();
diff --git a/module/zfs/space_map.c b/module/zfs/space_map.c
index 2cf1d2a18..bbc926d4d 100644
--- a/module/zfs/space_map.c
+++ b/module/zfs/space_map.c
@@ -23,330 +23,79 @@
  * Use is subject to license terms.
  */
 /*
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
 #include <sys/spa.h>
 #include <sys/dmu.h>
+#include <sys/dmu_tx.h>
+#include <sys/dnode.h>
+#include <sys/dsl_pool.h>
 #include <sys/zio.h>
 #include <sys/space_map.h>
-
-static kmem_cache_t *space_seg_cache;
-
-void
-space_map_init(void)
-{
-	ASSERT(space_seg_cache == NULL);
-	space_seg_cache = kmem_cache_create("space_seg_cache",
-	    sizeof (space_seg_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
-}
-
-void
-space_map_fini(void)
-{
-	kmem_cache_destroy(space_seg_cache);
-	space_seg_cache = NULL;
-}
-
-/*
- * Space map routines.
- * NOTE: caller is responsible for all locking.
- */
-static int
-space_map_seg_compare(const void *x1, const void *x2)
-{
-	const space_seg_t *s1 = x1;
-	const space_seg_t *s2 = x2;
-
-	if (s1->ss_start < s2->ss_start) {
-		if (s1->ss_end > s2->ss_start)
-			return (0);
-		return (-1);
-	}
-	if (s1->ss_start > s2->ss_start) {
-		if (s1->ss_start < s2->ss_end)
-			return (0);
-		return (1);
-	}
-	return (0);
-}
-
-void
-space_map_create(space_map_t *sm, uint64_t start, uint64_t size, uint8_t shift,
-	kmutex_t *lp)
-{
-	bzero(sm, sizeof (*sm));
-
-	cv_init(&sm->sm_load_cv, NULL, CV_DEFAULT, NULL);
-
-	avl_create(&sm->sm_root, space_map_seg_compare,
-	    sizeof (space_seg_t), offsetof(struct space_seg, ss_node));
-
-	sm->sm_start = start;
-	sm->sm_size = size;
-	sm->sm_shift = shift;
-	sm->sm_lock = lp;
-}
-
-void
-space_map_destroy(space_map_t *sm)
-{
-	ASSERT(!sm->sm_loaded && !sm->sm_loading);
-	VERIFY0(sm->sm_space);
-	avl_destroy(&sm->sm_root);
-	cv_destroy(&sm->sm_load_cv);
-}
-
-void
-space_map_add(space_map_t *sm, uint64_t start, uint64_t size)
-{
-	avl_index_t where;
-	space_seg_t *ss_before, *ss_after, *ss;
-	uint64_t end = start + size;
-	int merge_before, merge_after;
-
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-	VERIFY(!sm->sm_condensing);
-	VERIFY(size != 0);
-	VERIFY3U(start, >=, sm->sm_start);
-	VERIFY3U(end, <=, sm->sm_start + sm->sm_size);
-	VERIFY(sm->sm_space + size <= sm->sm_size);
-	VERIFY(P2PHASE(start, 1ULL << sm->sm_shift) == 0);
-	VERIFY(P2PHASE(size, 1ULL << sm->sm_shift) == 0);
-
-	ss = space_map_find(sm, start, size, &where);
-	if (ss != NULL) {
-		zfs_panic_recover("zfs: allocating allocated segment"
-		    "(offset=%llu size=%llu)\n",
-		    (longlong_t)start, (longlong_t)size);
-		return;
-	}
-
-	/* Make sure we don't overlap with either of our neighbors */
-	VERIFY(ss == NULL);
-
-	ss_before = avl_nearest(&sm->sm_root, where, AVL_BEFORE);
-	ss_after = avl_nearest(&sm->sm_root, where, AVL_AFTER);
-
-	merge_before = (ss_before != NULL && ss_before->ss_end == start);
-	merge_after = (ss_after != NULL && ss_after->ss_start == end);
-
-	if (merge_before && merge_after) {
-		avl_remove(&sm->sm_root, ss_before);
-		if (sm->sm_pp_root) {
-			avl_remove(sm->sm_pp_root, ss_before);
-			avl_remove(sm->sm_pp_root, ss_after);
-		}
-		ss_after->ss_start = ss_before->ss_start;
-		kmem_cache_free(space_seg_cache, ss_before);
-		ss = ss_after;
-	} else if (merge_before) {
-		ss_before->ss_end = end;
-		if (sm->sm_pp_root)
-			avl_remove(sm->sm_pp_root, ss_before);
-		ss = ss_before;
-	} else if (merge_after) {
-		ss_after->ss_start = start;
-		if (sm->sm_pp_root)
-			avl_remove(sm->sm_pp_root, ss_after);
-		ss = ss_after;
-	} else {
-		ss = kmem_cache_alloc(space_seg_cache, KM_PUSHPAGE);
-		ss->ss_start = start;
-		ss->ss_end = end;
-		avl_insert(&sm->sm_root, ss, where);
-	}
-
-	if (sm->sm_pp_root)
-		avl_add(sm->sm_pp_root, ss);
-
-	sm->sm_space += size;
-}
-
-void
-space_map_remove(space_map_t *sm, uint64_t start, uint64_t size)
-{
-	avl_index_t where;
-	space_seg_t *ss, *newseg;
-	uint64_t end = start + size;
-	int left_over, right_over;
-
-	VERIFY(!sm->sm_condensing);
-	ss = space_map_find(sm, start, size, &where);
-
-	/* Make sure we completely overlap with someone */
-	if (ss == NULL) {
-		zfs_panic_recover("zfs: freeing free segment "
-		    "(offset=%llu size=%llu)",
-		    (longlong_t)start, (longlong_t)size);
-		return;
-	}
-	VERIFY3U(ss->ss_start, <=, start);
-	VERIFY3U(ss->ss_end, >=, end);
-	VERIFY(sm->sm_space - size <= sm->sm_size);
-
-	left_over = (ss->ss_start != start);
-	right_over = (ss->ss_end != end);
-
-	if (sm->sm_pp_root)
-		avl_remove(sm->sm_pp_root, ss);
-
-	if (left_over && right_over) {
-		newseg = kmem_cache_alloc(space_seg_cache, KM_PUSHPAGE);
-		newseg->ss_start = end;
-		newseg->ss_end = ss->ss_end;
-		ss->ss_end = start;
-		avl_insert_here(&sm->sm_root, newseg, ss, AVL_AFTER);
-		if (sm->sm_pp_root)
-			avl_add(sm->sm_pp_root, newseg);
-	} else if (left_over) {
-		ss->ss_end = start;
-	} else if (right_over) {
-		ss->ss_start = end;
-	} else {
-		avl_remove(&sm->sm_root, ss);
-		kmem_cache_free(space_seg_cache, ss);
-		ss = NULL;
-	}
-
-	if (sm->sm_pp_root && ss != NULL)
-		avl_add(sm->sm_pp_root, ss);
-
-	sm->sm_space -= size;
-}
-
-space_seg_t *
-space_map_find(space_map_t *sm, uint64_t start, uint64_t size,
-    avl_index_t *wherep)
-{
-	space_seg_t ssearch, *ss;
-
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-	VERIFY(size != 0);
-	VERIFY(P2PHASE(start, 1ULL << sm->sm_shift) == 0);
-	VERIFY(P2PHASE(size, 1ULL << sm->sm_shift) == 0);
-
-	ssearch.ss_start = start;
-	ssearch.ss_end = start + size;
-	ss = avl_find(&sm->sm_root, &ssearch, wherep);
-
-	if (ss != NULL && ss->ss_start <= start && ss->ss_end >= start + size)
-		return (ss);
-	return (NULL);
-}
-
-boolean_t
-space_map_contains(space_map_t *sm, uint64_t start, uint64_t size)
-{
-	avl_index_t where;
-
-	return (space_map_find(sm, start, size, &where) != 0);
-}
-
-void
-space_map_swap(space_map_t **msrc, space_map_t **mdst)
-{
-	space_map_t *sm;
-
-	ASSERT(MUTEX_HELD((*msrc)->sm_lock));
-	ASSERT0((*mdst)->sm_space);
-	ASSERT0(avl_numnodes(&(*mdst)->sm_root));
-
-	sm = *msrc;
-	*msrc = *mdst;
-	*mdst = sm;
-}
-
-void
-space_map_vacate(space_map_t *sm, space_map_func_t *func, space_map_t *mdest)
-{
-	space_seg_t *ss;
-	void *cookie = NULL;
-
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-
-	while ((ss = avl_destroy_nodes(&sm->sm_root, &cookie)) != NULL) {
-		if (func != NULL)
-			func(mdest, ss->ss_start, ss->ss_end - ss->ss_start);
-		kmem_cache_free(space_seg_cache, ss);
-	}
-	sm->sm_space = 0;
-}
-
-void
-space_map_walk(space_map_t *sm, space_map_func_t *func, space_map_t *mdest)
-{
-	space_seg_t *ss;
-
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-
-	for (ss = avl_first(&sm->sm_root); ss; ss = AVL_NEXT(&sm->sm_root, ss))
-		func(mdest, ss->ss_start, ss->ss_end - ss->ss_start);
-}
+#include <sys/refcount.h>
+#include <sys/zfeature.h>
 
 /*
- * Wait for any in-progress space_map_load() to complete.
+ * This value controls how the space map's block size is allowed to grow.
+ * If the value is set to the same size as SPACE_MAP_INITIAL_BLOCKSIZE then
+ * the space map block size will remain fixed. Setting this value to something
+ * greater than SPACE_MAP_INITIAL_BLOCKSIZE will allow the space map to
+ * increase its block size as needed. To maintain backwards compatibilty the
+ * space map's block size must be a power of 2 and SPACE_MAP_INITIAL_BLOCKSIZE
+ * or larger.
  */
-void
-space_map_load_wait(space_map_t *sm)
-{
-	ASSERT(MUTEX_HELD(sm->sm_lock));
-
-	while (sm->sm_loading) {
-		ASSERT(!sm->sm_loaded);
-		cv_wait(&sm->sm_load_cv, sm->sm_lock);
-	}
-}
+int space_map_max_blksz = (1 << 12);
 
 /*
+ * Load the space map disk into the specified range tree. Segments of maptype
+ * are added to the range tree, other segment types are removed.
+ *
  * Note: space_map_load() will drop sm_lock across dmu_read() calls.
  * The caller must be OK with this.
  */
 int
-space_map_load(space_map_t *sm, space_map_ops_t *ops, uint8_t maptype,
-	space_map_obj_t *smo, objset_t *os)
+space_map_load(space_map_t *sm, range_tree_t *rt, maptype_t maptype)
 {
 	uint64_t *entry, *entry_map, *entry_map_end;
 	uint64_t bufsize, size, offset, end, space;
-	uint64_t mapstart = sm->sm_start;
 	int error = 0;
 
 	ASSERT(MUTEX_HELD(sm->sm_lock));
-	ASSERT(!sm->sm_loaded);
-	ASSERT(!sm->sm_loading);
 
-	sm->sm_loading = B_TRUE;
-	end = smo->smo_objsize;
-	space = smo->smo_alloc;
+	end = space_map_length(sm);
+	space = space_map_allocated(sm);
 
-	ASSERT(sm->sm_ops == NULL);
-	VERIFY0(sm->sm_space);
+	VERIFY0(range_tree_space(rt));
 
 	if (maptype == SM_FREE) {
-		space_map_add(sm, sm->sm_start, sm->sm_size);
+		range_tree_add(rt, sm->sm_start, sm->sm_size);
 		space = sm->sm_size - space;
 	}
 
-	bufsize = 1ULL << SPACE_MAP_BLOCKSHIFT;
+	bufsize = MAX(sm->sm_blksz, SPA_MINBLOCKSIZE);
 	entry_map = zio_buf_alloc(bufsize);
 
 	mutex_exit(sm->sm_lock);
-	if (end > bufsize)
-		dmu_prefetch(os, smo->smo_object, bufsize, end - bufsize);
+	if (end > bufsize) {
+		dmu_prefetch(sm->sm_os, space_map_object(sm), bufsize,
+		    end - bufsize);
+	}
 	mutex_enter(sm->sm_lock);
 
 	for (offset = 0; offset < end; offset += bufsize) {
 		size = MIN(end - offset, bufsize);
 		VERIFY(P2PHASE(size, sizeof (uint64_t)) == 0);
 		VERIFY(size != 0);
+		ASSERT3U(sm->sm_blksz, !=, 0);
 
 		dprintf("object=%llu  offset=%llx  size=%llx\n",
-		    smo->smo_object, offset, size);
+		    space_map_object(sm), offset, size);
 
 		mutex_exit(sm->sm_lock);
-		error = dmu_read(os, smo->smo_object, offset, size, entry_map,
-		    DMU_READ_PREFETCH);
+		error = dmu_read(sm->sm_os, space_map_object(sm), offset, size,
+		    entry_map, DMU_READ_PREFETCH);
 		mutex_enter(sm->sm_lock);
 		if (error != 0)
 			break;
@@ -354,115 +103,239 @@ space_map_load(space_map_t *sm, space_map_ops_t *ops, uint8_t maptype,
 		entry_map_end = entry_map + (size / sizeof (uint64_t));
 		for (entry = entry_map; entry < entry_map_end; entry++) {
 			uint64_t e = *entry;
+			uint64_t offset, size;
 
 			if (SM_DEBUG_DECODE(e))		/* Skip debug entries */
 				continue;
 
-			(SM_TYPE_DECODE(e) == maptype ?
-			    space_map_add : space_map_remove)(sm,
-			    (SM_OFFSET_DECODE(e) << sm->sm_shift) + mapstart,
-			    SM_RUN_DECODE(e) << sm->sm_shift);
+			offset = (SM_OFFSET_DECODE(e) << sm->sm_shift) +
+			    sm->sm_start;
+			size = SM_RUN_DECODE(e) << sm->sm_shift;
+
+			VERIFY0(P2PHASE(offset, 1ULL << sm->sm_shift));
+			VERIFY0(P2PHASE(size, 1ULL << sm->sm_shift));
+			VERIFY3U(offset, >=, sm->sm_start);
+			VERIFY3U(offset + size, <=, sm->sm_start + sm->sm_size);
+			if (SM_TYPE_DECODE(e) == maptype) {
+				VERIFY3U(range_tree_space(rt) + size, <=,
+				    sm->sm_size);
+				range_tree_add(rt, offset, size);
+			} else {
+				range_tree_remove(rt, offset, size);
+			}
 		}
 	}
 
-	if (error == 0) {
-		VERIFY3U(sm->sm_space, ==, space);
-
-		sm->sm_loaded = B_TRUE;
-		sm->sm_ops = ops;
-		if (ops != NULL)
-			ops->smop_load(sm);
-	} else {
-		space_map_vacate(sm, NULL, NULL);
-	}
+	if (error == 0)
+		VERIFY3U(range_tree_space(rt), ==, space);
+	else
+		range_tree_vacate(rt, NULL, NULL);
 
 	zio_buf_free(entry_map, bufsize);
-
-	sm->sm_loading = B_FALSE;
-
-	cv_broadcast(&sm->sm_load_cv);
-
 	return (error);
 }
 
 void
-space_map_unload(space_map_t *sm)
+space_map_histogram_clear(space_map_t *sm)
 {
-	ASSERT(MUTEX_HELD(sm->sm_lock));
+	if (sm->sm_dbuf->db_size != sizeof (space_map_phys_t))
+		return;
 
-	if (sm->sm_loaded && sm->sm_ops != NULL)
-		sm->sm_ops->smop_unload(sm);
+	bzero(sm->sm_phys->smp_histogram, sizeof (sm->sm_phys->smp_histogram));
+}
 
-	sm->sm_loaded = B_FALSE;
-	sm->sm_ops = NULL;
+boolean_t
+space_map_histogram_verify(space_map_t *sm, range_tree_t *rt)
+{
+	int i;
 
-	space_map_vacate(sm, NULL, NULL);
+	/*
+	 * Verify that the in-core range tree does not have any
+	 * ranges smaller than our sm_shift size.
+	 */
+	for (i = 0; i < sm->sm_shift; i++) {
+		if (rt->rt_histogram[i] != 0)
+			return (B_FALSE);
+	}
+	return (B_TRUE);
 }
 
-uint64_t
-space_map_maxsize(space_map_t *sm)
+void
+space_map_histogram_add(space_map_t *sm, range_tree_t *rt, dmu_tx_t *tx)
 {
-	ASSERT(sm->sm_ops != NULL);
-	return (sm->sm_ops->smop_max(sm));
+	int idx = 0;
+	int i;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+	ASSERT(dmu_tx_is_syncing(tx));
+	VERIFY3U(space_map_object(sm), !=, 0);
+
+	if (sm->sm_dbuf->db_size != sizeof (space_map_phys_t))
+		return;
+
+	dmu_buf_will_dirty(sm->sm_dbuf, tx);
+
+	ASSERT(space_map_histogram_verify(sm, rt));
+
+	/*
+	 * Transfer the content of the range tree histogram to the space
+	 * map histogram. The space map histogram contains 32 buckets ranging
+	 * between 2^sm_shift to 2^(32+sm_shift-1). The range tree,
+	 * however, can represent ranges from 2^0 to 2^63. Since the space
+	 * map only cares about allocatable blocks (minimum of sm_shift) we
+	 * can safely ignore all ranges in the range tree smaller than sm_shift.
+	 */
+	for (i = sm->sm_shift; i < RANGE_TREE_HISTOGRAM_SIZE; i++) {
+
+		/*
+		 * Since the largest histogram bucket in the space map is
+		 * 2^(32+sm_shift-1), we need to normalize the values in
+		 * the range tree for any bucket larger than that size. For
+		 * example given an sm_shift of 9, ranges larger than 2^40
+		 * would get normalized as if they were 1TB ranges. Assume
+		 * the range tree had a count of 5 in the 2^44 (16TB) bucket,
+		 * the calculation below would normalize this to 5 * 2^4 (16).
+		 */
+		ASSERT3U(i, >=, idx + sm->sm_shift);
+		sm->sm_phys->smp_histogram[idx] +=
+		    rt->rt_histogram[i] << (i - idx - sm->sm_shift);
+
+		/*
+		 * Increment the space map's index as long as we haven't
+		 * reached the maximum bucket size. Accumulate all ranges
+		 * larger than the max bucket size into the last bucket.
+		 */
+		if (idx < SPACE_MAP_HISTOGRAM_SIZE(sm) - 1) {
+			ASSERT3U(idx + sm->sm_shift, ==, i);
+			idx++;
+			ASSERT3U(idx, <, SPACE_MAP_HISTOGRAM_SIZE(sm));
+		}
+	}
 }
 
 uint64_t
-space_map_alloc(space_map_t *sm, uint64_t size)
+space_map_entries(space_map_t *sm, range_tree_t *rt)
 {
-	uint64_t start;
+	avl_tree_t *t = &rt->rt_root;
+	range_seg_t *rs;
+	uint64_t size, entries;
 
-	start = sm->sm_ops->smop_alloc(sm, size);
-	if (start != -1ULL)
-		space_map_remove(sm, start, size);
-	return (start);
-}
+	/*
+	 * All space_maps always have a debug entry so account for it here.
+	 */
+	entries = 1;
 
-void
-space_map_claim(space_map_t *sm, uint64_t start, uint64_t size)
-{
-	sm->sm_ops->smop_claim(sm, start, size);
-	space_map_remove(sm, start, size);
+	/*
+	 * Traverse the range tree and calculate the number of space map
+	 * entries that would be required to write out the range tree.
+	 */
+	for (rs = avl_first(t); rs != NULL; rs = AVL_NEXT(t, rs)) {
+		size = (rs->rs_end - rs->rs_start) >> sm->sm_shift;
+		entries += howmany(size, SM_RUN_MAX);
+	}
+	return (entries);
 }
 
 void
-space_map_free(space_map_t *sm, uint64_t start, uint64_t size)
+space_map_set_blocksize(space_map_t *sm, uint64_t size, dmu_tx_t *tx)
 {
-	space_map_add(sm, start, size);
-	sm->sm_ops->smop_free(sm, start, size);
+	uint32_t blksz;
+	u_longlong_t blocks;
+
+	ASSERT3U(sm->sm_blksz, !=, 0);
+	ASSERT3U(space_map_object(sm), !=, 0);
+	ASSERT(sm->sm_dbuf != NULL);
+	VERIFY(ISP2(space_map_max_blksz));
+
+	if (sm->sm_blksz >= space_map_max_blksz)
+		return;
+
+	/*
+	 * The object contains more than one block so we can't adjust
+	 * its size.
+	 */
+	if (sm->sm_phys->smp_objsize > sm->sm_blksz)
+		return;
+
+	if (size > sm->sm_blksz) {
+		uint64_t newsz;
+
+		/*
+		 * Older software versions treat space map blocks as fixed
+		 * entities. The DMU is capable of handling different block
+		 * sizes making it possible for us to increase the
+		 * block size and maintain backwards compatibility. The
+		 * caveat is that the new block sizes must be a
+		 * power of 2 so that old software can append to the file,
+		 * adding more blocks. The block size can grow until it
+		 * reaches space_map_max_blksz.
+		 */
+		newsz = ISP2(size) ? size : 1ULL << highbit(size);
+		if (newsz > space_map_max_blksz)
+			newsz = space_map_max_blksz;
+
+		VERIFY0(dmu_object_set_blocksize(sm->sm_os,
+		    space_map_object(sm), newsz, 0, tx));
+		dmu_object_size_from_db(sm->sm_dbuf, &blksz, &blocks);
+
+		zfs_dbgmsg("txg %llu, spa %s, increasing blksz from %d to %d",
+		    dmu_tx_get_txg(tx), spa_name(dmu_objset_spa(sm->sm_os)),
+		    sm->sm_blksz, blksz);
+
+		VERIFY3U(newsz, ==, blksz);
+		VERIFY3U(sm->sm_blksz, <, blksz);
+		sm->sm_blksz = blksz;
+	}
 }
 
 /*
- * Note: space_map_sync() will drop sm_lock across dmu_write() calls.
+ * Note: space_map_write() will drop sm_lock across dmu_write() calls.
  */
 void
-space_map_sync(space_map_t *sm, uint8_t maptype,
-	space_map_obj_t *smo, objset_t *os, dmu_tx_t *tx)
+space_map_write(space_map_t *sm, range_tree_t *rt, maptype_t maptype,
+    dmu_tx_t *tx)
 {
+	objset_t *os = sm->sm_os;
 	spa_t *spa = dmu_objset_spa(os);
-	avl_tree_t *t = &sm->sm_root;
-	space_seg_t *ss;
-	uint64_t bufsize, start, size, run_len, total, sm_space, nodes;
+	avl_tree_t *t = &rt->rt_root;
+	range_seg_t *rs;
+	uint64_t size, total, rt_space, nodes;
 	uint64_t *entry, *entry_map, *entry_map_end;
+	uint64_t newsz, expected_entries, actual_entries = 1;
 
-	ASSERT(MUTEX_HELD(sm->sm_lock));
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+	ASSERT(dsl_pool_sync_context(dmu_objset_pool(os)));
+	VERIFY3U(space_map_object(sm), !=, 0);
+	dmu_buf_will_dirty(sm->sm_dbuf, tx);
 
-	if (sm->sm_space == 0)
-		return;
+	/*
+	 * This field is no longer necessary since the in-core space map
+	 * now contains the object number but is maintained for backwards
+	 * compatibility.
+	 */
+	sm->sm_phys->smp_object = sm->sm_object;
 
-	dprintf("object %4llu, txg %llu, pass %d, %c, count %lu, space %llx\n",
-	    smo->smo_object, dmu_tx_get_txg(tx), spa_sync_pass(spa),
-	    maptype == SM_ALLOC ? 'A' : 'F', avl_numnodes(&sm->sm_root),
-	    sm->sm_space);
+	if (range_tree_space(rt) == 0) {
+		VERIFY3U(sm->sm_object, ==, sm->sm_phys->smp_object);
+		return;
+	}
 
 	if (maptype == SM_ALLOC)
-		smo->smo_alloc += sm->sm_space;
+		sm->sm_phys->smp_alloc += range_tree_space(rt);
 	else
-		smo->smo_alloc -= sm->sm_space;
+		sm->sm_phys->smp_alloc -= range_tree_space(rt);
 
-	bufsize = (8 + avl_numnodes(&sm->sm_root)) * sizeof (uint64_t);
-	bufsize = MIN(bufsize, 1ULL << SPACE_MAP_BLOCKSHIFT);
-	entry_map = zio_buf_alloc(bufsize);
-	entry_map_end = entry_map + (bufsize / sizeof (uint64_t));
+	expected_entries = space_map_entries(sm, rt);
+
+	/*
+	 * Calculate the new size for the space map on-disk and see if
+	 * we can grow the block size to accommodate the new size.
+	 */
+	newsz = sm->sm_phys->smp_objsize + expected_entries * sizeof (uint64_t);
+	space_map_set_blocksize(sm, newsz, tx);
+
+	entry_map = zio_buf_alloc(sm->sm_blksz);
+	entry_map_end = entry_map + (sm->sm_blksz / sizeof (uint64_t));
 	entry = entry_map;
 
 	*entry++ = SM_DEBUG_ENCODE(1) |
@@ -471,24 +344,28 @@ space_map_sync(space_map_t *sm, uint8_t maptype,
 	    SM_DEBUG_TXG_ENCODE(dmu_tx_get_txg(tx));
 
 	total = 0;
-	nodes = avl_numnodes(&sm->sm_root);
-	sm_space = sm->sm_space;
-	for (ss = avl_first(t); ss != NULL; ss = AVL_NEXT(t, ss)) {
-		size = ss->ss_end - ss->ss_start;
-		start = (ss->ss_start - sm->sm_start) >> sm->sm_shift;
+	nodes = avl_numnodes(&rt->rt_root);
+	rt_space = range_tree_space(rt);
+	for (rs = avl_first(t); rs != NULL; rs = AVL_NEXT(t, rs)) {
+		uint64_t start;
+
+		size = (rs->rs_end - rs->rs_start) >> sm->sm_shift;
+		start = (rs->rs_start - sm->sm_start) >> sm->sm_shift;
 
-		total += size;
-		size >>= sm->sm_shift;
+		total += size << sm->sm_shift;
+
+		while (size != 0) {
+			uint64_t run_len;
 
-		while (size) {
 			run_len = MIN(size, SM_RUN_MAX);
 
 			if (entry == entry_map_end) {
-				mutex_exit(sm->sm_lock);
-				dmu_write(os, smo->smo_object, smo->smo_objsize,
-				    bufsize, entry_map, tx);
-				mutex_enter(sm->sm_lock);
-				smo->smo_objsize += bufsize;
+				mutex_exit(rt->rt_lock);
+				dmu_write(os, space_map_object(sm),
+				    sm->sm_phys->smp_objsize, sm->sm_blksz,
+				    entry_map, tx);
+				mutex_enter(rt->rt_lock);
+				sm->sm_phys->smp_objsize += sm->sm_blksz;
 				entry = entry_map;
 			}
 
@@ -498,162 +375,241 @@ space_map_sync(space_map_t *sm, uint8_t maptype,
 
 			start += run_len;
 			size -= run_len;
+			actual_entries++;
 		}
 	}
 
 	if (entry != entry_map) {
 		size = (entry - entry_map) * sizeof (uint64_t);
-		mutex_exit(sm->sm_lock);
-		dmu_write(os, smo->smo_object, smo->smo_objsize,
+		mutex_exit(rt->rt_lock);
+		dmu_write(os, space_map_object(sm), sm->sm_phys->smp_objsize,
 		    size, entry_map, tx);
-		mutex_enter(sm->sm_lock);
-		smo->smo_objsize += size;
+		mutex_enter(rt->rt_lock);
+		sm->sm_phys->smp_objsize += size;
 	}
+	ASSERT3U(expected_entries, ==, actual_entries);
 
 	/*
 	 * Ensure that the space_map's accounting wasn't changed
 	 * while we were in the middle of writing it out.
 	 */
-	VERIFY3U(nodes, ==, avl_numnodes(&sm->sm_root));
-	VERIFY3U(sm->sm_space, ==, sm_space);
-	VERIFY3U(sm->sm_space, ==, total);
+	VERIFY3U(nodes, ==, avl_numnodes(&rt->rt_root));
+	VERIFY3U(range_tree_space(rt), ==, rt_space);
+	VERIFY3U(range_tree_space(rt), ==, total);
 
-	zio_buf_free(entry_map, bufsize);
+	zio_buf_free(entry_map, sm->sm_blksz);
 }
 
-void
-space_map_truncate(space_map_obj_t *smo, objset_t *os, dmu_tx_t *tx)
+static int
+space_map_open_impl(space_map_t *sm)
 {
-	VERIFY(dmu_free_range(os, smo->smo_object, 0, -1ULL, tx) == 0);
+	int error;
+	u_longlong_t blocks;
+
+	error = dmu_bonus_hold(sm->sm_os, sm->sm_object, sm, &sm->sm_dbuf);
+	if (error)
+		return (error);
 
-	smo->smo_objsize = 0;
-	smo->smo_alloc = 0;
+	dmu_object_size_from_db(sm->sm_dbuf, &sm->sm_blksz, &blocks);
+	sm->sm_phys = sm->sm_dbuf->db_data;
+	return (0);
 }
 
-/*
- * Space map reference trees.
- *
- * A space map is a collection of integers.  Every integer is either
- * in the map, or it's not.  A space map reference tree generalizes
- * the idea: it allows its members to have arbitrary reference counts,
- * as opposed to the implicit reference count of 0 or 1 in a space map.
- * This representation comes in handy when computing the union or
- * intersection of multiple space maps.  For example, the union of
- * N space maps is the subset of the reference tree with refcnt >= 1.
- * The intersection of N space maps is the subset with refcnt >= N.
- *
- * [It's very much like a Fourier transform.  Unions and intersections
- * are hard to perform in the 'space map domain', so we convert the maps
- * into the 'reference count domain', where it's trivial, then invert.]
- *
- * vdev_dtl_reassess() uses computations of this form to determine
- * DTL_MISSING and DTL_OUTAGE for interior vdevs -- e.g. a RAID-Z vdev
- * has an outage wherever refcnt >= vdev_nparity + 1, and a mirror vdev
- * has an outage wherever refcnt >= vdev_children.
- */
-static int
-space_map_ref_compare(const void *x1, const void *x2)
+int
+space_map_open(space_map_t **smp, objset_t *os, uint64_t object,
+    uint64_t start, uint64_t size, uint8_t shift, kmutex_t *lp)
 {
-	const space_ref_t *sr1 = x1;
-	const space_ref_t *sr2 = x2;
+	space_map_t *sm;
+	int error;
 
-	if (sr1->sr_offset < sr2->sr_offset)
-		return (-1);
-	if (sr1->sr_offset > sr2->sr_offset)
-		return (1);
+	ASSERT(*smp == NULL);
+	ASSERT(os != NULL);
+	ASSERT(object != 0);
 
-	if (sr1 < sr2)
-		return (-1);
-	if (sr1 > sr2)
-		return (1);
+	sm = kmem_alloc(sizeof (space_map_t), KM_PUSHPAGE);
 
-	return (0);
-}
+	sm->sm_start = start;
+	sm->sm_size = size;
+	sm->sm_shift = shift;
+	sm->sm_lock = lp;
+	sm->sm_os = os;
+	sm->sm_object = object;
+	sm->sm_length = 0;
+	sm->sm_alloc = 0;
+	sm->sm_blksz = 0;
+	sm->sm_dbuf = NULL;
+	sm->sm_phys = NULL;
+
+	error = space_map_open_impl(sm);
+	if (error != 0) {
+		space_map_close(sm);
+		return (error);
+	}
 
-void
-space_map_ref_create(avl_tree_t *t)
-{
-	avl_create(t, space_map_ref_compare,
-	    sizeof (space_ref_t), offsetof(space_ref_t, sr_node));
+	*smp = sm;
+
+	return (0);
 }
 
 void
-space_map_ref_destroy(avl_tree_t *t)
+space_map_close(space_map_t *sm)
 {
-	space_ref_t *sr;
-	void *cookie = NULL;
+	if (sm == NULL)
+		return;
 
-	while ((sr = avl_destroy_nodes(t, &cookie)) != NULL)
-		kmem_free(sr, sizeof (*sr));
+	if (sm->sm_dbuf != NULL)
+		dmu_buf_rele(sm->sm_dbuf, sm);
+	sm->sm_dbuf = NULL;
+	sm->sm_phys = NULL;
 
-	avl_destroy(t);
+	kmem_free(sm, sizeof (*sm));
 }
 
 static void
-space_map_ref_add_node(avl_tree_t *t, uint64_t offset, int64_t refcnt)
+space_map_reallocate(space_map_t *sm, dmu_tx_t *tx)
 {
-	space_ref_t *sr;
+	ASSERT(dmu_tx_is_syncing(tx));
 
-	sr = kmem_alloc(sizeof (*sr), KM_PUSHPAGE);
-	sr->sr_offset = offset;
-	sr->sr_refcnt = refcnt;
+	space_map_free(sm, tx);
+	dmu_buf_rele(sm->sm_dbuf, sm);
 
-	avl_add(t, sr);
+	sm->sm_object = space_map_alloc(sm->sm_os, tx);
+	VERIFY0(space_map_open_impl(sm));
 }
 
 void
-space_map_ref_add_seg(avl_tree_t *t, uint64_t start, uint64_t end,
-	int64_t refcnt)
+space_map_truncate(space_map_t *sm, dmu_tx_t *tx)
 {
-	space_map_ref_add_node(t, start, refcnt);
-	space_map_ref_add_node(t, end, -refcnt);
+	objset_t *os = sm->sm_os;
+	spa_t *spa = dmu_objset_spa(os);
+	zfeature_info_t *space_map_histogram =
+	    &spa_feature_table[SPA_FEATURE_SPACEMAP_HISTOGRAM];
+	dmu_object_info_t doi;
+	int bonuslen;
+
+	ASSERT(dsl_pool_sync_context(dmu_objset_pool(os)));
+	ASSERT(dmu_tx_is_syncing(tx));
+
+	VERIFY0(dmu_free_range(os, space_map_object(sm), 0, -1ULL, tx));
+	dmu_object_info_from_db(sm->sm_dbuf, &doi);
+
+	if (spa_feature_is_enabled(spa, space_map_histogram)) {
+		bonuslen = sizeof (space_map_phys_t);
+		ASSERT3U(bonuslen, <=, dmu_bonus_max());
+	} else {
+		bonuslen = SPACE_MAP_SIZE_V0;
+	}
+
+	if (bonuslen != doi.doi_bonus_size ||
+	    doi.doi_data_block_size != SPACE_MAP_INITIAL_BLOCKSIZE) {
+		zfs_dbgmsg("txg %llu, spa %s, reallocating: "
+		    "old bonus %u, old blocksz %u", dmu_tx_get_txg(tx),
+		    spa_name(spa), doi.doi_bonus_size, doi.doi_data_block_size);
+		space_map_reallocate(sm, tx);
+		VERIFY3U(sm->sm_blksz, ==, SPACE_MAP_INITIAL_BLOCKSIZE);
+	}
+
+	dmu_buf_will_dirty(sm->sm_dbuf, tx);
+	sm->sm_phys->smp_objsize = 0;
+	sm->sm_phys->smp_alloc = 0;
 }
 
 /*
- * Convert (or add) a space map into a reference tree.
+ * Update the in-core space_map allocation and length values.
  */
 void
-space_map_ref_add_map(avl_tree_t *t, space_map_t *sm, int64_t refcnt)
+space_map_update(space_map_t *sm)
 {
-	space_seg_t *ss;
+	if (sm == NULL)
+		return;
 
 	ASSERT(MUTEX_HELD(sm->sm_lock));
 
-	for (ss = avl_first(&sm->sm_root); ss; ss = AVL_NEXT(&sm->sm_root, ss))
-		space_map_ref_add_seg(t, ss->ss_start, ss->ss_end, refcnt);
+	sm->sm_alloc = sm->sm_phys->smp_alloc;
+	sm->sm_length = sm->sm_phys->smp_objsize;
+}
+
+uint64_t
+space_map_alloc(objset_t *os, dmu_tx_t *tx)
+{
+	spa_t *spa = dmu_objset_spa(os);
+	zfeature_info_t *space_map_histogram =
+	    &spa_feature_table[SPA_FEATURE_SPACEMAP_HISTOGRAM];
+	uint64_t object;
+	int bonuslen;
+
+	if (spa_feature_is_enabled(spa, space_map_histogram)) {
+		spa_feature_incr(spa, space_map_histogram, tx);
+		bonuslen = sizeof (space_map_phys_t);
+		ASSERT3U(bonuslen, <=, dmu_bonus_max());
+	} else {
+		bonuslen = SPACE_MAP_SIZE_V0;
+	}
+
+	object = dmu_object_alloc(os,
+	    DMU_OT_SPACE_MAP, SPACE_MAP_INITIAL_BLOCKSIZE,
+	    DMU_OT_SPACE_MAP_HEADER, bonuslen, tx);
+
+	return (object);
 }
 
-/*
- * Convert a reference tree into a space map.  The space map will contain
- * all members of the reference tree for which refcnt >= minref.
- */
 void
-space_map_ref_generate_map(avl_tree_t *t, space_map_t *sm, int64_t minref)
+space_map_free(space_map_t *sm, dmu_tx_t *tx)
 {
-	uint64_t start = -1ULL;
-	int64_t refcnt = 0;
-	space_ref_t *sr;
+	spa_t *spa;
+	zfeature_info_t *space_map_histogram =
+	    &spa_feature_table[SPA_FEATURE_SPACEMAP_HISTOGRAM];
 
-	ASSERT(MUTEX_HELD(sm->sm_lock));
+	if (sm == NULL)
+		return;
 
-	space_map_vacate(sm, NULL, NULL);
+	spa = dmu_objset_spa(sm->sm_os);
+	if (spa_feature_is_enabled(spa, space_map_histogram)) {
+		dmu_object_info_t doi;
 
-	for (sr = avl_first(t); sr != NULL; sr = AVL_NEXT(t, sr)) {
-		refcnt += sr->sr_refcnt;
-		if (refcnt >= minref) {
-			if (start == -1ULL) {
-				start = sr->sr_offset;
-			}
-		} else {
-			if (start != -1ULL) {
-				uint64_t end = sr->sr_offset;
-				ASSERT(start <= end);
-				if (end > start)
-					space_map_add(sm, start, end - start);
-				start = -1ULL;
-			}
+		dmu_object_info_from_db(sm->sm_dbuf, &doi);
+		if (doi.doi_bonus_size != SPACE_MAP_SIZE_V0) {
+			VERIFY(spa_feature_is_active(spa, space_map_histogram));
+			spa_feature_decr(spa, space_map_histogram, tx);
 		}
 	}
-	ASSERT(refcnt == 0);
-	ASSERT(start == -1ULL);
+
+	VERIFY3U(dmu_object_free(sm->sm_os, space_map_object(sm), tx), ==, 0);
+	sm->sm_object = 0;
+}
+
+uint64_t
+space_map_object(space_map_t *sm)
+{
+	return (sm != NULL ? sm->sm_object : 0);
+}
+
+/*
+ * Returns the already synced, on-disk allocated space.
+ */
+uint64_t
+space_map_allocated(space_map_t *sm)
+{
+	return (sm != NULL ? sm->sm_alloc : 0);
+}
+
+/*
+ * Returns the already synced, on-disk length;
+ */
+uint64_t
+space_map_length(space_map_t *sm)
+{
+	return (sm != NULL ? sm->sm_length : 0);
+}
+
+/*
+ * Returns the allocated space that is currently syncing.
+ */
+int64_t
+space_map_alloc_delta(space_map_t *sm)
+{
+	if (sm == NULL)
+		return (0);
+	ASSERT(sm->sm_dbuf != NULL);
+	return (sm->sm_phys->smp_alloc - space_map_allocated(sm));
 }
diff --git a/module/zfs/space_reftree.c b/module/zfs/space_reftree.c
new file mode 100644
index 000000000..d20281e16
--- /dev/null
+++ b/module/zfs/space_reftree.c
@@ -0,0 +1,159 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+/*
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * Copyright (c) 2013 by Delphix. All rights reserved.
+ */
+
+#include <sys/zfs_context.h>
+#include <sys/range_tree.h>
+#include <sys/space_reftree.h>
+
+/*
+ * Space reference trees.
+ *
+ * A range tree is a collection of integers.  Every integer is either
+ * in the tree, or it's not.  A space reference tree generalizes
+ * the idea: it allows its members to have arbitrary reference counts,
+ * as opposed to the implicit reference count of 0 or 1 in a range tree.
+ * This representation comes in handy when computing the union or
+ * intersection of multiple space maps.  For example, the union of
+ * N range trees is the subset of the reference tree with refcnt >= 1.
+ * The intersection of N range trees is the subset with refcnt >= N.
+ *
+ * [It's very much like a Fourier transform.  Unions and intersections
+ * are hard to perform in the 'range tree domain', so we convert the trees
+ * into the 'reference count domain', where it's trivial, then invert.]
+ *
+ * vdev_dtl_reassess() uses computations of this form to determine
+ * DTL_MISSING and DTL_OUTAGE for interior vdevs -- e.g. a RAID-Z vdev
+ * has an outage wherever refcnt >= vdev_nparity + 1, and a mirror vdev
+ * has an outage wherever refcnt >= vdev_children.
+ */
+static int
+space_reftree_compare(const void *x1, const void *x2)
+{
+	const space_ref_t *sr1 = x1;
+	const space_ref_t *sr2 = x2;
+
+	if (sr1->sr_offset < sr2->sr_offset)
+		return (-1);
+	if (sr1->sr_offset > sr2->sr_offset)
+		return (1);
+
+	if (sr1 < sr2)
+		return (-1);
+	if (sr1 > sr2)
+		return (1);
+
+	return (0);
+}
+
+void
+space_reftree_create(avl_tree_t *t)
+{
+	avl_create(t, space_reftree_compare,
+	    sizeof (space_ref_t), offsetof(space_ref_t, sr_node));
+}
+
+void
+space_reftree_destroy(avl_tree_t *t)
+{
+	space_ref_t *sr;
+	void *cookie = NULL;
+
+	while ((sr = avl_destroy_nodes(t, &cookie)) != NULL)
+		kmem_free(sr, sizeof (*sr));
+
+	avl_destroy(t);
+}
+
+static void
+space_reftree_add_node(avl_tree_t *t, uint64_t offset, int64_t refcnt)
+{
+	space_ref_t *sr;
+
+	sr = kmem_alloc(sizeof (*sr), KM_PUSHPAGE);
+	sr->sr_offset = offset;
+	sr->sr_refcnt = refcnt;
+
+	avl_add(t, sr);
+}
+
+void
+space_reftree_add_seg(avl_tree_t *t, uint64_t start, uint64_t end,
+	int64_t refcnt)
+{
+	space_reftree_add_node(t, start, refcnt);
+	space_reftree_add_node(t, end, -refcnt);
+}
+
+/*
+ * Convert (or add) a range tree into a reference tree.
+ */
+void
+space_reftree_add_map(avl_tree_t *t, range_tree_t *rt, int64_t refcnt)
+{
+	range_seg_t *rs;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+
+	for (rs = avl_first(&rt->rt_root); rs; rs = AVL_NEXT(&rt->rt_root, rs))
+		space_reftree_add_seg(t, rs->rs_start, rs->rs_end, refcnt);
+}
+
+/*
+ * Convert a reference tree into a range tree.  The range tree will contain
+ * all members of the reference tree for which refcnt >= minref.
+ */
+void
+space_reftree_generate_map(avl_tree_t *t, range_tree_t *rt, int64_t minref)
+{
+	uint64_t start = -1ULL;
+	int64_t refcnt = 0;
+	space_ref_t *sr;
+
+	ASSERT(MUTEX_HELD(rt->rt_lock));
+
+	range_tree_vacate(rt, NULL, NULL);
+
+	for (sr = avl_first(t); sr != NULL; sr = AVL_NEXT(t, sr)) {
+		refcnt += sr->sr_refcnt;
+		if (refcnt >= minref) {
+			if (start == -1ULL) {
+				start = sr->sr_offset;
+			}
+		} else {
+			if (start != -1ULL) {
+				uint64_t end = sr->sr_offset;
+				ASSERT(start <= end);
+				if (end > start)
+					range_tree_add(rt, start, end - start);
+				start = -1ULL;
+			}
+		}
+	}
+	ASSERT(refcnt == 0);
+	ASSERT(start == -1ULL);
+}
diff --git a/module/zfs/vdev.c b/module/zfs/vdev.c
index 4c67792c9..69a315317 100644
--- a/module/zfs/vdev.c
+++ b/module/zfs/vdev.c
@@ -36,6 +36,7 @@
 #include <sys/metaslab.h>
 #include <sys/metaslab_impl.h>
 #include <sys/space_map.h>
+#include <sys/space_reftree.h>
 #include <sys/zio.h>
 #include <sys/zap.h>
 #include <sys/fs/zfs.h>
@@ -193,7 +194,7 @@ vdev_add_child(vdev_t *pvd, vdev_t *cvd)
 	pvd->vdev_children = MAX(pvd->vdev_children, id + 1);
 	newsize = pvd->vdev_children * sizeof (vdev_t *);
 
-	newchild = kmem_zalloc(newsize, KM_PUSHPAGE);
+	newchild = kmem_alloc(newsize, KM_PUSHPAGE);
 	if (pvd->vdev_child != NULL) {
 		bcopy(pvd->vdev_child, newchild, oldsize);
 		kmem_free(pvd->vdev_child, oldsize);
@@ -263,7 +264,7 @@ vdev_compact_children(vdev_t *pvd)
 		if (pvd->vdev_child[c])
 			newc++;
 
-	newchild = kmem_alloc(newc * sizeof (vdev_t *), KM_PUSHPAGE);
+	newchild = kmem_zalloc(newc * sizeof (vdev_t *), KM_PUSHPAGE);
 
 	for (c = newc = 0; c < oldc; c++) {
 		if ((cvd = pvd->vdev_child[c]) != NULL) {
@@ -324,7 +325,7 @@ vdev_alloc_common(spa_t *spa, uint_t id, uint64_t guid, vdev_ops_t *ops)
 	mutex_init(&vd->vdev_stat_lock, NULL, MUTEX_DEFAULT, NULL);
 	mutex_init(&vd->vdev_probe_lock, NULL, MUTEX_DEFAULT, NULL);
 	for (t = 0; t < DTL_TYPES; t++) {
-		space_map_create(&vd->vdev_dtl[t], 0, -1ULL, 0,
+		vd->vdev_dtl[t] = range_tree_create(NULL, NULL,
 		    &vd->vdev_dtl_lock);
 	}
 	txg_list_create(&vd->vdev_ms_list,
@@ -510,7 +511,7 @@ vdev_alloc(spa_t *spa, vdev_t **vdp, nvlist_t *nv, vdev_t *parent, uint_t id,
 	    alloctype == VDEV_ALLOC_ROOTPOOL)) {
 		if (alloctype == VDEV_ALLOC_LOAD) {
 			(void) nvlist_lookup_uint64(nv, ZPOOL_CONFIG_DTL,
-			    &vd->vdev_dtl_smo.smo_object);
+			    &vd->vdev_dtl_object);
 			(void) nvlist_lookup_uint64(nv, ZPOOL_CONFIG_UNSPARE,
 			    &vd->vdev_unspare);
 		}
@@ -633,9 +634,10 @@ vdev_free(vdev_t *vd)
 	txg_list_destroy(&vd->vdev_dtl_list);
 
 	mutex_enter(&vd->vdev_dtl_lock);
+	space_map_close(vd->vdev_dtl_sm);
 	for (t = 0; t < DTL_TYPES; t++) {
-		space_map_unload(&vd->vdev_dtl[t]);
-		space_map_destroy(&vd->vdev_dtl[t]);
+		range_tree_vacate(vd->vdev_dtl[t], NULL, NULL);
+		range_tree_destroy(vd->vdev_dtl[t]);
 	}
 	mutex_exit(&vd->vdev_dtl_lock);
 
@@ -859,27 +861,16 @@ vdev_metaslab_init(vdev_t *vd, uint64_t txg)
 	vd->vdev_ms_count = newc;
 
 	for (m = oldc; m < newc; m++) {
-		space_map_obj_t smo = { 0, 0, 0 };
+		uint64_t object = 0;
+
 		if (txg == 0) {
-			uint64_t object = 0;
 			error = dmu_read(mos, vd->vdev_ms_array,
 			    m * sizeof (uint64_t), sizeof (uint64_t), &object,
 			    DMU_READ_PREFETCH);
 			if (error)
 				return (error);
-			if (object != 0) {
-				dmu_buf_t *db;
-				error = dmu_bonus_hold(mos, object, FTAG, &db);
-				if (error)
-					return (error);
-				ASSERT3U(db->db_size, >=, sizeof (smo));
-				bcopy(db->db_data, &smo, sizeof (smo));
-				ASSERT3U(smo.smo_object, ==, object);
-				dmu_buf_rele(db, FTAG);
-			}
 		}
-		vd->vdev_ms[m] = metaslab_init(vd->vdev_mg, &smo,
-		    m << vd->vdev_ms_shift, 1ULL << vd->vdev_ms_shift, txg);
+		vd->vdev_ms[m] = metaslab_init(vd->vdev_mg, m, object, txg);
 	}
 
 	if (txg == 0)
@@ -907,9 +898,12 @@ vdev_metaslab_fini(vdev_t *vd)
 
 	if (vd->vdev_ms != NULL) {
 		metaslab_group_passivate(vd->vdev_mg);
-		for (m = 0; m < count; m++)
-			if (vd->vdev_ms[m] != NULL)
-				metaslab_fini(vd->vdev_ms[m]);
+		for (m = 0; m < count; m++) {
+			metaslab_t *msp = vd->vdev_ms[m];
+
+			if (msp != NULL)
+				metaslab_fini(msp);
+		}
 		kmem_free(vd->vdev_ms, count * sizeof (metaslab_t *));
 		vd->vdev_ms = NULL;
 	}
@@ -1572,9 +1566,10 @@ vdev_create(vdev_t *vd, uint64_t txg, boolean_t isreplacing)
 	}
 
 	/*
-	 * Recursively initialize all labels.
+	 * Recursively load DTLs and initialize all labels.
 	 */
-	if ((error = vdev_label_init(vd, txg, isreplacing ?
+	if ((error = vdev_dtl_load(vd)) != 0 ||
+	    (error = vdev_label_init(vd, txg, isreplacing ?
 	    VDEV_LABEL_REPLACE : VDEV_LABEL_CREATE)) != 0) {
 		vdev_close(vd);
 		return (error);
@@ -1610,6 +1605,18 @@ vdev_dirty(vdev_t *vd, int flags, void *arg, uint64_t txg)
 	(void) txg_list_add(&vd->vdev_spa->spa_vdev_txg_list, vd, txg);
 }
 
+void
+vdev_dirty_leaves(vdev_t *vd, int flags, uint64_t txg)
+{
+	int c;
+
+	for (c = 0; c < vd->vdev_children; c++)
+		vdev_dirty_leaves(vd->vdev_child[c], flags, txg);
+
+	if (vd->vdev_ops->vdev_op_leaf)
+		vdev_dirty(vd->vdev_top, flags, vd, txg);
+}
+
 /*
  * DTLs.
  *
@@ -1651,31 +1658,31 @@ vdev_dirty(vdev_t *vd, int flags, void *arg, uint64_t txg)
 void
 vdev_dtl_dirty(vdev_t *vd, vdev_dtl_type_t t, uint64_t txg, uint64_t size)
 {
-	space_map_t *sm = &vd->vdev_dtl[t];
+	range_tree_t *rt = vd->vdev_dtl[t];
 
 	ASSERT(t < DTL_TYPES);
 	ASSERT(vd != vd->vdev_spa->spa_root_vdev);
 	ASSERT(spa_writeable(vd->vdev_spa));
 
-	mutex_enter(sm->sm_lock);
-	if (!space_map_contains(sm, txg, size))
-		space_map_add(sm, txg, size);
-	mutex_exit(sm->sm_lock);
+	mutex_enter(rt->rt_lock);
+	if (!range_tree_contains(rt, txg, size))
+		range_tree_add(rt, txg, size);
+	mutex_exit(rt->rt_lock);
 }
 
 boolean_t
 vdev_dtl_contains(vdev_t *vd, vdev_dtl_type_t t, uint64_t txg, uint64_t size)
 {
-	space_map_t *sm = &vd->vdev_dtl[t];
+	range_tree_t *rt = vd->vdev_dtl[t];
 	boolean_t dirty = B_FALSE;
 
 	ASSERT(t < DTL_TYPES);
 	ASSERT(vd != vd->vdev_spa->spa_root_vdev);
 
-	mutex_enter(sm->sm_lock);
-	if (sm->sm_space != 0)
-		dirty = space_map_contains(sm, txg, size);
-	mutex_exit(sm->sm_lock);
+	mutex_enter(rt->rt_lock);
+	if (range_tree_space(rt) != 0)
+		dirty = range_tree_contains(rt, txg, size);
+	mutex_exit(rt->rt_lock);
 
 	return (dirty);
 }
@@ -1683,12 +1690,12 @@ vdev_dtl_contains(vdev_t *vd, vdev_dtl_type_t t, uint64_t txg, uint64_t size)
 boolean_t
 vdev_dtl_empty(vdev_t *vd, vdev_dtl_type_t t)
 {
-	space_map_t *sm = &vd->vdev_dtl[t];
+	range_tree_t *rt = vd->vdev_dtl[t];
 	boolean_t empty;
 
-	mutex_enter(sm->sm_lock);
-	empty = (sm->sm_space == 0);
-	mutex_exit(sm->sm_lock);
+	mutex_enter(rt->rt_lock);
+	empty = (range_tree_space(rt) == 0);
+	mutex_exit(rt->rt_lock);
 
 	return (empty);
 }
@@ -1699,14 +1706,14 @@ vdev_dtl_empty(vdev_t *vd, vdev_dtl_type_t t)
 static uint64_t
 vdev_dtl_min(vdev_t *vd)
 {
-	space_seg_t *ss;
+	range_seg_t *rs;
 
 	ASSERT(MUTEX_HELD(&vd->vdev_dtl_lock));
-	ASSERT3U(vd->vdev_dtl[DTL_MISSING].sm_space, !=, 0);
+	ASSERT3U(range_tree_space(vd->vdev_dtl[DTL_MISSING]), !=, 0);
 	ASSERT0(vd->vdev_children);
 
-	ss = avl_first(&vd->vdev_dtl[DTL_MISSING].sm_root);
-	return (ss->ss_start - 1);
+	rs = avl_first(&vd->vdev_dtl[DTL_MISSING]->rt_root);
+	return (rs->rs_start - 1);
 }
 
 /*
@@ -1715,14 +1722,14 @@ vdev_dtl_min(vdev_t *vd)
 static uint64_t
 vdev_dtl_max(vdev_t *vd)
 {
-	space_seg_t *ss;
+	range_seg_t *rs;
 
 	ASSERT(MUTEX_HELD(&vd->vdev_dtl_lock));
-	ASSERT3U(vd->vdev_dtl[DTL_MISSING].sm_space, !=, 0);
+	ASSERT3U(range_tree_space(vd->vdev_dtl[DTL_MISSING]), !=, 0);
 	ASSERT0(vd->vdev_children);
 
-	ss = avl_last(&vd->vdev_dtl[DTL_MISSING].sm_root);
-	return (ss->ss_end);
+	rs = avl_last(&vd->vdev_dtl[DTL_MISSING]->rt_root);
+	return (rs->rs_end);
 }
 
 /*
@@ -1743,7 +1750,7 @@ vdev_dtl_should_excise(vdev_t *vd)
 	ASSERT0(vd->vdev_children);
 
 	if (vd->vdev_resilver_txg == 0 ||
-	    vd->vdev_dtl[DTL_MISSING].sm_space == 0)
+	    range_tree_space(vd->vdev_dtl[DTL_MISSING]) == 0)
 		return (B_TRUE);
 
 	/*
@@ -1813,35 +1820,35 @@ vdev_dtl_reassess(vdev_t *vd, uint64_t txg, uint64_t scrub_txg, int scrub_done)
 			 * positive refcnt -- either 1 or 2.  We then convert
 			 * the reference tree into the new DTL_MISSING map.
 			 */
-			space_map_ref_create(&reftree);
-			space_map_ref_add_map(&reftree,
-			    &vd->vdev_dtl[DTL_MISSING], 1);
-			space_map_ref_add_seg(&reftree, 0, scrub_txg, -1);
-			space_map_ref_add_map(&reftree,
-			    &vd->vdev_dtl[DTL_SCRUB], 2);
-			space_map_ref_generate_map(&reftree,
-			    &vd->vdev_dtl[DTL_MISSING], 1);
-			space_map_ref_destroy(&reftree);
+			space_reftree_create(&reftree);
+			space_reftree_add_map(&reftree,
+			    vd->vdev_dtl[DTL_MISSING], 1);
+			space_reftree_add_seg(&reftree, 0, scrub_txg, -1);
+			space_reftree_add_map(&reftree,
+			    vd->vdev_dtl[DTL_SCRUB], 2);
+			space_reftree_generate_map(&reftree,
+			    vd->vdev_dtl[DTL_MISSING], 1);
+			space_reftree_destroy(&reftree);
 		}
-		space_map_vacate(&vd->vdev_dtl[DTL_PARTIAL], NULL, NULL);
-		space_map_walk(&vd->vdev_dtl[DTL_MISSING],
-		    space_map_add, &vd->vdev_dtl[DTL_PARTIAL]);
+		range_tree_vacate(vd->vdev_dtl[DTL_PARTIAL], NULL, NULL);
+		range_tree_walk(vd->vdev_dtl[DTL_MISSING],
+		    range_tree_add, vd->vdev_dtl[DTL_PARTIAL]);
 		if (scrub_done)
-			space_map_vacate(&vd->vdev_dtl[DTL_SCRUB], NULL, NULL);
-		space_map_vacate(&vd->vdev_dtl[DTL_OUTAGE], NULL, NULL);
+			range_tree_vacate(vd->vdev_dtl[DTL_SCRUB], NULL, NULL);
+		range_tree_vacate(vd->vdev_dtl[DTL_OUTAGE], NULL, NULL);
 		if (!vdev_readable(vd))
-			space_map_add(&vd->vdev_dtl[DTL_OUTAGE], 0, -1ULL);
+			range_tree_add(vd->vdev_dtl[DTL_OUTAGE], 0, -1ULL);
 		else
-			space_map_walk(&vd->vdev_dtl[DTL_MISSING],
-			    space_map_add, &vd->vdev_dtl[DTL_OUTAGE]);
+			range_tree_walk(vd->vdev_dtl[DTL_MISSING],
+			    range_tree_add, vd->vdev_dtl[DTL_OUTAGE]);
 
 		/*
 		 * If the vdev was resilvering and no longer has any
 		 * DTLs then reset its resilvering flag.
 		 */
 		if (vd->vdev_resilver_txg != 0 &&
-		    vd->vdev_dtl[DTL_MISSING].sm_space == 0 &&
-		    vd->vdev_dtl[DTL_OUTAGE].sm_space == 0)
+		    range_tree_space(vd->vdev_dtl[DTL_MISSING]) == 0 &&
+		    range_tree_space(vd->vdev_dtl[DTL_OUTAGE]) == 0)
 			vd->vdev_resilver_txg = 0;
 
 		mutex_exit(&vd->vdev_dtl_lock);
@@ -1853,6 +1860,8 @@ vdev_dtl_reassess(vdev_t *vd, uint64_t txg, uint64_t scrub_txg, int scrub_done)
 
 	mutex_enter(&vd->vdev_dtl_lock);
 	for (t = 0; t < DTL_TYPES; t++) {
+		int c;
+
 		/* account for child's outage in parent's missing map */
 		int s = (t == DTL_MISSING) ? DTL_OUTAGE: t;
 		if (t == DTL_SCRUB)
@@ -1863,46 +1872,56 @@ vdev_dtl_reassess(vdev_t *vd, uint64_t txg, uint64_t scrub_txg, int scrub_done)
 			minref = vd->vdev_nparity + 1;	/* RAID-Z */
 		else
 			minref = vd->vdev_children;	/* any kind of mirror */
-		space_map_ref_create(&reftree);
+		space_reftree_create(&reftree);
 		for (c = 0; c < vd->vdev_children; c++) {
 			vdev_t *cvd = vd->vdev_child[c];
 			mutex_enter(&cvd->vdev_dtl_lock);
-			space_map_ref_add_map(&reftree, &cvd->vdev_dtl[s], 1);
+			space_reftree_add_map(&reftree, cvd->vdev_dtl[s], 1);
 			mutex_exit(&cvd->vdev_dtl_lock);
 		}
-		space_map_ref_generate_map(&reftree, &vd->vdev_dtl[t], minref);
-		space_map_ref_destroy(&reftree);
+		space_reftree_generate_map(&reftree, vd->vdev_dtl[t], minref);
+		space_reftree_destroy(&reftree);
 	}
 	mutex_exit(&vd->vdev_dtl_lock);
 }
 
-static int
+int
 vdev_dtl_load(vdev_t *vd)
 {
 	spa_t *spa = vd->vdev_spa;
-	space_map_obj_t *smo = &vd->vdev_dtl_smo;
 	objset_t *mos = spa->spa_meta_objset;
-	dmu_buf_t *db;
-	int error;
+	int error = 0;
+	int c;
 
-	ASSERT(vd->vdev_children == 0);
+	if (vd->vdev_ops->vdev_op_leaf && vd->vdev_dtl_object != 0) {
+		ASSERT(!vd->vdev_ishole);
 
-	if (smo->smo_object == 0)
-		return (0);
+		error = space_map_open(&vd->vdev_dtl_sm, mos,
+		    vd->vdev_dtl_object, 0, -1ULL, 0, &vd->vdev_dtl_lock);
+		if (error)
+			return (error);
+		ASSERT(vd->vdev_dtl_sm != NULL);
 
-	ASSERT(!vd->vdev_ishole);
+		mutex_enter(&vd->vdev_dtl_lock);
 
-	if ((error = dmu_bonus_hold(mos, smo->smo_object, FTAG, &db)) != 0)
-		return (error);
+		/*
+		 * Now that we've opened the space_map we need to update
+		 * the in-core DTL.
+		 */
+		space_map_update(vd->vdev_dtl_sm);
 
-	ASSERT3U(db->db_size, >=, sizeof (*smo));
-	bcopy(db->db_data, smo, sizeof (*smo));
-	dmu_buf_rele(db, FTAG);
+		error = space_map_load(vd->vdev_dtl_sm,
+		    vd->vdev_dtl[DTL_MISSING], SM_ALLOC);
+		mutex_exit(&vd->vdev_dtl_lock);
 
-	mutex_enter(&vd->vdev_dtl_lock);
-	error = space_map_load(&vd->vdev_dtl[DTL_MISSING],
-	    NULL, SM_ALLOC, smo, mos);
-	mutex_exit(&vd->vdev_dtl_lock);
+		return (error);
+	}
+
+	for (c = 0; c < vd->vdev_children; c++) {
+		error = vdev_dtl_load(vd->vdev_child[c]);
+		if (error != 0)
+			break;
+	}
 
 	return (error);
 }
@@ -1911,64 +1930,74 @@ void
 vdev_dtl_sync(vdev_t *vd, uint64_t txg)
 {
 	spa_t *spa = vd->vdev_spa;
-	space_map_obj_t *smo = &vd->vdev_dtl_smo;
-	space_map_t *sm = &vd->vdev_dtl[DTL_MISSING];
+	range_tree_t *rt = vd->vdev_dtl[DTL_MISSING];
 	objset_t *mos = spa->spa_meta_objset;
-	space_map_t smsync;
-	kmutex_t smlock;
-	dmu_buf_t *db;
+	range_tree_t *rtsync;
+	kmutex_t rtlock;
 	dmu_tx_t *tx;
+	uint64_t object = space_map_object(vd->vdev_dtl_sm);
 
 	ASSERT(!vd->vdev_ishole);
+	ASSERT(vd->vdev_ops->vdev_op_leaf);
 
 	tx = dmu_tx_create_assigned(spa->spa_dsl_pool, txg);
 
-	if (vd->vdev_detached) {
-		if (smo->smo_object != 0) {
-			VERIFY0(dmu_object_free(mos, smo->smo_object, tx));
-			smo->smo_object = 0;
-		}
+	if (vd->vdev_detached || vd->vdev_top->vdev_removing) {
+		mutex_enter(&vd->vdev_dtl_lock);
+		space_map_free(vd->vdev_dtl_sm, tx);
+		space_map_close(vd->vdev_dtl_sm);
+		vd->vdev_dtl_sm = NULL;
+		mutex_exit(&vd->vdev_dtl_lock);
 		dmu_tx_commit(tx);
 		return;
 	}
 
-	if (smo->smo_object == 0) {
-		ASSERT(smo->smo_objsize == 0);
-		ASSERT(smo->smo_alloc == 0);
-		smo->smo_object = dmu_object_alloc(mos,
-		    DMU_OT_SPACE_MAP, 1 << SPACE_MAP_BLOCKSHIFT,
-		    DMU_OT_SPACE_MAP_HEADER, sizeof (*smo), tx);
-		ASSERT(smo->smo_object != 0);
-		vdev_config_dirty(vd->vdev_top);
+	if (vd->vdev_dtl_sm == NULL) {
+		uint64_t new_object;
+
+		new_object = space_map_alloc(mos, tx);
+		VERIFY3U(new_object, !=, 0);
+
+		VERIFY0(space_map_open(&vd->vdev_dtl_sm, mos, new_object,
+		    0, -1ULL, 0, &vd->vdev_dtl_lock));
+		ASSERT(vd->vdev_dtl_sm != NULL);
 	}
 
-	mutex_init(&smlock, NULL, MUTEX_DEFAULT, NULL);
+	mutex_init(&rtlock, NULL, MUTEX_DEFAULT, NULL);
 
-	space_map_create(&smsync, sm->sm_start, sm->sm_size, sm->sm_shift,
-	    &smlock);
+	rtsync = range_tree_create(NULL, NULL, &rtlock);
 
-	mutex_enter(&smlock);
+	mutex_enter(&rtlock);
 
 	mutex_enter(&vd->vdev_dtl_lock);
-	space_map_walk(sm, space_map_add, &smsync);
+	range_tree_walk(rt, range_tree_add, rtsync);
 	mutex_exit(&vd->vdev_dtl_lock);
 
-	space_map_truncate(smo, mos, tx);
-	space_map_sync(&smsync, SM_ALLOC, smo, mos, tx);
-	space_map_vacate(&smsync, NULL, NULL);
+	space_map_truncate(vd->vdev_dtl_sm, tx);
+	space_map_write(vd->vdev_dtl_sm, rtsync, SM_ALLOC, tx);
+	range_tree_vacate(rtsync, NULL, NULL);
 
-	space_map_destroy(&smsync);
+	range_tree_destroy(rtsync);
 
-	mutex_exit(&smlock);
-	mutex_destroy(&smlock);
+	mutex_exit(&rtlock);
+	mutex_destroy(&rtlock);
 
-	VERIFY(0 == dmu_bonus_hold(mos, smo->smo_object, FTAG, &db));
-	dmu_buf_will_dirty(db, tx);
-	ASSERT3U(db->db_size, >=, sizeof (*smo));
-	bcopy(smo, db->db_data, sizeof (*smo));
-	dmu_buf_rele(db, FTAG);
+	/*
+	 * If the object for the space map has changed then dirty
+	 * the top level so that we update the config.
+	 */
+	if (object != space_map_object(vd->vdev_dtl_sm)) {
+		zfs_dbgmsg("txg %llu, spa %s, DTL old object %llu, "
+		    "new object %llu", txg, spa_name(spa), object,
+		    space_map_object(vd->vdev_dtl_sm));
+		vdev_config_dirty(vd->vdev_top);
+	}
 
 	dmu_tx_commit(tx);
+
+	mutex_enter(&vd->vdev_dtl_lock);
+	space_map_update(vd->vdev_dtl_sm);
+	mutex_exit(&vd->vdev_dtl_lock);
 }
 
 /*
@@ -2018,7 +2047,7 @@ vdev_resilver_needed(vdev_t *vd, uint64_t *minp, uint64_t *maxp)
 
 	if (vd->vdev_children == 0) {
 		mutex_enter(&vd->vdev_dtl_lock);
-		if (vd->vdev_dtl[DTL_MISSING].sm_space != 0 &&
+		if (range_tree_space(vd->vdev_dtl[DTL_MISSING]) != 0 &&
 		    vdev_writeable(vd)) {
 
 			thismin = vdev_dtl_min(vd);
@@ -2126,29 +2155,25 @@ vdev_remove(vdev_t *vd, uint64_t txg)
 
 	tx = dmu_tx_create_assigned(spa_get_dsl(spa), txg);
 
-	if (vd->vdev_dtl_smo.smo_object) {
-		ASSERT0(vd->vdev_dtl_smo.smo_alloc);
-		(void) dmu_object_free(mos, vd->vdev_dtl_smo.smo_object, tx);
-		vd->vdev_dtl_smo.smo_object = 0;
-	}
-
 	if (vd->vdev_ms != NULL) {
 		for (m = 0; m < vd->vdev_ms_count; m++) {
 			metaslab_t *msp = vd->vdev_ms[m];
 
-			if (msp == NULL || msp->ms_smo.smo_object == 0)
+			if (msp == NULL || msp->ms_sm == NULL)
 				continue;
 
-			ASSERT0(msp->ms_smo.smo_alloc);
-			(void) dmu_object_free(mos, msp->ms_smo.smo_object, tx);
-			msp->ms_smo.smo_object = 0;
+			mutex_enter(&msp->ms_lock);
+			VERIFY0(space_map_allocated(msp->ms_sm));
+			space_map_free(msp->ms_sm, tx);
+			space_map_close(msp->ms_sm);
+			msp->ms_sm = NULL;
+			mutex_exit(&msp->ms_lock);
 		}
 	}
 
 	if (vd->vdev_ms_array) {
 		(void) dmu_object_free(mos, vd->vdev_ms_array, tx);
 		vd->vdev_ms_array = 0;
-		vd->vdev_ms_shift = 0;
 	}
 	dmu_tx_commit(tx);
 }
diff --git a/module/zfs/vdev_label.c b/module/zfs/vdev_label.c
index d5af110a5..0780bf601 100644
--- a/module/zfs/vdev_label.c
+++ b/module/zfs/vdev_label.c
@@ -283,9 +283,10 @@ vdev_config_generate(spa_t *spa, vdev_t *vd, boolean_t getstats,
 			    vd->vdev_removing);
 	}
 
-	if (vd->vdev_dtl_smo.smo_object != 0)
+	if (vd->vdev_dtl_sm != NULL) {
 		fnvlist_add_uint64(nv, ZPOOL_CONFIG_DTL,
-		    vd->vdev_dtl_smo.smo_object);
+		    space_map_object(vd->vdev_dtl_sm));
+	}
 
 	if (vd->vdev_crtxg)
 		fnvlist_add_uint64(nv, ZPOOL_CONFIG_CREATE_TXG, vd->vdev_crtxg);
diff --git a/module/zfs/zfeature.c b/module/zfs/zfeature.c
index 4f4785a0c..cdb9d6dce 100644
--- a/module/zfs/zfeature.c
+++ b/module/zfs/zfeature.c
@@ -369,36 +369,46 @@ spa_feature_enable(spa_t *spa, zfeature_info_t *feature, dmu_tx_t *tx)
 	    spa->spa_feat_desc_obj, feature, FEATURE_ACTION_ENABLE, tx));
 }
 
-/*
- * If the specified feature has not yet been enabled, this function returns
- * ENOTSUP; otherwise, this function increments the feature's refcount (or
- * returns EOVERFLOW if the refcount cannot be incremented). This function must
- * be called from syncing context.
- */
 void
 spa_feature_incr(spa_t *spa, zfeature_info_t *feature, dmu_tx_t *tx)
 {
+	ASSERT(dmu_tx_is_syncing(tx));
 	ASSERT3U(spa_version(spa), >=, SPA_VERSION_FEATURES);
 	VERIFY3U(0, ==, feature_do_action(spa->spa_meta_objset,
 	    spa->spa_feat_for_read_obj, spa->spa_feat_for_write_obj,
 	    spa->spa_feat_desc_obj, feature, FEATURE_ACTION_INCR, tx));
 }
 
-/*
- * If the specified feature has not yet been enabled, this function returns
- * ENOTSUP; otherwise, this function decrements the feature's refcount (or
- * returns EOVERFLOW if the refcount is already 0). This function must
- * be called from syncing context.
- */
 void
 spa_feature_decr(spa_t *spa, zfeature_info_t *feature, dmu_tx_t *tx)
 {
+	ASSERT(dmu_tx_is_syncing(tx));
 	ASSERT3U(spa_version(spa), >=, SPA_VERSION_FEATURES);
 	VERIFY3U(0, ==, feature_do_action(spa->spa_meta_objset,
 	    spa->spa_feat_for_read_obj, spa->spa_feat_for_write_obj,
 	    spa->spa_feat_desc_obj, feature, FEATURE_ACTION_DECR, tx));
 }
 
+/*
+ * This interface is for debugging only. Normal consumers should use
+ * spa_feature_is_enabled/spa_feature_is_active.
+ */
+int
+spa_feature_get_refcount(spa_t *spa, zfeature_info_t *feature)
+{
+	int err;
+	uint64_t refcount = 0;
+
+	if (spa_version(spa) < SPA_VERSION_FEATURES)
+		return (B_FALSE);
+
+	err = feature_get_refcount(spa->spa_meta_objset,
+	    spa->spa_feat_for_read_obj, spa->spa_feat_for_write_obj,
+	    feature, &refcount);
+	ASSERT(err == 0 || err == ENOTSUP);
+	return (err == 0 ? refcount : 0);
+}
+
 boolean_t
 spa_feature_is_enabled(spa_t *spa, zfeature_info_t *feature)
 {
diff --git a/module/zfs/zfeature_common.c b/module/zfs/zfeature_common.c
index 2e1dc4e5c..cee544880 100644
--- a/module/zfs/zfeature_common.c
+++ b/module/zfs/zfeature_common.c
@@ -20,7 +20,7 @@
  */
 
 /*
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  */
 
@@ -164,4 +164,7 @@ zpool_feature_init(void)
 	zfeature_register(SPA_FEATURE_LZ4_COMPRESS,
 	    "org.illumos:lz4_compress", "lz4_compress",
 	    "LZ4 compression algorithm support.", B_FALSE, B_FALSE, NULL);
+	zfeature_register(SPA_FEATURE_SPACEMAP_HISTOGRAM,
+	    "com.delphix:spacemap_histogram", "spacemap_histogram",
+	    "Spacemaps maintain space histograms.", B_TRUE, B_FALSE, NULL);
 }
-- 
cgit v1.2.3