Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #pragma ident	"@(#)txg.c	1.3	08/01/03 SMI"
     27 
     28 #include <sys/zfs_context.h>
     29 #include <sys/txg_impl.h>
     30 #include <sys/dmu_impl.h>
     31 #include <sys/dsl_pool.h>
     32 #include <sys/callb.h>
     33 
     34 /*
     35  * Pool-wide transaction groups.
     36  */
     37 
/*
 * Entry points of the three worker threads created by txg_sync_start().
 */
static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);
static void txg_timelimit_thread(dsl_pool_t *dp);

/*
 * Number of seconds txg_timelimit_thread() waits before requesting that
 * the currently open txg be quiesced.
 */
int txg_time = 5;	/* max 5 seconds worth of delta per txg */
     43 
     44 /*
     45  * Prepare the txg subsystem.
     46  */
     47 void
     48 txg_init(dsl_pool_t *dp, uint64_t txg)
     49 {
     50 	tx_state_t *tx = &dp->dp_tx;
     51 	int c;
     52 	bzero(tx, sizeof (tx_state_t));
     53 
     54 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
     55 
     56 	for (c = 0; c < max_ncpus; c++) {
     57 		int i;
     58 
     59 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
     60 		for (i = 0; i < TXG_SIZE; i++) {
     61 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
     62 			    NULL);
     63 		}
     64 	}
     65 
     66 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
     67 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
     68 
     69 	tx->tx_open_txg = txg;
     70 }
     71 
     72 /*
     73  * Close down the txg subsystem.
     74  */
     75 void
     76 txg_fini(dsl_pool_t *dp)
     77 {
     78 	tx_state_t *tx = &dp->dp_tx;
     79 	int c;
     80 
     81 	ASSERT(tx->tx_threads == 0);
     82 
     83 	rw_destroy(&tx->tx_suspend);
     84 	mutex_destroy(&tx->tx_sync_lock);
     85 
     86 	for (c = 0; c < max_ncpus; c++) {
     87 		int i;
     88 
     89 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
     90 		for (i = 0; i < TXG_SIZE; i++)
     91 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
     92 	}
     93 
     94 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
     95 
     96 	bzero(tx, sizeof (tx_state_t));
     97 }
     98 
/*
 * Start syncing transaction groups.
 *
 * Creates the quiesce, sync and timelimit threads for this pool and
 * records them in the tx state.  Asserts that no txg threads exist yet.
 * The threads are created while tx_sync_lock is held; each of them takes
 * that same lock in txg_thread_enter() before touching the tx state.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	/* One count per thread; each drops it again in txg_thread_exit(). */
	tx->tx_threads = 3;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}
    126 
/*
 * Common prologue for the txg worker threads: initialize the thread's
 * callb_cpr_t against tx_sync_lock, then acquire tx_sync_lock.  The
 * caller continues with tx_sync_lock held.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}
    133 
/*
 * Common epilogue for the txg worker threads.  Every caller in this file
 * invokes it with tx_sync_lock held.
 *
 * tpp points at the tx_state_t field holding this thread's kthread_t
 * pointer; it is cleared, the live-thread count is decremented, and
 * tx_exit_cv is broadcast so that txg_sync_stop() can observe the count
 * reaching zero.  Finishes by calling thread_exit().
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}
    144 
    145 static void
    146 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
    147 {
    148 	CALLB_CPR_SAFE_BEGIN(cpr);
    149 
    150 	if (secmax)
    151 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + secmax * hz);
    152 	else
    153 		cv_wait(cv, &tx->tx_sync_lock);
    154 
    155 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
    156 }
    157 
/*
 * Stop syncing transaction groups.
 *
 * Waits for the currently open txg to be synced, then sets tx_exiting,
 * wakes every condition variable a worker thread may be sleeping on, and
 * blocks until all three threads have gone through txg_thread_exit().
 * Asserts that all three threads are running on entry.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 3);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all 3 sync threads (one per state) and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 3);

	/* Every worker loop tests this flag after being woken. */
	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);
	cv_broadcast(&tx->tx_timeout_exit_cv);

	/* txg_thread_exit() decrements tx_threads and signals tx_exit_cv. */
	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	/* Reset the flag now that no thread is left to read it. */
	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
    194 
    195 uint64_t
    196 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
    197 {
    198 	tx_state_t *tx = &dp->dp_tx;
    199 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
    200 	uint64_t txg;
    201 
    202 	mutex_enter(&tc->tc_lock);
    203 
    204 	txg = tx->tx_open_txg;
    205 	tc->tc_count[txg & TXG_MASK]++;
    206 
    207 	th->th_cpu = tc;
    208 	th->th_txg = txg;
    209 
    210 	return (txg);
    211 }
    212 
    213 void
    214 txg_rele_to_quiesce(txg_handle_t *th)
    215 {
    216 	tx_cpu_t *tc = th->th_cpu;
    217 
    218 	mutex_exit(&tc->tc_lock);
    219 }
    220 
    221 void
    222 txg_rele_to_sync(txg_handle_t *th)
    223 {
    224 	tx_cpu_t *tc = th->th_cpu;
    225 	int g = th->th_txg & TXG_MASK;
    226 
    227 	mutex_enter(&tc->tc_lock);
    228 	ASSERT(tc->tc_count[g] != 0);
    229 	if (--tc->tc_count[g] == 0)
    230 		cv_broadcast(&tc->tc_cv[g]);
    231 	mutex_exit(&tc->tc_lock);
    232 
    233 	th->th_cpu = NULL;	/* defensive */
    234 }
    235 
    236 static void
    237 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
    238 {
    239 	tx_state_t *tx = &dp->dp_tx;
    240 	int g = txg & TXG_MASK;
    241 	int c;
    242 
    243 	/*
    244 	 * Grab all tx_cpu locks so nobody else can get into this txg.
    245 	 */
    246 	for (c = 0; c < max_ncpus; c++)
    247 		mutex_enter(&tx->tx_cpu[c].tc_lock);
    248 
    249 	ASSERT(txg == tx->tx_open_txg);
    250 	tx->tx_open_txg++;
    251 
    252 	/*
    253 	 * Now that we've incremented tx_open_txg, we can let threads
    254 	 * enter the next transaction group.
    255 	 */
    256 	for (c = 0; c < max_ncpus; c++)
    257 		mutex_exit(&tx->tx_cpu[c].tc_lock);
    258 
    259 	/*
    260 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
    261 	 */
    262 	for (c = 0; c < max_ncpus; c++) {
    263 		tx_cpu_t *tc = &tx->tx_cpu[c];
    264 		mutex_enter(&tc->tc_lock);
    265 		while (tc->tc_count[g] != 0)
    266 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
    267 		mutex_exit(&tc->tc_lock);
    268 	}
    269 }
    270 
/*
 * Body of the sync thread.
 *
 * Runs with tx_sync_lock held except while sleeping on a condition
 * variable and while inside spa_sync().  Each iteration waits for a
 * reason to sync, obtains a quiesced txg from the quiesce thread, syncs
 * it via spa_sync(), and then publishes it as tx_synced_txg.  Leaves via
 * txg_thread_exit() once tx_exiting is set.
 */
static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us.
		 */
		while (!tx->tx_exiting &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * txg_suspend() holds tx_suspend as reader, so taking it
		 * as writer here waits for any such holder to call
		 * txg_resume() before the hand-off proceeds.
		 */
		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* tx_sync_lock is not held across the sync itself. */
		mutex_exit(&tx->tx_sync_lock);
		spa_sync(dp->dp_spa, txg);
		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		/* Wake txg_wait_synced() callers so they re-check progress. */
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}
    333 
/*
 * Body of the quiesce thread.
 *
 * Runs with tx_sync_lock held except while sleeping on a condition
 * variable and while inside txg_quiesce().  Each iteration waits until a
 * quiesce has been requested and the previous quiesced txg has been
 * consumed, quiesces the open txg, and hands it to the sync thread via
 * tx_quiesced_txg.  Leaves via txg_thread_exit() once tx_exiting is set.
 */
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/* tx_sync_lock is not held while waiting for holders to drain. */
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		/*
		 * tx_sync_more_cv wakes the sync thread; tx_quiesce_done_cv
		 * wakes everyone waiting for the open txg to advance.
		 */
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
    377 
    378 void
    379 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
    380 {
    381 	tx_state_t *tx = &dp->dp_tx;
    382 
    383 	mutex_enter(&tx->tx_sync_lock);
    384 	ASSERT(tx->tx_threads == 3);
    385 	if (txg == 0)
    386 		txg = tx->tx_open_txg;
    387 	if (tx->tx_sync_txg_waiting < txg)
    388 		tx->tx_sync_txg_waiting = txg;
    389 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    390 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    391 	while (tx->tx_synced_txg < txg) {
    392 		dprintf("broadcasting sync more "
    393 		    "tx_synced=%llu waiting=%llu dp=%p\n",
    394 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    395 		cv_broadcast(&tx->tx_sync_more_cv);
    396 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
    397 	}
    398 	mutex_exit(&tx->tx_sync_lock);
    399 }
    400 
    401 void
    402 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
    403 {
    404 	tx_state_t *tx = &dp->dp_tx;
    405 
    406 	mutex_enter(&tx->tx_sync_lock);
    407 	ASSERT(tx->tx_threads == 3);
    408 	if (txg == 0)
    409 		txg = tx->tx_open_txg + 1;
    410 	if (tx->tx_quiesce_txg_waiting < txg)
    411 		tx->tx_quiesce_txg_waiting = txg;
    412 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    413 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    414 	while (tx->tx_open_txg < txg) {
    415 		cv_broadcast(&tx->tx_quiesce_more_cv);
    416 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
    417 	}
    418 	mutex_exit(&tx->tx_sync_lock);
    419 }
    420 
/*
 * Body of the timelimit thread.
 *
 * Each iteration notes which txg would follow the currently open one,
 * sleeps for up to txg_time seconds, then requests that the open txg be
 * quiesced and waits until tx_open_txg has reached the noted value.
 * Runs with tx_sync_lock held except while sleeping; exits once
 * tx_exiting is set.
 */
static void
txg_timelimit_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	while (!tx->tx_exiting) {
		/* Sampled before sleeping, so it names the txg to push out. */
		uint64_t txg = tx->tx_open_txg + 1;

		/*
		 * In this file tx_timeout_exit_cv is only broadcast by
		 * txg_sync_stop(), so otherwise this sleeps the full
		 * txg_time seconds.
		 */
		txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);

		if (tx->tx_quiesce_txg_waiting < txg)
			tx->tx_quiesce_txg_waiting = txg;

		while (!tx->tx_exiting && tx->tx_open_txg < txg) {
			dprintf("pushing out %llu\n", txg);
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}
	}
	txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
}
    445 
    446 int
    447 txg_stalled(dsl_pool_t *dp)
    448 {
    449 	tx_state_t *tx = &dp->dp_tx;
    450 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
    451 }
    452 
    453 void
    454 txg_suspend(dsl_pool_t *dp)
    455 {
    456 	tx_state_t *tx = &dp->dp_tx;
    457 	/* XXX some code paths suspend when they are already suspended! */
    458 	rw_enter(&tx->tx_suspend, RW_READER);
    459 }
    460 
    461 void
    462 txg_resume(dsl_pool_t *dp)
    463 {
    464 	tx_state_t *tx = &dp->dp_tx;
    465 	rw_exit(&tx->tx_suspend);
    466 }
    467 
    468 /*
    469  * Per-txg object lists.
    470  */
    471 void
    472 txg_list_create(txg_list_t *tl, size_t offset)
    473 {
    474 	int t;
    475 
    476 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
    477 
    478 	tl->tl_offset = offset;
    479 
    480 	for (t = 0; t < TXG_SIZE; t++)
    481 		tl->tl_head[t] = NULL;
    482 }
    483 
    484 void
    485 txg_list_destroy(txg_list_t *tl)
    486 {
    487 	int t;
    488 
    489 	for (t = 0; t < TXG_SIZE; t++)
    490 		ASSERT(txg_list_empty(tl, t));
    491 
    492 	mutex_destroy(&tl->tl_lock);
    493 }
    494 
    495 int
    496 txg_list_empty(txg_list_t *tl, uint64_t txg)
    497 {
    498 	return (tl->tl_head[txg & TXG_MASK] == NULL);
    499 }
    500 
    501 /*
    502  * Add an entry to the list.
    503  * Returns 0 if it's a new entry, 1 if it's already there.
    504  */
    505 int
    506 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
    507 {
    508 	int t = txg & TXG_MASK;
    509 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    510 	int already_on_list;
    511 
    512 	mutex_enter(&tl->tl_lock);
    513 	already_on_list = tn->tn_member[t];
    514 	if (!already_on_list) {
    515 		tn->tn_member[t] = 1;
    516 		tn->tn_next[t] = tl->tl_head[t];
    517 		tl->tl_head[t] = tn;
    518 	}
    519 	mutex_exit(&tl->tl_lock);
    520 
    521 	return (already_on_list);
    522 }
    523 
    524 /*
    525  * Remove the head of the list and return it.
    526  */
    527 void *
    528 txg_list_remove(txg_list_t *tl, uint64_t txg)
    529 {
    530 	int t = txg & TXG_MASK;
    531 	txg_node_t *tn;
    532 	void *p = NULL;
    533 
    534 	mutex_enter(&tl->tl_lock);
    535 	if ((tn = tl->tl_head[t]) != NULL) {
    536 		p = (char *)tn - tl->tl_offset;
    537 		tl->tl_head[t] = tn->tn_next[t];
    538 		tn->tn_next[t] = NULL;
    539 		tn->tn_member[t] = 0;
    540 	}
    541 	mutex_exit(&tl->tl_lock);
    542 
    543 	return (p);
    544 }
    545 
    546 /*
    547  * Remove a specific item from the list and return it.
    548  */
    549 void *
    550 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
    551 {
    552 	int t = txg & TXG_MASK;
    553 	txg_node_t *tn, **tp;
    554 
    555 	mutex_enter(&tl->tl_lock);
    556 
    557 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
    558 		if ((char *)tn - tl->tl_offset == p) {
    559 			*tp = tn->tn_next[t];
    560 			tn->tn_next[t] = NULL;
    561 			tn->tn_member[t] = 0;
    562 			mutex_exit(&tl->tl_lock);
    563 			return (p);
    564 		}
    565 	}
    566 
    567 	mutex_exit(&tl->tl_lock);
    568 
    569 	return (NULL);
    570 }
    571 
    572 int
    573 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
    574 {
    575 	int t = txg & TXG_MASK;
    576 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    577 
    578 	return (tn->tn_member[t]);
    579 }
    580 
    581 /*
    582  * Walk a txg list -- only safe if you know it's not changing.
    583  */
    584 void *
    585 txg_list_head(txg_list_t *tl, uint64_t txg)
    586 {
    587 	int t = txg & TXG_MASK;
    588 	txg_node_t *tn = tl->tl_head[t];
    589 
    590 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    591 }
    592 
    593 void *
    594 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
    595 {
    596 	int t = txg & TXG_MASK;
    597 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    598 
    599 	tn = tn->tn_next[t];
    600 
    601 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    602 }
    603