Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #pragma ident	"@(#)dsl_dataset.c	1.38	07/12/18 SMI"
     27 
     28 #include <sys/dmu_objset.h>
     29 #include <sys/dsl_dataset.h>
     30 #include <sys/dsl_dir.h>
     31 #include <sys/dsl_prop.h>
     32 #include <sys/dsl_synctask.h>
     33 #include <sys/dmu_traverse.h>
     34 #include <sys/dmu_tx.h>
     35 #include <sys/arc.h>
     36 #include <sys/zio.h>
     37 #include <sys/zap.h>
     38 #include <sys/unique.h>
     39 #include <sys/zfs_context.h>
     40 #include <sys/zfs_ioctl.h>
     41 #include <sys/spa.h>
     42 #include <sys/sunddi.h>
     43 
     44 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
     45 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
     46 static dsl_checkfunc_t dsl_dataset_rollback_check;
     47 static dsl_syncfunc_t dsl_dataset_rollback_sync;
     48 static dsl_syncfunc_t dsl_dataset_set_reservation_sync;
     49 
     50 #define	DS_REF_MAX	(1ULL << 62)
     51 
     52 #define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE
     53 
/*
 * We use weighted reference counts to express the various forms of exclusion
 * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
 * is DS_REF_MAX, and a PRIMARY open is a little more than half of an
 * EXCLUSIVE.  This makes the exclusion logic simple: the total refcnt for all
 * opens cannot exceed DS_REF_MAX.  For example, EXCLUSIVE opens are exclusive
 * because their weight (DS_REF_MAX) consumes the entire refcnt space.  PRIMARY
 * opens consume just over half of the refcnt space, so there can't be more
 * than one, but it can peacefully coexist with any number of STANDARD opens.
 *
 * The table is indexed by DS_MODE_LEVEL(mode).
 */
static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
	0,			/* DS_MODE_NONE - invalid		*/
	1,			/* DS_MODE_STANDARD - unlimited number	*/
	(DS_REF_MAX >> 1) + 1,	/* DS_MODE_PRIMARY - only one of these	*/
	DS_REF_MAX		/* DS_MODE_EXCLUSIVE - no other opens	*/
};
     70 
     71 /*
     72  * Figure out how much of this delta should be propogated to the dsl_dir
     73  * layer.  If there's a refreservation, that space has already been
     74  * partially accounted for in our ancestors.
     75  */
     76 static int64_t
     77 parent_delta(dsl_dataset_t *ds, int64_t delta)
     78 {
     79 	uint64_t old_bytes, new_bytes;
     80 
     81 	if (ds->ds_reserved == 0)
     82 		return (delta);
     83 
     84 	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
     85 	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
     86 
     87 	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
     88 	return (new_bytes - old_bytes);
     89 }
     90 
     91 void
     92 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
     93 {
     94 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
     95 	int compressed = BP_GET_PSIZE(bp);
     96 	int uncompressed = BP_GET_UCSIZE(bp);
     97 	int64_t delta;
     98 
     99 	dprintf_bp(bp, "born, ds=%p\n", ds);
    100 
    101 	ASSERT(dmu_tx_is_syncing(tx));
    102 	/* It could have been compressed away to nothing */
    103 	if (BP_IS_HOLE(bp))
    104 		return;
    105 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
    106 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
    107 	if (ds == NULL) {
    108 		/*
    109 		 * Account for the meta-objset space in its placeholder
    110 		 * dsl_dir.
    111 		 */
    112 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
    113 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
    114 		    used, compressed, uncompressed, tx);
    115 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
    116 		return;
    117 	}
    118 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
    119 	mutex_enter(&ds->ds_lock);
    120 	delta = parent_delta(ds, used);
    121 	ds->ds_phys->ds_used_bytes += used;
    122 	ds->ds_phys->ds_compressed_bytes += compressed;
    123 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
    124 	ds->ds_phys->ds_unique_bytes += used;
    125 	mutex_exit(&ds->ds_lock);
    126 	dsl_dir_diduse_space(ds->ds_dir, delta, compressed, uncompressed, tx);
    127 }
    128 
    129 void
    130 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, zio_t *pio,
    131     dmu_tx_t *tx)
    132 {
    133 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
    134 	int compressed = BP_GET_PSIZE(bp);
    135 	int uncompressed = BP_GET_UCSIZE(bp);
    136 
    137 	ASSERT(dmu_tx_is_syncing(tx));
    138 	/* No block pointer => nothing to free */
    139 	if (BP_IS_HOLE(bp))
    140 		return;
    141 
    142 	ASSERT(used > 0);
    143 	if (ds == NULL) {
    144 		int err;
    145 		/*
    146 		 * Account for the meta-objset space in its placeholder
    147 		 * dataset.
    148 		 */
    149 		err = arc_free(pio, tx->tx_pool->dp_spa,
    150 		    tx->tx_txg, bp, NULL, NULL, pio ? ARC_NOWAIT: ARC_WAIT);
    151 		ASSERT(err == 0);
    152 
    153 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
    154 		    -used, -compressed, -uncompressed, tx);
    155 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
    156 		return;
    157 	}
    158 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
    159 
    160 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
    161 
    162 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
    163 		int err;
    164 		int64_t delta;
    165 
    166 		dprintf_bp(bp, "freeing: %s", "");
    167 		err = arc_free(pio, tx->tx_pool->dp_spa,
    168 		    tx->tx_txg, bp, NULL, NULL, pio ? ARC_NOWAIT: ARC_WAIT);
    169 		ASSERT(err == 0);
    170 
    171 		mutex_enter(&ds->ds_lock);
    172 		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
    173 		    !DS_UNIQUE_IS_ACCURATE(ds));
    174 		delta = parent_delta(ds, -used);
    175 		ds->ds_phys->ds_unique_bytes -= used;
    176 		mutex_exit(&ds->ds_lock);
    177 		dsl_dir_diduse_space(ds->ds_dir,
    178 		    delta, -compressed, -uncompressed, tx);
    179 	} else {
    180 		dprintf_bp(bp, "putting on dead list: %s", "");
    181 		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
    182 		ASSERT3U(ds->ds_prev->ds_object, ==,
    183 		    ds->ds_phys->ds_prev_snap_obj);
    184 		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
    185 		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
    186 		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
    187 		    ds->ds_object && bp->blk_birth >
    188 		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
    189 			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
    190 			mutex_enter(&ds->ds_prev->ds_lock);
    191 			ds->ds_prev->ds_phys->ds_unique_bytes += used;
    192 			mutex_exit(&ds->ds_prev->ds_lock);
    193 		}
    194 	}
    195 	mutex_enter(&ds->ds_lock);
    196 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
    197 	ds->ds_phys->ds_used_bytes -= used;
    198 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
    199 	ds->ds_phys->ds_compressed_bytes -= compressed;
    200 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
    201 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
    202 	mutex_exit(&ds->ds_lock);
    203 }
    204 
    205 uint64_t
    206 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
    207 {
    208 	uint64_t trysnap = 0;
    209 
    210 	if (ds == NULL)
    211 		return (0);
    212 	/*
    213 	 * The snapshot creation could fail, but that would cause an
    214 	 * incorrect FALSE return, which would only result in an
    215 	 * overestimation of the amount of space that an operation would
    216 	 * consume, which is OK.
    217 	 *
    218 	 * There's also a small window where we could miss a pending
    219 	 * snapshot, because we could set the sync task in the quiescing
    220 	 * phase.  So this should only be used as a guess.
    221 	 */
    222 	if (ds->ds_trysnap_txg >
    223 	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
    224 		trysnap = ds->ds_trysnap_txg;
    225 	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
    226 }
    227 
    228 int
    229 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
    230 {
    231 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
    232 }
    233 
    234 /* ARGSUSED */
    235 static void
    236 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
    237 {
    238 	dsl_dataset_t *ds = dsv;
    239 
    240 	/* open_refcount == DS_REF_MAX when deleting */
    241 	ASSERT(ds->ds_open_refcount == 0 ||
    242 	    ds->ds_open_refcount == DS_REF_MAX);
    243 
    244 	dprintf_ds(ds, "evicting %s\n", "");
    245 
    246 	unique_remove(ds->ds_fsid_guid);
    247 
    248 	if (ds->ds_user_ptr != NULL)
    249 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
    250 
    251 	if (ds->ds_prev) {
    252 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
    253 		ds->ds_prev = NULL;
    254 	}
    255 
    256 	bplist_close(&ds->ds_deadlist);
    257 	dsl_dir_close(ds->ds_dir, ds);
    258 
    259 	ASSERT(!list_link_active(&ds->ds_synced_link));
    260 
    261 	mutex_destroy(&ds->ds_lock);
    262 	mutex_destroy(&ds->ds_opening_lock);
    263 	mutex_destroy(&ds->ds_deadlist.bpl_lock);
    264 
    265 	kmem_free(ds, sizeof (dsl_dataset_t));
    266 }
    267 
    268 static int
    269 dsl_dataset_get_snapname(dsl_dataset_t *ds)
    270 {
    271 	dsl_dataset_phys_t *headphys;
    272 	int err;
    273 	dmu_buf_t *headdbuf;
    274 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
    275 	objset_t *mos = dp->dp_meta_objset;
    276 
    277 	if (ds->ds_snapname[0])
    278 		return (0);
    279 	if (ds->ds_phys->ds_next_snap_obj == 0)
    280 		return (0);
    281 
    282 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
    283 	    FTAG, &headdbuf);
    284 	if (err)
    285 		return (err);
    286 	headphys = headdbuf->db_data;
    287 	err = zap_value_search(dp->dp_meta_objset,
    288 	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
    289 	dmu_buf_rele(headdbuf, FTAG);
    290 	return (err);
    291 }
    292 
    293 int
    294 dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
    295     int mode, void *tag, dsl_dataset_t **dsp)
    296 {
    297 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
    298 	objset_t *mos = dp->dp_meta_objset;
    299 	dmu_buf_t *dbuf;
    300 	dsl_dataset_t *ds;
    301 	int err;
    302 
    303 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
    304 	    dsl_pool_sync_context(dp));
    305 
    306 	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
    307 	if (err)
    308 		return (err);
    309 	ds = dmu_buf_get_user(dbuf);
    310 	if (ds == NULL) {
    311 		dsl_dataset_t *winner;
    312 
    313 		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
    314 		ds->ds_dbuf = dbuf;
    315 		ds->ds_object = dsobj;
    316 		ds->ds_phys = dbuf->db_data;
    317 
    318 		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
    319 		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
    320 		mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
    321 		    NULL);
    322 
    323 		err = bplist_open(&ds->ds_deadlist,
    324 		    mos, ds->ds_phys->ds_deadlist_obj);
    325 		if (err == 0) {
    326 			err = dsl_dir_open_obj(dp,
    327 			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
    328 		}
    329 		if (err) {
    330 			/*
    331 			 * we don't really need to close the blist if we
    332 			 * just opened it.
    333 			 */
    334 			mutex_destroy(&ds->ds_lock);
    335 			mutex_destroy(&ds->ds_opening_lock);
    336 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
    337 			kmem_free(ds, sizeof (dsl_dataset_t));
    338 			dmu_buf_rele(dbuf, tag);
    339 			return (err);
    340 		}
    341 
    342 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
    343 			ds->ds_snapname[0] = '\0';
    344 			if (ds->ds_phys->ds_prev_snap_obj) {
    345 				err = dsl_dataset_open_obj(dp,
    346 				    ds->ds_phys->ds_prev_snap_obj, NULL,
    347 				    DS_MODE_NONE, ds, &ds->ds_prev);
    348 			}
    349 		} else {
    350 			if (snapname) {
    351 #ifdef ZFS_DEBUG
    352 				dsl_dataset_phys_t *headphys;
    353 				dmu_buf_t *headdbuf;
    354 				err = dmu_bonus_hold(mos,
    355 				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
    356 				    FTAG, &headdbuf);
    357 				if (err == 0) {
    358 					headphys = headdbuf->db_data;
    359 					uint64_t foundobj;
    360 					err = zap_lookup(dp->dp_meta_objset,
    361 					    headphys->ds_snapnames_zapobj,
    362 					    snapname, sizeof (foundobj), 1,
    363 					    &foundobj);
    364 					ASSERT3U(foundobj, ==, dsobj);
    365 					dmu_buf_rele(headdbuf, FTAG);
    366 				}
    367 #endif
    368 				(void) strcat(ds->ds_snapname, snapname);
    369 			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
    370 				err = dsl_dataset_get_snapname(ds);
    371 			}
    372 		}
    373 
    374 		if (!dsl_dataset_is_snapshot(ds)) {
    375 			/*
    376 			 * In sync context, we're called with either no lock
    377 			 * or with the write lock.  If we're not syncing,
    378 			 * we're always called with the read lock held.
    379 			 */
    380 			boolean_t need_lock =
    381 			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
    382 			    dsl_pool_sync_context(dp);
    383 
    384 			if (need_lock)
    385 				rw_enter(&dp->dp_config_rwlock, RW_READER);
    386 
    387 			err = dsl_prop_get_ds_locked(ds->ds_dir,
    388 			    "refreservation", sizeof (uint64_t), 1,
    389 			    &ds->ds_reserved, NULL);
    390 			if (err == 0) {
    391 				err = dsl_prop_get_ds_locked(ds->ds_dir,
    392 				    "refquota", sizeof (uint64_t), 1,
    393 				    &ds->ds_quota, NULL);
    394 			}
    395 
    396 			if (need_lock)
    397 				rw_exit(&dp->dp_config_rwlock);
    398 		} else {
    399 			ds->ds_reserved = ds->ds_quota = 0;
    400 		}
    401 
    402 		if (err == 0) {
    403 			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
    404 			    dsl_dataset_evict);
    405 		}
    406 		if (err || winner) {
    407 			bplist_close(&ds->ds_deadlist);
    408 			if (ds->ds_prev) {
    409 				dsl_dataset_close(ds->ds_prev,
    410 				    DS_MODE_NONE, ds);
    411 			}
    412 			dsl_dir_close(ds->ds_dir, ds);
    413 			mutex_destroy(&ds->ds_lock);
    414 			mutex_destroy(&ds->ds_opening_lock);
    415 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
    416 			kmem_free(ds, sizeof (dsl_dataset_t));
    417 			if (err) {
    418 				dmu_buf_rele(dbuf, tag);
    419 				return (err);
    420 			}
    421 			ds = winner;
    422 		} else {
    423 			ds->ds_fsid_guid =
    424 			    unique_insert(ds->ds_phys->ds_fsid_guid);
    425 		}
    426 	}
    427 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
    428 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
    429 
    430 	mutex_enter(&ds->ds_lock);
    431 	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
    432 	    (ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) &&
    433 	    !DS_MODE_IS_INCONSISTENT(mode)) ||
    434 	    (ds->ds_open_refcount + weight > DS_REF_MAX)) {
    435 		mutex_exit(&ds->ds_lock);
    436 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
    437 		return (EBUSY);
    438 	}
    439 	ds->ds_open_refcount += weight;
    440 	mutex_exit(&ds->ds_lock);
    441 
    442 	*dsp = ds;
    443 	return (0);
    444 }
    445 
    446 int
    447 dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
    448     void *tag, dsl_dataset_t **dsp)
    449 {
    450 	dsl_dir_t *dd;
    451 	dsl_pool_t *dp;
    452 	const char *tail;
    453 	uint64_t obj;
    454 	dsl_dataset_t *ds = NULL;
    455 	int err = 0;
    456 
    457 	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
    458 	if (err)
    459 		return (err);
    460 
    461 	dp = dd->dd_pool;
    462 	obj = dd->dd_phys->dd_head_dataset_obj;
    463 	rw_enter(&dp->dp_config_rwlock, RW_READER);
    464 	if (obj == 0) {
    465 		/* A dataset with no associated objset */
    466 		err = ENOENT;
    467 		goto out;
    468 	}
    469 
    470 	if (tail != NULL) {
    471 		objset_t *mos = dp->dp_meta_objset;
    472 
    473 		err = dsl_dataset_open_obj(dp, obj, NULL,
    474 		    DS_MODE_NONE, tag, &ds);
    475 		if (err)
    476 			goto out;
    477 		obj = ds->ds_phys->ds_snapnames_zapobj;
    478 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
    479 		ds = NULL;
    480 
    481 		if (tail[0] != '@') {
    482 			err = ENOENT;
    483 			goto out;
    484 		}
    485 		tail++;
    486 
    487 		/* Look for a snapshot */
    488 		if (!DS_MODE_IS_READONLY(mode)) {
    489 			err = EROFS;
    490 			goto out;
    491 		}
    492 		dprintf("looking for snapshot '%s'\n", tail);
    493 		err = zap_lookup(mos, obj, tail, 8, 1, &obj);
    494 		if (err)
    495 			goto out;
    496 	}
    497 	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
    498 
    499 out:
    500 	rw_exit(&dp->dp_config_rwlock);
    501 	dsl_dir_close(dd, FTAG);
    502 
    503 	ASSERT3U((err == 0), ==, (ds != NULL));
    504 	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
    505 
    506 	*dsp = ds;
    507 	return (err);
    508 }
    509 
    510 int
    511 dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
    512 {
    513 	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
    514 }
    515 
    516 void
    517 dsl_dataset_name(dsl_dataset_t *ds, char *name)
    518 {
    519 	if (ds == NULL) {
    520 		(void) strcpy(name, "mos");
    521 	} else {
    522 		dsl_dir_name(ds->ds_dir, name);
    523 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    524 		if (ds->ds_snapname[0]) {
    525 			(void) strcat(name, "@");
    526 			if (!MUTEX_HELD(&ds->ds_lock)) {
    527 				/*
    528 				 * We use a "recursive" mutex so that we
    529 				 * can call dprintf_ds() with ds_lock held.
    530 				 */
    531 				mutex_enter(&ds->ds_lock);
    532 				(void) strcat(name, ds->ds_snapname);
    533 				mutex_exit(&ds->ds_lock);
    534 			} else {
    535 				(void) strcat(name, ds->ds_snapname);
    536 			}
    537 		}
    538 	}
    539 }
    540 
    541 static int
    542 dsl_dataset_namelen(dsl_dataset_t *ds)
    543 {
    544 	int result;
    545 
    546 	if (ds == NULL) {
    547 		result = 3;	/* "mos" */
    548 	} else {
    549 		result = dsl_dir_namelen(ds->ds_dir);
    550 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    551 		if (ds->ds_snapname[0]) {
    552 			++result;	/* adding one for the @-sign */
    553 			if (!MUTEX_HELD(&ds->ds_lock)) {
    554 				/* see dsl_datset_name */
    555 				mutex_enter(&ds->ds_lock);
    556 				result += strlen(ds->ds_snapname);
    557 				mutex_exit(&ds->ds_lock);
    558 			} else {
    559 				result += strlen(ds->ds_snapname);
    560 			}
    561 		}
    562 	}
    563 
    564 	return (result);
    565 }
    566 
    567 void
    568 dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
    569 {
    570 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
    571 	mutex_enter(&ds->ds_lock);
    572 	ASSERT3U(ds->ds_open_refcount, >=, weight);
    573 	ds->ds_open_refcount -= weight;
    574 	dprintf_ds(ds, "closing mode %u refcount now 0x%llx\n",
    575 	    mode, ds->ds_open_refcount);
    576 	mutex_exit(&ds->ds_lock);
    577 
    578 	dmu_buf_rele(ds->ds_dbuf, tag);
    579 }
    580 
    581 void
    582 dsl_dataset_downgrade(dsl_dataset_t *ds, int oldmode, int newmode)
    583 {
    584 	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
    585 	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
    586 	mutex_enter(&ds->ds_lock);
    587 	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
    588 	ASSERT3U(oldweight, >=, newweight);
    589 	ds->ds_open_refcount -= oldweight;
    590 	ds->ds_open_refcount += newweight;
    591 	mutex_exit(&ds->ds_lock);
    592 }
    593 
    594 boolean_t
    595 dsl_dataset_tryupgrade(dsl_dataset_t *ds, int oldmode, int newmode)
    596 {
    597 	boolean_t rv;
    598 	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
    599 	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
    600 	mutex_enter(&ds->ds_lock);
    601 	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
    602 	ASSERT3U(newweight, >=, oldweight);
    603 	if (ds->ds_open_refcount - oldweight + newweight > DS_REF_MAX) {
    604 		rv = B_FALSE;
    605 	} else {
    606 		ds->ds_open_refcount -= oldweight;
    607 		ds->ds_open_refcount += newweight;
    608 		rv = B_TRUE;
    609 	}
    610 	mutex_exit(&ds->ds_lock);
    611 	return (rv);
    612 }
    613 
/*
 * Create the root dsl_dir and its head dataset in pool 'dp' as part of
 * transaction 'tx'.  The object number of the new root dsl_dir is returned
 * through 'ddobjp'.  Every step is VERIFY'd, so there is no error return.
 */
void
dsl_dataset_create_root(dsl_pool_t *dp, uint64_t *ddobjp, dmu_tx_t *tx)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	dsl_dir_t *dd;

	dsl_dir_create_root(mos, ddobjp, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, *ddobjp, NULL, FTAG, &dd));

	/* Allocate the dataset object and fill in its on-disk bonus data. */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
	dmu_buf_rele(dbuf, FTAG);

	/* Point the dsl_dir at its new head dataset. */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;
	dsl_dir_close(dd, FTAG);

	/*
	 * Open the new dataset just long enough to create its objset;
	 * the return value of dmu_objset_create_impl() is not needed here.
	 */
	VERIFY(0 ==
	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
	(void) dmu_objset_create_impl(dp->dp_spa, ds,
	    &ds->ds_phys->ds_bp, DMU_OST_ZFS, tx);
	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
}
    656 
/*
 * Create the head dataset for dsl_dir 'dd' in syncing context and return
 * its object number.  'dd' must not already have a head dataset.
 *
 * If 'origin' is non-NULL the new dataset starts out referring to the
 * origin's block pointer and space counts, the origin's child count is
 * incremented, and 'dd' records the origin's object number.
 */
uint64_t
dsl_dataset_create_sync_impl(dsl_dir_t *dd, dsl_dataset_t *origin, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	/* Allocate the dataset object and fill in its on-disk bonus data. */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	if (origin) {
		/* Start from the origin's contents and space accounting. */
		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_used_bytes =
		    origin->ds_phys->ds_used_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;

		/* The origin gains a child. */
		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
	}
	dmu_buf_rele(dbuf, FTAG);

	/* Point the dsl_dir at its new head dataset. */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}
    714 
    715 uint64_t
    716 dsl_dataset_create_sync(dsl_dir_t *pdd,
    717     const char *lastname, dsl_dataset_t *origin, cred_t *cr, dmu_tx_t *tx)
    718 {
    719 	dsl_pool_t *dp = pdd->dd_pool;
    720 	uint64_t dsobj, ddobj;
    721 	dsl_dir_t *dd;
    722 
    723 	ASSERT(lastname[0] != '@');
    724 
    725 	ddobj = dsl_dir_create_sync(pdd, lastname, tx);
    726 	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
    727 
    728 	dsobj = dsl_dataset_create_sync_impl(dd, origin, tx);
    729 
    730 	dsl_deleg_set_create_perms(dd, tx, cr);
    731 
    732 	dsl_dir_close(dd, FTAG);
    733 
    734 	return (dsobj);
    735 }
    736 
/*
 * Argument bundle passed through dmu_objset_find() to
 * dsl_snapshot_destroy_one() by dsl_snapshots_destroy().
 */
struct destroyarg {
	/* sync task group collecting the destroys; also used as open tag */
	dsl_sync_task_group_t *dstg;
	/* snapshot name (without the '@') to destroy in each filesystem */
	char *snapname;
	/* buffer that receives the name of a filesystem whose open failed */
	char *failed;
};
    742 
    743 static int
    744 dsl_snapshot_destroy_one(char *name, void *arg)
    745 {
    746 	struct destroyarg *da = arg;
    747 	dsl_dataset_t *ds;
    748 	char *cp;
    749 	int err;
    750 
    751 	(void) strcat(name, "@");
    752 	(void) strcat(name, da->snapname);
    753 	err = dsl_dataset_open(name,
    754 	    DS_MODE_EXCLUSIVE | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
    755 	    da->dstg, &ds);
    756 	cp = strchr(name, '@');
    757 	*cp = '\0';
    758 	if (err == ENOENT)
    759 		return (0);
    760 	if (err) {
    761 		(void) strcpy(da->failed, name);
    762 		return (err);
    763 	}
    764 
    765 	dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
    766 	    dsl_dataset_destroy_sync, ds, da->dstg, 0);
    767 	return (0);
    768 }
    769 
    770 /*
    771  * Destroy 'snapname' in all descendants of 'fsname'.
    772  */
    773 #pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
    774 int
    775 dsl_snapshots_destroy(char *fsname, char *snapname)
    776 {
    777 	int err;
    778 	struct destroyarg da;
    779 	dsl_sync_task_t *dst;
    780 	spa_t *spa;
    781 
    782 	err = spa_open(fsname, &spa, FTAG);
    783 	if (err)
    784 		return (err);
    785 	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
    786 	da.snapname = snapname;
    787 	da.failed = fsname;
    788 
    789 	err = dmu_objset_find(fsname,
    790 	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);
    791 
    792 	if (err == 0)
    793 		err = dsl_sync_task_group_wait(da.dstg);
    794 
    795 	for (dst = list_head(&da.dstg->dstg_tasks); dst;
    796 	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
    797 		dsl_dataset_t *ds = dst->dst_arg1;
    798 		if (dst->dst_err) {
    799 			dsl_dataset_name(ds, fsname);
    800 			*strchr(fsname, '@') = '\0';
    801 		}
    802 		/*
    803 		 * If it was successful, destroy_sync would have
    804 		 * closed the ds
    805 		 */
    806 		if (err)
    807 			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, da.dstg);
    808 	}
    809 
    810 	dsl_sync_task_group_destroy(da.dstg);
    811 	spa_close(spa, FTAG);
    812 	return (err);
    813 }
    814 
    815 /*
    816  * ds must be opened EXCLUSIVE or PRIMARY.  on return (whether
    817  * successful or not), ds will be closed and caller can no longer
    818  * dereference it.
    819  */
    820 int
    821 dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
    822 {
    823 	int err;
    824 	dsl_sync_task_group_t *dstg;
    825 	objset_t *os;
    826 	dsl_dir_t *dd;
    827 	uint64_t obj;
    828 
    829 	if (ds->ds_open_refcount != DS_REF_MAX) {
    830 		if (dsl_dataset_tryupgrade(ds, DS_MODE_PRIMARY,
    831 		    DS_MODE_EXCLUSIVE) == 0) {
    832 			dsl_dataset_close(ds, DS_MODE_PRIMARY, tag);
    833 			return (EBUSY);
    834 		}
    835 	}
    836 
    837 	if (dsl_dataset_is_snapshot(ds)) {
    838 		/* Destroying a snapshot is simpler */
    839 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
    840 		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
    841 		    ds, tag, 0);
    842 		goto out;
    843 	}
    844 
    845 	dd = ds->ds_dir;
    846 
    847 	/*
    848 	 * Check for errors and mark this ds as inconsistent, in
    849 	 * case we crash while freeing the objects.
    850 	 */
    851 	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
    852 	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
    853 	if (err)
    854 		goto out;
    855 
    856 	err = dmu_objset_open_ds(ds, DMU_OST_ANY, &os);
    857 	if (err)
    858 		goto out;
    859 
    860 	/*
    861 	 * remove the objects in open context, so that we won't
    862 	 * have too much to do in syncing context.
    863 	 */
    864 	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
    865 	    ds->ds_phys->ds_prev_snap_txg)) {
    866 		dmu_tx_t *tx = dmu_tx_create(os);
    867 		dmu_tx_hold_free(tx, obj, 0, DMU_OBJECT_END);
    868 		dmu_tx_hold_bonus(tx, obj);
    869 		err = dmu_tx_assign(tx, TXG_WAIT);
    870 		if (err) {
    871 			/*
    872 			 * Perhaps there is not enough disk
    873 			 * space.  Just deal with it from
    874 			 * dsl_dataset_destroy_sync().
    875 			 */
    876 			dmu_tx_abort(tx);
    877 			continue;
    878 		}
    879 		VERIFY(0 == dmu_object_free(os, obj, tx));
    880 		dmu_tx_commit(tx);
    881 	}
    882 	/* Make sure it's not dirty before we finish destroying it. */
    883 	txg_wait_synced(dd->dd_pool, 0);
    884 
    885 	dmu_objset_close(os);
    886 	if (err != ESRCH)
    887 		goto out;
    888 
    889 	if (ds->ds_user_ptr) {
    890 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
    891 		ds->ds_user_ptr = NULL;
    892 	}
    893 
    894 	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
    895 	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
    896 	rw_exit(&dd->dd_pool->dp_config_rwlock);
    897 
    898 	if (err)
    899 		goto out;
    900 
    901 	/*
    902 	 * Blow away the dsl_dir + head dataset.
    903 	 */
    904 	dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
    905 	dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
    906 	    dsl_dataset_destroy_sync, ds, tag, 0);
    907 	dsl_sync_task_create(dstg, dsl_dir_destroy_check,
    908 	    dsl_dir_destroy_sync, dd, FTAG, 0);
    909 	err = dsl_sync_task_group_wait(dstg);
    910 	dsl_sync_task_grou