Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/zfs_context.h>
     27 #include <sys/txg_impl.h>
     28 #include <sys/dmu_impl.h>
     29 #include <sys/dsl_pool.h>
     30 #include <sys/callb.h>
     31 
     32 /*
     33  * Pool-wide transaction groups.
     34  */
     35 
/* Entry points of the two per-pool worker threads, defined further down. */
static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

/*
 * Tunable, in seconds.  txg_sync_thread() multiplies it by hz and uses the
 * result as the tick budget for one cycle: time already spent in the last
 * spa_sync() is subtracted, and the remainder is the longest the thread
 * will idle before syncing again.
 */
int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
     40 
     41 /*
     42  * Prepare the txg subsystem.
     43  */
     44 void
     45 txg_init(dsl_pool_t *dp, uint64_t txg)
     46 {
     47 	tx_state_t *tx = &dp->dp_tx;
     48 	int c;
     49 	bzero(tx, sizeof (tx_state_t));
     50 
     51 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
     52 
     53 	for (c = 0; c < max_ncpus; c++) {
     54 		int i;
     55 
     56 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
     57 		for (i = 0; i < TXG_SIZE; i++) {
     58 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
     59 			    NULL);
     60 		}
     61 	}
     62 
     63 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
     64 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
     65 
     66 	tx->tx_open_txg = txg;
     67 }
     68 
     69 /*
     70  * Close down the txg subsystem.
     71  */
     72 void
     73 txg_fini(dsl_pool_t *dp)
     74 {
     75 	tx_state_t *tx = &dp->dp_tx;
     76 	int c;
     77 
     78 	ASSERT(tx->tx_threads == 0);
     79 
     80 	rw_destroy(&tx->tx_suspend);
     81 	mutex_destroy(&tx->tx_sync_lock);
     82 
     83 	for (c = 0; c < max_ncpus; c++) {
     84 		int i;
     85 
     86 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
     87 		for (i = 0; i < TXG_SIZE; i++)
     88 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
     89 	}
     90 
     91 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
     92 
     93 	bzero(tx, sizeof (tx_state_t));
     94 }
     95 
/*
 * Start syncing transaction groups.
 *
 * Creates the quiesce thread and the sync thread for this pool.  The
 * thread count is set to 2 and both threads are created while
 * tx_sync_lock is held.  Each thread begins with txg_thread_enter(),
 * which acquires that same lock, so neither gets past that point until
 * this function releases it.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	/* The threads must not already be running. */
	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}
    125 
/*
 * Common prologue for the sync and quiesce threads.
 *
 * Sets up the caller's CPR callback record with tx_sync_lock as its
 * associated lock, then acquires tx_sync_lock.  Returns with the lock held.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}
    132 
/*
 * Common epilogue for the sync and quiesce threads.
 *
 * Called with tx_sync_lock held.  Clears the thread pointer the caller
 * passes in via 'tpp', decrements the live-thread count, and broadcasts
 * tx_exit_cv (which txg_sync_stop() sleeps on).  The lock is released by
 * CALLB_CPR_EXIT(), after which thread_exit() ends the calling thread;
 * callers do not expect this function to return.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}
    143 
    144 static void
    145 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
    146 {
    147 	CALLB_CPR_SAFE_BEGIN(cpr);
    148 
    149 	if (time)
    150 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
    151 	else
    152 		cv_wait(cv, &tx->tx_sync_lock);
    153 
    154 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
    155 }
    156 
/*
 * Stop syncing transaction groups.
 *
 * Waits for the txg that is open at the time of the call to be synced,
 * then asks both worker threads to exit and blocks until they have.
 * Both threads must be running on entry (asserted).
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	/* Both worker loops test this flag after every wakeup. */
	tx->tx_exiting = 1;

	/*
	 * Broadcast on every condition variable the workers sleep on so
	 * that each one wakes up and notices tx_exiting.
	 */
	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	/* txg_thread_exit() decrements tx_threads and signals tx_exit_cv. */
	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	/* Both threads are gone; clear the flag again. */
	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
    192 
/*
 * Take a hold on the currently open txg.
 *
 * Picks the tx_cpu_t slot indexed by CPU_SEQID (presumably the calling
 * CPU's slot), increments that slot's hold count for the open txg, and
 * records the slot and txg number in 'th'.  Returns the txg number.
 *
 * NOTE: this function returns with the slot's tc_lock still held.  The
 * lock is released by txg_rele_to_quiesce(); the hold itself is released
 * by txg_rele_to_sync().
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	/*
	 * tx_open_txg only advances while txg_quiesce() holds every
	 * tc_lock, so it cannot change while we hold this one.
	 */
	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}
    210 
    211 void
    212 txg_rele_to_quiesce(txg_handle_t *th)
    213 {
    214 	tx_cpu_t *tc = th->th_cpu;
    215 
    216 	mutex_exit(&tc->tc_lock);
    217 }
    218 
    219 void
    220 txg_rele_to_sync(txg_handle_t *th)
    221 {
    222 	tx_cpu_t *tc = th->th_cpu;
    223 	int g = th->th_txg & TXG_MASK;
    224 
    225 	mutex_enter(&tc->tc_lock);
    226 	ASSERT(tc->tc_count[g] != 0);
    227 	if (--tc->tc_count[g] == 0)
    228 		cv_broadcast(&tc->tc_cv[g]);
    229 	mutex_exit(&tc->tc_lock);
    230 
    231 	th->th_cpu = NULL;	/* defensive */
    232 }
    233 
/*
 * Close transaction group 'txg' and wait for all holds on it to drain.
 *
 * 'txg' must be the currently open txg (asserted).  Every per-CPU lock is
 * taken while tx_open_txg is advanced, so no txg_hold_open() can be in
 * progress at that moment.  The locks are then dropped and each CPU's
 * hold count for the old txg is waited on in turn.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for every holder to
	 * call txg_rele_to_sync(), which broadcasts tc_cv[g] once the
	 * per-CPU count for this txg reaches zero.
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}
    268 
/*
 * Body of the per-pool sync thread.
 *
 * Runs with tx_sync_lock held except across spa_sync().  Each pass:
 *   1. idles until a sync is wanted, a quiesced txg is waiting, the
 *      timeout budget is used up, or exit is requested;
 *   2. makes sure a quiesced txg is available, prodding the quiesce
 *      thread to produce one if necessary;
 *   3. claims that txg and calls spa_sync() on it;
 *   4. publishes it as synced and wakes txg_wait_synced() callers.
 *
 * Leaves the loop only via txg_thread_exit() when tx_exiting is set.
 */
static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;	/* lbolt at last sync start; ticks since */

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us, or we have
		 * reached our timeout.
		 *
		 * On entry 'delta' is the time the previous spa_sync()
		 * took, so that time counts against the timeout.  After
		 * each wakeup it is recomputed relative to 'start'; on
		 * the very first pass 'start' is still 0, so 'delta'
		 * becomes lbolt itself.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = lbolt - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		/* Ends this thread; the code below is not reached. */
		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Taken as writer: blocks while any txg_suspend() caller
		 * holds tx_suspend as reader.
		 */
		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* Do not hold tx_sync_lock across the sync itself. */
		mutex_exit(&tx->tx_sync_lock);

		start = lbolt;
		spa_sync(dp->dp_spa, txg);
		delta = lbolt - start;

		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		/* Wake txg_wait_synced() callers. */
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}
    342 
/*
 * Body of the per-pool quiesce thread.
 *
 * Runs with tx_sync_lock held except across txg_quiesce().  Each pass
 * waits until someone wants a txg beyond the open one and the previously
 * quiesced txg has been taken by the sync thread, then quiesces the open
 * txg and hands it to the sync thread through tx_quiesced_txg.
 *
 * Leaves the loop only via txg_thread_exit() when tx_exiting is set.
 */
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		/* Ends this thread; the code below is not reached. */
		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/* txg_quiesce() blocks on the per-CPU locks; drop ours. */
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		/* Wakes the sync thread in either of its two wait loops. */
		cv_broadcast(&tx->tx_sync_more_cv);
		/* Also wakes txg_wait_open() callers. */
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
    386 
    387 /*
    388  * Delay this thread by 'ticks' if we are still in the open transaction
    389  * group and there is already a waiting txg quiesing or quiesced.  Abort
    390  * the delay if this txg stalls or enters the quiesing state.
    391  */
    392 void
    393 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
    394 {
    395 	tx_state_t *tx = &dp->dp_tx;
    396 	int timeout = lbolt + ticks;
    397 
    398 	/* don't delay if this txg could transition to quiesing immediately */
    399 	if (tx->tx_open_txg > txg ||
    400 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
    401 		return;
    402 
    403 	mutex_enter(&tx->tx_sync_lock);
    404 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
    405 		mutex_exit(&tx->tx_sync_lock);
    406 		return;
    407 	}
    408 
    409 	while (lbolt < timeout &&
    410 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
    411 		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
    412 		    timeout);
    413 
    414 	mutex_exit(&tx->tx_sync_lock);
    415 }
    416 
/*
 * Block until transaction group 'txg' has been synced.
 *
 * A 'txg' of 0 means the txg that is open at the time of the call.
 * Raises tx_sync_txg_waiting to at least 'txg' so the sync thread sees
 * that work is wanted, then repeatedly wakes the sync thread and sleeps
 * on tx_sync_done_cv until tx_synced_txg has reached 'txg'.
 *
 * Both worker threads must be running (asserted).
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg;
	/* Only ever raise the high-water mark, never lower it. */
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
    439 
/*
 * Block until the open transaction group is at least 'txg'.
 *
 * A 'txg' of 0 means the txg after the one open at the time of the call.
 * Raises tx_quiesce_txg_waiting to at least 'txg' so the quiesce thread
 * sees that work is wanted, then repeatedly wakes the quiesce thread and
 * sleeps on tx_quiesce_done_cv until tx_open_txg has reached 'txg'.
 *
 * Both worker threads must be running (asserted).
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	/* Only ever raise the high-water mark, never lower it. */
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
    459 
    460 boolean_t
    461 txg_stalled(dsl_pool_t *dp)
    462 {
    463 	tx_state_t *tx = &dp->dp_tx;
    464 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
    465 }
    466 
    467 boolean_t
    468 txg_sync_waiting(dsl_pool_t *dp)
    469 {
    470 	tx_state_t *tx = &dp->dp_tx;
    471 
    472 	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
    473 	    tx->tx_quiesced_txg != 0);
    474 }
    475 
    476 void
    477 txg_suspend(dsl_pool_t *dp)
    478 {
    479 	tx_state_t *tx = &dp->dp_tx;
    480 	/* XXX some code paths suspend when they are already suspended! */
    481 	rw_enter(&tx->tx_suspend, RW_READER);
    482 }
    483 
    484 void
    485 txg_resume(dsl_pool_t *dp)
    486 {
    487 	tx_state_t *tx = &dp->dp_tx;
    488 	rw_exit(&tx->tx_suspend);
    489 }
    490 
    491 /*
    492  * Per-txg object lists.
    493  */
    494 void
    495 txg_list_create(txg_list_t *tl, size_t offset)
    496 {
    497 	int t;
    498 
    499 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
    500 
    501 	tl->tl_offset = offset;
    502 
    503 	for (t = 0; t < TXG_SIZE; t++)
    504 		tl->tl_head[t] = NULL;
    505 }
    506 
    507 void
    508 txg_list_destroy(txg_list_t *tl)
    509 {
    510 	int t;
    511 
    512 	for (t = 0; t < TXG_SIZE; t++)
    513 		ASSERT(txg_list_empty(tl, t));
    514 
    515 	mutex_destroy(&tl->tl_lock);
    516 }
    517 
    518 int
    519 txg_list_empty(txg_list_t *tl, uint64_t txg)
    520 {
    521 	return (tl->tl_head[txg & TXG_MASK] == NULL);
    522 }
    523 
    524 /*
    525  * Add an entry to the list.
    526  * Returns 0 if it's a new entry, 1 if it's already there.
    527  */
    528 int
    529 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
    530 {
    531 	int t = txg & TXG_MASK;
    532 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    533 	int already_on_list;
    534 
    535 	mutex_enter(&tl->tl_lock);
    536 	already_on_list = tn->tn_member[t];
    537 	if (!already_on_list) {
    538 		tn->tn_member[t] = 1;
    539 		tn->tn_next[t] = tl->tl_head[t];
    540 		tl->tl_head[t] = tn;
    541 	}
    542 	mutex_exit(&tl->tl_lock);
    543 
    544 	return (already_on_list);
    545 }
    546 
    547 /*
    548  * Remove the head of the list and return it.
    549  */
    550 void *
    551 txg_list_remove(txg_list_t *tl, uint64_t txg)
    552 {
    553 	int t = txg & TXG_MASK;
    554 	txg_node_t *tn;
    555 	void *p = NULL;
    556 
    557 	mutex_enter(&tl->tl_lock);
    558 	if ((tn = tl->tl_head[t]) != NULL) {
    559 		p = (char *)tn - tl->tl_offset;
    560 		tl->tl_head[t] = tn->tn_next[t];
    561 		tn->tn_next[t] = NULL;
    562 		tn->tn_member[t] = 0;
    563 	}
    564 	mutex_exit(&tl->tl_lock);
    565 
    566 	return (p);
    567 }
    568 
    569 /*
    570  * Remove a specific item from the list and return it.
    571  */
    572 void *
    573 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
    574 {
    575 	int t = txg & TXG_MASK;
    576 	txg_node_t *tn, **tp;
    577 
    578 	mutex_enter(&tl->tl_lock);
    579 
    580 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
    581 		if ((char *)tn - tl->tl_offset == p) {
    582 			*tp = tn->tn_next[t];
    583 			tn->tn_next[t] = NULL;
    584 			tn->tn_member[t] = 0;
    585 			mutex_exit(&tl->tl_lock);
    586 			return (p);
    587 		}
    588 	}
    589 
    590 	mutex_exit(&tl->tl_lock);
    591 
    592 	return (NULL);
    593 }
    594 
    595 int
    596 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
    597 {
    598 	int t = txg & TXG_MASK;
    599 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    600 
    601 	return (tn->tn_member[t]);
    602 }
    603 
    604 /*
    605  * Walk a txg list -- only safe if you know it's not changing.
    606  */
    607 void *
    608 txg_list_head(txg_list_t *tl, uint64_t txg)
    609 {
    610 	int t = txg & TXG_MASK;
    611 	txg_node_t *tn = tl->tl_head[t];
    612 
    613 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    614 }
    615 
    616 void *
    617 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
    618 {
    619 	int t = txg & TXG_MASK;
    620 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    621 
    622 	tn = tn->tn_next[t];
    623 
    624 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    625 }
    626