Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/dmu_objset.h>
     27 #include <sys/dsl_dataset.h>
     28 #include <sys/dsl_dir.h>
     29 #include <sys/dsl_prop.h>
     30 #include <sys/dsl_synctask.h>
     31 #include <sys/dmu_traverse.h>
     32 #include <sys/dmu_tx.h>
     33 #include <sys/arc.h>
     34 #include <sys/zio.h>
     35 #include <sys/zap.h>
     36 #include <sys/unique.h>
     37 #include <sys/zfs_context.h>
     38 #include <sys/zfs_ioctl.h>
     39 #include <sys/spa.h>
     40 #include <sys/zfs_znode.h>
     41 #include <sys/sunddi.h>
     42 
/*
 * Sentinel owner value.  A dataset whose ds_owner field equals this pointer
 * is considered destroyed (see DSL_DATASET_IS_DESTROYED() below); only the
 * pointer identity matters, the string contents are never compared.
 */
static char *dsl_reaper = "the grim reaper";

/*
 * Forward declarations of sync-task check/sync callbacks.  Their
 * definitions are not part of the portion of the file reviewed here.
 */
static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_checkfunc_t dsl_dataset_rollback_check;
static dsl_syncfunc_t dsl_dataset_rollback_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

/* NOTE(review): no use of DS_REF_MAX is visible in this part of the file. */
#define	DS_REF_MAX	(1ULL << 62)

/* Block size passed to bplist_create() when a dataset's deadlist is made. */
#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

/* True when the dataset's owner is the dsl_reaper sentinel. */
#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)
     56 
     57 
     58 /*
     59  * Figure out how much of this delta should be propogated to the dsl_dir
     60  * layer.  If there's a refreservation, that space has already been
     61  * partially accounted for in our ancestors.
     62  */
     63 static int64_t
     64 parent_delta(dsl_dataset_t *ds, int64_t delta)
     65 {
     66 	uint64_t old_bytes, new_bytes;
     67 
     68 	if (ds->ds_reserved == 0)
     69 		return (delta);
     70 
     71 	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
     72 	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
     73 
     74 	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
     75 	return (new_bytes - old_bytes);
     76 }
     77 
     78 void
     79 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
     80 {
     81 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
     82 	int compressed = BP_GET_PSIZE(bp);
     83 	int uncompressed = BP_GET_UCSIZE(bp);
     84 	int64_t delta;
     85 
     86 	dprintf_bp(bp, "born, ds=%p\n", ds);
     87 
     88 	ASSERT(dmu_tx_is_syncing(tx));
     89 	/* It could have been compressed away to nothing */
     90 	if (BP_IS_HOLE(bp))
     91 		return;
     92 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
     93 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
     94 	if (ds == NULL) {
     95 		/*
     96 		 * Account for the meta-objset space in its placeholder
     97 		 * dsl_dir.
     98 		 */
     99 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
    100 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
    101 		    used, compressed, uncompressed, tx);
    102 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
    103 		return;
    104 	}
    105 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
    106 	mutex_enter(&ds->ds_lock);
    107 	delta = parent_delta(ds, used);
    108 	ds->ds_phys->ds_used_bytes += used;
    109 	ds->ds_phys->ds_compressed_bytes += compressed;
    110 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
    111 	ds->ds_phys->ds_unique_bytes += used;
    112 	mutex_exit(&ds->ds_lock);
    113 	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
    114 	    compressed, uncompressed, tx);
    115 	dsl_dir_transfer_space(ds->ds_dir, used - delta,
    116 	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
    117 }
    118 
    119 int
    120 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, zio_t *pio,
    121     dmu_tx_t *tx)
    122 {
    123 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
    124 	int compressed = BP_GET_PSIZE(bp);
    125 	int uncompressed = BP_GET_UCSIZE(bp);
    126 
    127 	ASSERT(dmu_tx_is_syncing(tx));
    128 	/* No block pointer => nothing to free */
    129 	if (BP_IS_HOLE(bp))
    130 		return (0);
    131 
    132 	ASSERT(used > 0);
    133 	if (ds == NULL) {
    134 		int err;
    135 		/*
    136 		 * Account for the meta-objset space in its placeholder
    137 		 * dataset.
    138 		 */
    139 		err = dsl_free(pio, tx->tx_pool,
    140 		    tx->tx_txg, bp, NULL, NULL, pio ? ARC_NOWAIT: ARC_WAIT);
    141 		ASSERT(err == 0);
    142 
    143 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
    144 		    -used, -compressed, -uncompressed, tx);
    145 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
    146 		return (used);
    147 	}
    148 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
    149 
    150 	ASSERT(!dsl_dataset_is_snapshot(ds));
    151 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
    152 
    153 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
    154 		int err;
    155 		int64_t delta;
    156 
    157 		dprintf_bp(bp, "freeing: %s", "");
    158 		err = dsl_free(pio, tx->tx_pool,
    159 		    tx->tx_txg, bp, NULL, NULL, pio ? ARC_NOWAIT : ARC_WAIT);
    160 		ASSERT(err == 0);
    161 
    162 		mutex_enter(&ds->ds_lock);
    163 		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
    164 		    !DS_UNIQUE_IS_ACCURATE(ds));
    165 		delta = parent_delta(ds, -used);
    166 		ds->ds_phys->ds_unique_bytes -= used;
    167 		mutex_exit(&ds->ds_lock);
    168 		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
    169 		    delta, -compressed, -uncompressed, tx);
    170 		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
    171 		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
    172 	} else {
    173 		dprintf_bp(bp, "putting on dead list: %s", "");
    174 		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
    175 		ASSERT3U(ds->ds_prev->ds_object, ==,
    176 		    ds->ds_phys->ds_prev_snap_obj);
    177 		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
    178 		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
    179 		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
    180 		    ds->ds_object && bp->blk_birth >
    181 		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
    182 			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
    183 			mutex_enter(&ds->ds_prev->ds_lock);
    184 			ds->ds_prev->ds_phys->ds_unique_bytes += used;
    185 			mutex_exit(&ds->ds_prev->ds_lock);
    186 		}
    187 		if (bp->blk_birth > ds->ds_origin_txg) {
    188 			dsl_dir_transfer_space(ds->ds_dir, used,
    189 			    DD_USED_HEAD, DD_USED_SNAP, tx);
    190 		}
    191 	}
    192 	mutex_enter(&ds->ds_lock);
    193 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
    194 	ds->ds_phys->ds_used_bytes -= used;
    195 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
    196 	ds->ds_phys->ds_compressed_bytes -= compressed;
    197 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
    198 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
    199 	mutex_exit(&ds->ds_lock);
    200 
    201 	return (used);
    202 }
    203 
    204 uint64_t
    205 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
    206 {
    207 	uint64_t trysnap = 0;
    208 
    209 	if (ds == NULL)
    210 		return (0);
    211 	/*
    212 	 * The snapshot creation could fail, but that would cause an
    213 	 * incorrect FALSE return, which would only result in an
    214 	 * overestimation of the amount of space that an operation would
    215 	 * consume, which is OK.
    216 	 *
    217 	 * There's also a small window where we could miss a pending
    218 	 * snapshot, because we could set the sync task in the quiescing
    219 	 * phase.  So this should only be used as a guess.
    220 	 */
    221 	if (ds->ds_trysnap_txg >
    222 	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
    223 		trysnap = ds->ds_trysnap_txg;
    224 	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
    225 }
    226 
    227 int
    228 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
    229 {
    230 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
    231 }
    232 
    233 /* ARGSUSED */
    234 static void
    235 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
    236 {
    237 	dsl_dataset_t *ds = dsv;
    238 
    239 	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
    240 
    241 	dprintf_ds(ds, "evicting %s\n", "");
    242 
    243 	unique_remove(ds->ds_fsid_guid);
    244 
    245 	if (ds->ds_user_ptr != NULL)
    246 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
    247 
    248 	if (ds->ds_prev) {
    249 		dsl_dataset_drop_ref(ds->ds_prev, ds);
    250 		ds->ds_prev = NULL;
    251 	}
    252 
    253 	bplist_close(&ds->ds_deadlist);
    254 	if (ds->ds_dir)
    255 		dsl_dir_close(ds->ds_dir, ds);
    256 
    257 	ASSERT(!list_link_active(&ds->ds_synced_link));
    258 
    259 	mutex_destroy(&ds->ds_lock);
    260 	mutex_destroy(&ds->ds_opening_lock);
    261 	mutex_destroy(&ds->ds_deadlist.bpl_lock);
    262 	rw_destroy(&ds->ds_rwlock);
    263 	cv_destroy(&ds->ds_exclusive_cv);
    264 
    265 	kmem_free(ds, sizeof (dsl_dataset_t));
    266 }
    267 
    268 static int
    269 dsl_dataset_get_snapname(dsl_dataset_t *ds)
    270 {
    271 	dsl_dataset_phys_t *headphys;
    272 	int err;
    273 	dmu_buf_t *headdbuf;
    274 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
    275 	objset_t *mos = dp->dp_meta_objset;
    276 
    277 	if (ds->ds_snapname[0])
    278 		return (0);
    279 	if (ds->ds_phys->ds_next_snap_obj == 0)
    280 		return (0);
    281 
    282 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
    283 	    FTAG, &headdbuf);
    284 	if (err)
    285 		return (err);
    286 	headphys = headdbuf->db_data;
    287 	err = zap_value_search(dp->dp_meta_objset,
    288 	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
    289 	dmu_buf_rele(headdbuf, FTAG);
    290 	return (err);
    291 }
    292 
    293 static int
    294 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
    295 {
    296 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
    297 	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
    298 	matchtype_t mt;
    299 	int err;
    300 
    301 	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
    302 		mt = MT_FIRST;
    303 	else
    304 		mt = MT_EXACT;
    305 
    306 	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
    307 	    value, mt, NULL, 0, NULL);
    308 	if (err == ENOTSUP && mt == MT_FIRST)
    309 		err = zap_lookup(mos, snapobj, name, 8, 1, value);
    310 	return (err);
    311 }
    312 
    313 static int
    314 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
    315 {
    316 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
    317 	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
    318 	matchtype_t mt;
    319 	int err;
    320 
    321 	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
    322 		mt = MT_FIRST;
    323 	else
    324 		mt = MT_EXACT;
    325 
    326 	err = zap_remove_norm(mos, snapobj, name, mt, tx);
    327 	if (err == ENOTSUP && mt == MT_FIRST)
    328 		err = zap_remove(mos, snapobj, name, tx);
    329 	return (err);
    330 }
    331 
/*
 * Take a reference (a bonus-buffer hold under 'tag') on the dataset stored
 * in object 'dsobj', constructing the in-core dsl_dataset_t if the buffer
 * does not yet have one attached.
 *
 * The caller must hold dp_config_rwlock or be in syncing context.
 *
 * On success returns 0 and stores the dataset in *dsp.  On failure the
 * buffer hold is released and an error is returned; ENOENT is returned when
 * the dataset has been marked destroyed and the caller is not in syncing
 * context.  *dsp is written only on success.
 */
static int
dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_t *ds;
	int err;

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
	    dsl_pool_sync_context(dp));

	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
	if (err)
		return (err);
	ds = dmu_buf_get_user(dbuf);
	if (ds == NULL) {
		/*
		 * No in-core dataset is attached to this buffer yet, so
		 * build one.  'winner' is assigned only when construction
		 * succeeds (err == 0) and, thanks to short-circuit
		 * evaluation, is only read in that case.
		 */
		dsl_dataset_t *winner;

		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
		ds->ds_dbuf = dbuf;
		ds->ds_object = dsobj;
		ds->ds_phys = dbuf->db_data;

		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
		    NULL);
		rw_init(&ds->ds_rwlock, 0, 0, 0);
		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);

		err = bplist_open(&ds->ds_deadlist,
		    mos, ds->ds_phys->ds_deadlist_obj);
		if (err == 0) {
			err = dsl_dir_open_obj(dp,
			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
		}
		if (err) {
			/*
			 * we don't really need to close the bplist if we
			 * just opened it.
			 */
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_opening_lock);
			mutex_destroy(&ds->ds_deadlist.bpl_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			kmem_free(ds, sizeof (dsl_dataset_t));
			dmu_buf_rele(dbuf, tag);
			return (err);
		}

		if (!dsl_dataset_is_snapshot(ds)) {
			ds->ds_snapname[0] = '\0';
			/*
			 * Keep a reference on the previous snapshot, tagged
			 * with this dataset, for as long as ds exists.
			 */
			if (ds->ds_phys->ds_prev_snap_obj) {
				err = dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds, &ds->ds_prev);
			}

			/*
			 * For a clone, remember the txg in which its origin
			 * snapshot was created.
			 */
			if (err == 0 && dsl_dir_is_clone(ds->ds_dir)) {
				dsl_dataset_t *origin;

				err = dsl_dataset_hold_obj(dp,
				    ds->ds_dir->dd_phys->dd_origin_obj,
				    FTAG, &origin);
				if (err == 0) {
					ds->ds_origin_txg =
					    origin->ds_phys->ds_creation_txg;
					dsl_dataset_rele(origin, FTAG);
				}
			}
		} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
			/* Debug aid: resolve the snapshot's name eagerly. */
			err = dsl_dataset_get_snapname(ds);
		}

		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
			/*
			 * In sync context, we're called with either no lock
			 * or with the write lock.  If we're not syncing,
			 * we're always called with the read lock held.
			 */
			boolean_t need_lock =
			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
			    dsl_pool_sync_context(dp);

			if (need_lock)
				rw_enter(&dp->dp_config_rwlock, RW_READER);

			/* Cache the refreservation and refquota values. */
			err = dsl_prop_get_ds(ds,
			    "refreservation", sizeof (uint64_t), 1,
			    &ds->ds_reserved, NULL);
			if (err == 0) {
				err = dsl_prop_get_ds(ds,
				    "refquota", sizeof (uint64_t), 1,
				    &ds->ds_quota, NULL);
			}

			if (need_lock)
				rw_exit(&dp->dp_config_rwlock);
		} else {
			ds->ds_reserved = ds->ds_quota = 0;
		}

		/*
		 * Attach the new structure to the buffer.  A non-NULL
		 * return is treated below as an already-attached
		 * dsl_dataset_t that must be used instead of ours
		 * (presumably installed by a racing thread; confirm
		 * against dmu_buf_set_user_ie()).
		 */
		if (err == 0) {
			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
			    dsl_dataset_evict);
		}
		if (err || winner) {
			/* Either way our own copy is not needed: undo it. */
			bplist_close(&ds->ds_deadlist);
			if (ds->ds_prev)
				dsl_dataset_drop_ref(ds->ds_prev, ds);
			dsl_dir_close(ds->ds_dir, ds);
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_opening_lock);
			mutex_destroy(&ds->ds_deadlist.bpl_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			kmem_free(ds, sizeof (dsl_dataset_t));
			if (err) {
				dmu_buf_rele(dbuf, tag);
				return (err);
			}
			ds = winner;
		} else {
			ds->ds_fsid_guid =
			    unique_insert(ds->ds_phys->ds_fsid_guid);
		}
	}
	ASSERT3P(ds->ds_dbuf, ==, dbuf);
	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
	/*
	 * Outside of syncing context a destroyed dataset is reported as
	 * nonexistent; give the buffer hold back before failing.
	 */
	mutex_enter(&ds->ds_lock);
	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
		mutex_exit(&ds->ds_lock);
		dmu_buf_rele(ds->ds_dbuf, tag);
		return (ENOENT);
	}
	mutex_exit(&ds->ds_lock);
	*dsp = ds;
	return (0);
}
    476 
    477 static int
    478 dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
    479 {
    480 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
    481 
    482 	/*
    483 	 * In syncing context we don't want the rwlock lock: there
    484 	 * may be an existing writer waiting for sync phase to
    485 	 * finish.  We don't need to worry about such writers, since
    486 	 * sync phase is single-threaded, so the writer can't be
    487 	 * doing anything while we are active.
    488 	 */
    489 	if (dsl_pool_sync_context(dp)) {
    490 		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
    491 		return (0);
    492 	}
    493 
    494 	/*
    495 	 * Normal users will hold the ds_rwlock as a READER until they
    496 	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
    497 	 * drop their READER lock after they set the ds_owner field.
    498 	 *
    499 	 * If the dataset is being destroyed, the destroy thread will
    500 	 * obtain a WRITER lock for exclusive access after it's done its
    501 	 * open-context work and then change the ds_owner to
    502 	 * dsl_reaper once destruction is assured.  So threads
    503 	 * may block here temporarily, until the "destructability" of
    504 	 * the dataset is determined.
    505 	 */
    506 	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
    507 	mutex_enter(&ds->ds_lock);
    508 	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
    509 		rw_exit(&dp->dp_config_rwlock);
    510 		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
    511 		if (DSL_DATASET_IS_DESTROYED(ds)) {
    512 			mutex_exit(&ds->ds_lock);
    513 			dsl_dataset_drop_ref(ds, tag);
    514 			rw_enter(&dp->dp_config_rwlock, RW_READER);
    515 			return (ENOENT);
    516 		}
    517 		rw_enter(&dp->dp_config_rwlock, RW_READER);
    518 	}
    519 	mutex_exit(&ds->ds_lock);
    520 	return (0);
    521 }
    522 
    523 int
    524 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    525     dsl_dataset_t **dsp)
    526 {
    527 	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
    528 
    529 	if (err)
    530 		return (err);
    531 	return (dsl_dataset_hold_ref(*dsp, tag));
    532 }
    533 
    534 int
    535 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, int flags, void *owner,
    536     dsl_dataset_t **dsp)
    537 {
    538 	int err = dsl_dataset_hold_obj(dp, dsobj, owner, dsp);
    539 
    540 	ASSERT(DS_MODE_TYPE(flags) != DS_MODE_USER);
    541 
    542 	if (err)
    543 		return (err);
    544 	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
    545 		dsl_dataset_rele(*dsp, owner);
    546 		return (EBUSY);
    547 	}
    548 	return (0);
    549 }
    550 
    551 int
    552 dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
    553 {
    554 	dsl_dir_t *dd;
    555 	dsl_pool_t *dp;
    556 	const char *snapname;
    557 	uint64_t obj;
    558 	int err = 0;
    559 
    560 	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
    561 	if (err)
    562 		return (err);
    563 
    564 	dp = dd->dd_pool;
    565 	obj = dd->dd_phys->dd_head_dataset_obj;
    566 	rw_enter(&dp->dp_config_rwlock, RW_READER);
    567 	if (obj)
    568 		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
    569 	else
    570 		err = ENOENT;
    571 	if (err)
    572 		goto out;
    573 
    574 	err = dsl_dataset_hold_ref(*dsp, tag);
    575 
    576 	/* we may be looking for a snapshot */
    577 	if (err == 0 && snapname != NULL) {
    578 		dsl_dataset_t *ds = NULL;
    579 
    580 		if (*snapname++ != '@') {
    581 			dsl_dataset_rele(*dsp, tag);
    582 			err = ENOENT;
    583 			goto out;
    584 		}
    585 
    586 		dprintf("looking for snapshot '%s'\n", snapname);
    587 		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
    588 		if (err == 0)
    589 			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
    590 		dsl_dataset_rele(*dsp, tag);
    591 
    592 		ASSERT3U((err == 0), ==, (ds != NULL));
    593 
    594 		if (ds) {
    595 			mutex_enter(&ds->ds_lock);
    596 			if (ds->ds_snapname[0] == 0)
    597 				(void) strlcpy(ds->ds_snapname, snapname,
    598 				    sizeof (ds->ds_snapname));
    599 			mutex_exit(&ds->ds_lock);
    600 			err = dsl_dataset_hold_ref(ds, tag);
    601 			*dsp = err ? NULL : ds;
    602 		}
    603 	}
    604 out:
    605 	rw_exit(&dp->dp_config_rwlock);
    606 	dsl_dir_close(dd, FTAG);
    607 	return (err);
    608 }
    609 
    610 int
    611 dsl_dataset_own(const char *name, int flags, void *owner, dsl_dataset_t **dsp)
    612 {
    613 	int err = dsl_dataset_hold(name, owner, dsp);
    614 	if (err)
    615 		return (err);
    616 	if ((*dsp)->ds_phys->ds_num_children > 0 &&
    617 	    !DS_MODE_IS_READONLY(flags)) {
    618 		dsl_dataset_rele(*dsp, owner);
    619 		return (EROFS);
    620 	}
    621 	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
    622 		dsl_dataset_rele(*dsp, owner);
    623 		return (EBUSY);
    624 	}
    625 	return (0);
    626 }
    627 
    628 void
    629 dsl_dataset_name(dsl_dataset_t *ds, char *name)
    630 {
    631 	if (ds == NULL) {
    632 		(void) strcpy(name, "mos");
    633 	} else {
    634 		dsl_dir_name(ds->ds_dir, name);
    635 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    636 		if (ds->ds_snapname[0]) {
    637 			(void) strcat(name, "@");
    638 			/*
    639 			 * We use a "recursive" mutex so that we
    640 			 * can call dprintf_ds() with ds_lock held.
    641 			 */
    642 			if (!MUTEX_HELD(&ds->ds_lock)) {
    643 				mutex_enter(&ds->ds_lock);
    644 				(void) strcat(name, ds->ds_snapname);
    645 				mutex_exit(&ds->ds_lock);
    646 			} else {
    647 				(void) strcat(name, ds->ds_snapname);
    648 			}
    649 		}
    650 	}
    651 }
    652 
    653 static int
    654 dsl_dataset_namelen(dsl_dataset_t *ds)
    655 {
    656 	int result;
    657 
    658 	if (ds == NULL) {
    659 		result = 3;	/* "mos" */
    660 	} else {
    661 		result = dsl_dir_namelen(ds->ds_dir);
    662 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    663 		if (ds->ds_snapname[0]) {
    664 			++result;	/* adding one for the @-sign */
    665 			if (!MUTEX_HELD(&ds->ds_lock)) {
    666 				mutex_enter(&ds->ds_lock);
    667 				result += strlen(ds->ds_snapname);
    668 				mutex_exit(&ds->ds_lock);
    669 			} else {
    670 				result += strlen(ds->ds_snapname);
    671 			}
    672 		}
    673 	}
    674 
    675 	return (result);
    676 }
    677 
    678 void
    679 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
    680 {
    681 	dmu_buf_rele(ds->ds_dbuf, tag);
    682 }
    683 
    684 void
    685 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
    686 {
    687 	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
    688 		rw_exit(&ds->ds_rwlock);
    689 	}
    690 	dsl_dataset_drop_ref(ds, tag);
    691 }
    692 
    693 void
    694 dsl_dataset_disown(dsl_dataset_t *ds, void *owner)
    695 {
    696 	ASSERT((ds->ds_owner == owner && ds->ds_dbuf) ||
    697 	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
    698 
    699 	mutex_enter(&ds->ds_lock);
    700 	ds->ds_owner = NULL;
    701 	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
    702 		rw_exit(&ds->ds_rwlock);
    703 		cv_broadcast(&ds->ds_exclusive_cv);
    704 	}
    705 	mutex_exit(&ds->ds_lock);
    706 	if (ds->ds_dbuf)
    707 		dsl_dataset_drop_ref(ds, owner);
    708 	else
    709 		dsl_dataset_evict(ds->ds_dbuf, ds);
    710 }
    711 
    712 boolean_t
    713 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *owner)
    714 {
    715 	boolean_t gotit = FALSE;
    716 
    717 	mutex_enter(&ds->ds_lock);
    718 	if (ds->ds_owner == NULL &&
    719 	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
    720 		ds->ds_owner = owner;
    721 		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
    722 			rw_exit(&ds->ds_rwlock);
    723 		gotit = TRUE;
    724 	}
    725 	mutex_exit(&ds->ds_lock);
    726 	return (gotit);
    727 }
    728 
    729 void
    730 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
    731 {
    732 	ASSERT3P(owner, ==, ds->ds_owner);
    733 	if (!RW_WRITE_HELD(&ds->ds_rwlock))
    734 		rw_enter(&ds->ds_rwlock, RW_WRITER);
    735 }
    736 
    737 uint64_t
    738 dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    739     uint64_t flags, dmu_tx_t *tx)
    740 {
    741 	dsl_pool_t *dp = dd->dd_pool;
    742 	dmu_buf_t *dbuf;
    743 	dsl_dataset_phys_t *dsphys;
    744 	uint64_t dsobj;
    745 	objset_t *mos = dp->dp_meta_objset;
    746 
    747 	if (origin == NULL)
    748 		origin = dp->dp_origin_snap;
    749 
    750 	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
    751 	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
    752 	ASSERT(dmu_tx_is_syncing(tx));
    753 	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
    754 
    755 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
    756 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
    757 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
    758 	dmu_buf_will_dirty(dbuf, tx);
    759 	dsphys = dbuf->db_data;
    760 	bzero(dsphys, sizeof (dsl_dataset_phys_t));
    761 	dsphys->ds_dir_obj = dd->dd_object;
    762 	dsphys->ds_flags = flags;
    763 	dsphys->ds_fsid_guid = unique_create();
    764 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
    765 	    sizeof (dsphys->ds_guid));
    766 	dsphys->ds_snapnames_zapobj =
    767 	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
    768 	    DMU_OT_NONE, 0, tx);
    769 	dsphys->ds_creation_time = gethrestime_sec();
    770 	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
    771 	dsphys->ds_deadlist_obj =
    772 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
    773 
    774 	if (origin) {
    775 		dsphys->ds_prev_snap_obj = origin->ds_object;
    776 		dsphys->ds_prev_snap_txg =
    777 		    origin->ds_phys->ds_creation_txg;
    778 		dsphys->ds_used_bytes =
    779 		    origin->ds_phys->ds_used_bytes;
    780 		dsphys->ds_compressed_bytes =
    781 		    origin->ds_phys->ds_compressed_bytes;
    782 		dsphys->ds_uncompressed_bytes =
    783 		    origin->ds_phys->ds_uncompressed_bytes;
    784 		dsphys->ds_bp = origin->ds_phys->ds_bp;
    785 		dsphys->ds_flags |= origin->ds_phys->ds_flags;
    786 
    787 		dmu_buf_will_dirty(origin->ds_dbuf, tx);
    788 		origin->ds_phys->ds_num_children++;
    789 
    790 		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
    791 			if (origin->ds_phys->ds_next_clones_obj == 0) {
    792 				origin->ds_phys->ds_next_clones_obj =
    793 				    zap_create(mos,
    794 				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
    795 			}
    796 			VERIFY(0 == zap_add_int(mos,
    797 			    origin->ds_phys->ds_next_clones_obj,
    798 			    dsobj, tx));
    799 		}
    800 
    801 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
    802 		dd->dd_phys->dd_origin_obj = origin->ds_object;
    803 	}
    804 
    805 	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
    806 		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
    807 
    808 	dmu_buf_rele(dbuf, FTAG);
    809 
    810 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
    811 	dd->dd_phys->dd_head_dataset_obj = dsobj;
    812 
    813 	return (dsobj);
    814 }
    815 
    816 uint64_t
    817 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    818     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
    819 {
    820 	dsl_pool_t *dp = pdd->dd_pool;
    821 	uint64_t dsobj, ddobj;
    822 	dsl_dir_t *dd;
    823 
    824 	ASSERT(lastname[0] != '@');
    825 
    826 	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
    827 	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
    828 
    829 	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
    830 
    831 	dsl_deleg_set_create_perms(dd, tx, cr);
    832 
    833 	dsl_dir_close(dd, FTAG);
    834 
    835 	return (dsobj);
    836 }
    837 
/*
 * Argument bundle passed to dsl_snapshot_destroy_one() for each file
 * system visited while destroying a snapshot by name.
 */
struct destroyarg {
	/* task group the destroy tasks are added to; also used as owner tag */
	dsl_sync_task_group_t *dstg;
	/* snapshot name (without the '@') to destroy in each file system */
	char *snapname;
	/* buffer that receives the name of the file system that failed */
	char *failed;
};
    843 
    844 static int
    845 dsl_snapshot_destroy_one(char *name, void *arg)
    846 {
    847 	struct destroyarg *da = arg;
    848 	dsl_dataset_t *ds;
    849 	char *cp;
    850 	int err;
    851 
    852 	(void) strcat(name, "@");
    853 	(void) strcat(name, da->snapname);
    854 	err = dsl_dataset_own(name, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
    855 	    da->dstg, &ds);
    856 	cp = strchr(name, '@');
    857 	*cp = '\0';
    858 	if (err == 0) {
    859 		dsl_dataset_make_exclusive(ds, da->dstg);
    860 		if (ds->ds_user_ptr) {
    861 			ds->ds_user_evict_func(ds, ds->ds_user_ptr);
    862 			ds->ds_user_ptr = NULL;
    863 		}
    864 		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
    865 		    dsl_dataset_destroy_sync, ds, da->dstg, 0);
    866 	} else if (err == ENOENT) {
    867 		err = 0;
    868 	} else {
    869 		(void) strcpy(da->failed, name);
    870 	}
    871 	return (err);
    872 }
    873 
    874 /*
    875  * Destroy 'snapname' in all descendants of 'fsname'.
    876  */
    877 #pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
    878 int
    879 dsl_snapshots_destroy(char *fsname, char *snapname)
    880 {
    881 	int err;
    882 	struct destroyarg da;
    883 	dsl_sync_task_t *dst;
    884 	spa_t *spa;
    885 
    886 	err = spa_open(fsname, &spa, FTAG);
    887 	if (err)
    888 		return (err);
    889 	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
    890 	da.snapname = snapname;
    891 	da.failed = fsname;
    892 
    893 	err = dmu_objset_find(fsname,
    894 	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);
    895 
    896 	if (err == 0)
    897 		err = dsl_sync_task_group_wait(da.dstg);
    898 
    899 	for (dst = list_head(&da.dstg->dstg_tasks); dst;
    900 	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
    901 		dsl_dataset_t *ds = dst->