Home | History | Annotate | Download | only in io
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
     22 /*	  All Rights Reserved  	*/
     23 
     24 /*
     25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     26  * Use is subject to license terms.
     27  */
     28 
     29 #include <sys/types.h>
     30 #include <sys/param.h>
     31 #include <sys/thread.h>
     32 #include <sys/sysmacros.h>
     33 #include <sys/stropts.h>
     34 #include <sys/stream.h>
     35 #include <sys/strsubr.h>
     36 #include <sys/strsun.h>
     37 #include <sys/conf.h>
     38 #include <sys/debug.h>
     39 #include <sys/cmn_err.h>
     40 #include <sys/kmem.h>
     41 #include <sys/atomic.h>
     42 #include <sys/errno.h>
     43 #include <sys/vtrace.h>
     44 #include <sys/ftrace.h>
     45 #include <sys/ontrap.h>
     46 #include <sys/multidata.h>
     47 #include <sys/multidata_impl.h>
     48 #include <sys/sdt.h>
     49 #include <sys/strft.h>
     50 
     51 #ifdef DEBUG
     52 #include <sys/kmem_impl.h>
     53 #endif
     54 
     55 /*
     56  * This file contains all the STREAMS utility routines that may
     57  * be used by modules and drivers.
     58  */
     59 
     60 /*
     61  * STREAMS message allocator: principles of operation
     62  *
     63  * The streams message allocator consists of all the routines that
     64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
     65  * dupb(), freeb() and freemsg().  What follows is a high-level view
     66  * of how the allocator works.
     67  *
     68  * Every streams message consists of one or more mblks, a dblk, and data.
     69  * All mblks for all types of messages come from a common mblk_cache.
     70  * The dblk and data come in several flavors, depending on how the
     71  * message is allocated:
     72  *
     73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
     74  *     fixed-size dblk/data caches. For message sizes that are multiples of
     75  *     PAGESIZE, dblks are allocated separately from the buffer.
     76  *     The associated buffer is allocated by the constructor using kmem_alloc().
     77  *     For all other message sizes, dblk and its associated data is allocated
     78  *     as a single contiguous chunk of memory.
     79  *     Objects in these caches consist of a dblk plus its associated data.
     80  *     allocb() determines the nearest-size cache by table lookup:
     81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
     82  *
     83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
     84  *     kmem_alloc()'ing a buffer for the data and supplying that
     85  *     buffer to gesballoc(), described below.
     86  *
     87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
     88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
     89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
     90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
     91  *
     92  * While there are several routines to allocate messages, there is only
     93  * one routine to free messages: freeb().  freeb() simply invokes the
     94  * dblk's free method, dbp->db_free(), which is set at allocation time.
     95  *
     96  * dupb() creates a new reference to a message by allocating a new mblk,
     97  * incrementing the dblk reference count and setting the dblk's free
     98  * method to dblk_decref().  The dblk's original free method is retained
     99  * in db_lastfree.  dblk_decref() decrements the reference count on each
    100  * freeb().  If this is not the last reference it just frees the mblk;
    101  * if this *is* the last reference, it restores db_free to db_lastfree,
    102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
    103  *
    104  * The implementation makes aggressive use of kmem object caching for
    105  * maximum performance.  This makes the code simple and compact, but
    106  * also a bit abstruse in some places.  The invariants that constitute a
    107  * message's constructed state, described below, are more subtle than usual.
    108  *
    109  * Every dblk has an "attached mblk" as part of its constructed state.
    110  * The mblk is allocated by the dblk's constructor and remains attached
    111  * until the message is either dup'ed or pulled up.  In the dupb() case
    112  * the mblk association doesn't matter until the last free, at which time
    113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
    114  * the mblk association because it swaps the leading mblks of two messages,
    115  * so it is responsible for swapping their db_mblk pointers accordingly.
    116  * From a constructed-state viewpoint it doesn't matter that a dblk's
    117  * attached mblk can change while the message is allocated; all that
    118  * matters is that the dblk has *some* attached mblk when it's freed.
    119  *
    120  * The sizes of the allocb() small-message caches are not magical.
    121  * They represent a good trade-off between internal and external
    122  * fragmentation for current workloads.  They should be reevaluated
    123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
    124  * become common.  We use 64-byte alignment so that dblks don't
    125  * straddle cache lines unnecessarily.
    126  */
    127 #define	DBLK_MAX_CACHE		73728
    128 #define	DBLK_CACHE_ALIGN	64
    129 #define	DBLK_MIN_SIZE		8
    130 #define	DBLK_SIZE_SHIFT		3
    131 
    132 #ifdef _BIG_ENDIAN
    133 #define	DBLK_RTFU_SHIFT(field)	\
    134 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
    135 #else
    136 #define	DBLK_RTFU_SHIFT(field)	\
    137 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
    138 #endif
    139 
    140 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
    141 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
    142 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
    143 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
    144 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
    145 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
    146 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
    147 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
    148 
    149 static size_t dblk_sizes[] = {
    150 #ifdef _LP64
    151 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
    152 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
    153 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
    154 #else
    155 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
    156 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
    157 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
    158 #endif
    159 	DBLK_MAX_CACHE, 0
    160 };
    161 
    162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
    163 static struct kmem_cache *mblk_cache;
    164 static struct kmem_cache *dblk_esb_cache;
    165 static struct kmem_cache *fthdr_cache;
    166 static struct kmem_cache *ftblk_cache;
    167 
    168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
    169 static mblk_t *allocb_oversize(size_t size, int flags);
    170 static int allocb_tryhard_fails;
    171 static void frnop_func(void *arg);
    172 frtn_t frnop = { frnop_func };
    173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
    174 
    175 static boolean_t rwnext_enter(queue_t *qp);
    176 static void rwnext_exit(queue_t *qp);
    177 
    178 /*
    179  * Patchable mblk/dblk kmem_cache flags.
    180  */
    181 int dblk_kmem_flags = 0;
    182 int mblk_kmem_flags = 0;
    183 
    184 static int
    185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
    186 {
    187 	dblk_t *dbp = buf;
    188 	ssize_t msg_size = (ssize_t)cdrarg;
    189 	size_t index;
    190 
    191 	ASSERT(msg_size != 0);
    192 
    193 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
    194 
    195 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
    196 
    197 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
    198 		return (-1);
    199 	if ((msg_size & PAGEOFFSET) == 0) {
    200 		dbp->db_base = kmem_alloc(msg_size, kmflags);
    201 		if (dbp->db_base == NULL) {
    202 			kmem_cache_free(mblk_cache, dbp->db_mblk);
    203 			return (-1);
    204 		}
    205 	} else {
    206 		dbp->db_base = (unsigned char *)&dbp[1];
    207 	}
    208 
    209 	dbp->db_mblk->b_datap = dbp;
    210 	dbp->db_cache = dblk_cache[index];
    211 	dbp->db_lim = dbp->db_base + msg_size;
    212 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
    213 	dbp->db_frtnp = NULL;
    214 	dbp->db_fthdr = NULL;
    215 	dbp->db_credp = NULL;
    216 	dbp->db_cpid = -1;
    217 	dbp->db_struioflag = 0;
    218 	dbp->db_struioun.cksum.flags = 0;
    219 	return (0);
    220 }
    221 
    222 /*ARGSUSED*/
    223 static int
    224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
    225 {
    226 	dblk_t *dbp = buf;
    227 
    228 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
    229 		return (-1);
    230 	dbp->db_mblk->b_datap = dbp;
    231 	dbp->db_cache = dblk_esb_cache;
    232 	dbp->db_fthdr = NULL;
    233 	dbp->db_credp = NULL;
    234 	dbp->db_cpid = -1;
    235 	dbp->db_struioflag = 0;
    236 	dbp->db_struioun.cksum.flags = 0;
    237 	return (0);
    238 }
    239 
    240 static int
    241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
    242 {
    243 	dblk_t *dbp = buf;
    244 	bcache_t *bcp = cdrarg;
    245 
    246 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
    247 		return (-1);
    248 
    249 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
    250 	if (dbp->db_base == NULL) {
    251 		kmem_cache_free(mblk_cache, dbp->db_mblk);
    252 		return (-1);
    253 	}
    254 
    255 	dbp->db_mblk->b_datap = dbp;
    256 	dbp->db_cache = (void *)bcp;
    257 	dbp->db_lim = dbp->db_base + bcp->size;
    258 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
    259 	dbp->db_frtnp = NULL;
    260 	dbp->db_fthdr = NULL;
    261 	dbp->db_credp = NULL;
    262 	dbp->db_cpid = -1;
    263 	dbp->db_struioflag = 0;
    264 	dbp->db_struioun.cksum.flags = 0;
    265 	return (0);
    266 }
    267 
    268 /*ARGSUSED*/
    269 static void
    270 dblk_destructor(void *buf, void *cdrarg)
    271 {
    272 	dblk_t *dbp = buf;
    273 	ssize_t msg_size = (ssize_t)cdrarg;
    274 
    275 	ASSERT(dbp->db_mblk->b_datap == dbp);
    276 	ASSERT(msg_size != 0);
    277 	ASSERT(dbp->db_struioflag == 0);
    278 	ASSERT(dbp->db_struioun.cksum.flags == 0);
    279 
    280 	if ((msg_size & PAGEOFFSET) == 0) {
    281 		kmem_free(dbp->db_base, msg_size);
    282 	}
    283 
    284 	kmem_cache_free(mblk_cache, dbp->db_mblk);
    285 }
    286 
    287 static void
    288 bcache_dblk_destructor(void *buf, void *cdrarg)
    289 {
    290 	dblk_t *dbp = buf;
    291 	bcache_t *bcp = cdrarg;
    292 
    293 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
    294 
    295 	ASSERT(dbp->db_mblk->b_datap == dbp);
    296 	ASSERT(dbp->db_struioflag == 0);
    297 	ASSERT(dbp->db_struioun.cksum.flags == 0);
    298 
    299 	kmem_cache_free(mblk_cache, dbp->db_mblk);
    300 }
    301 
    302 /* ARGSUSED */
    303 static int
    304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
    305 {
    306 	ftblk_t *fbp = buf;
    307 	int i;
    308 
    309 	bzero(fbp, sizeof (ftblk_t));
    310 	if (str_ftstack != 0) {
    311 		for (i = 0; i < FTBLK_EVNTS; i++)
    312 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
    313 	}
    314 
    315 	return (0);
    316 }
    317 
    318 /* ARGSUSED */
    319 static void
    320 ftblk_destructor(void *buf, void *cdrarg)
    321 {
    322 	ftblk_t *fbp = buf;
    323 	int i;
    324 
    325 	if (str_ftstack != 0) {
    326 		for (i = 0; i < FTBLK_EVNTS; i++) {
    327 			if (fbp->ev[i].stk != NULL) {
    328 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
    329 				fbp->ev[i].stk = NULL;
    330 			}
    331 		}
    332 	}
    333 }
    334 
    335 static int
    336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
    337 {
    338 	fthdr_t *fhp = buf;
    339 
    340 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
    341 }
    342 
    343 static void
    344 fthdr_destructor(void *buf, void *cdrarg)
    345 {
    346 	fthdr_t *fhp = buf;
    347 
    348 	ftblk_destructor(&fhp->first, cdrarg);
    349 }
    350 
    351 void
    352 streams_msg_init(void)
    353 {
    354 	char name[40];
    355 	size_t size;
    356 	size_t lastsize = DBLK_MIN_SIZE;
    357 	size_t *sizep;
    358 	struct kmem_cache *cp;
    359 	size_t tot_size;
    360 	int offset;
    361 
    362 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
    363 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
    364 
    365 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
    366 
    367 		if ((offset = (size & PAGEOFFSET)) != 0) {
    368 			/*
    369 			 * We are in the middle of a page, dblk should
    370 			 * be allocated on the same page
    371 			 */
    372 			tot_size = size + sizeof (dblk_t);
    373 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
    374 			    < PAGESIZE);
    375 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
    376 
    377 		} else {
    378 
    379 			/*
    380 			 * buf size is multiple of page size, dblk and
    381 			 * buffer are allocated separately.
    382 			 */
    383 
    384 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
    385 			tot_size = sizeof (dblk_t);
    386 		}
    387 
    388 		(void) sprintf(name, "streams_dblk_%ld", size);
    389 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
    390 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
    391 		    NULL, dblk_kmem_flags);
    392 
    393 		while (lastsize <= size) {
    394 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
    395 			lastsize += DBLK_MIN_SIZE;
    396 		}
    397 	}
    398 
    399 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
    400 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
    401 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
    402 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
    403 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
    404 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
    405 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
    406 
    407 	/* Initialize Multidata caches */
    408 	mmd_init();
    409 
    410 	/* initialize throttling queue for esballoc */
    411 	esballoc_queue_init();
    412 }
    413 
    414 /*ARGSUSED*/
    415 mblk_t *
    416 allocb(size_t size, uint_t pri)
    417 {
    418 	dblk_t *dbp;
    419 	mblk_t *mp;
    420 	size_t index;
    421 
    422 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
    423 
    424 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
    425 		if (size != 0) {
    426 			mp = allocb_oversize(size, KM_NOSLEEP);
    427 			goto out;
    428 		}
    429 		index = 0;
    430 	}
    431 
    432 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
    433 		mp = NULL;
    434 		goto out;
    435 	}
    436 
    437 	mp = dbp->db_mblk;
    438 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
    439 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
    440 	mp->b_rptr = mp->b_wptr = dbp->db_base;
    441 	mp->b_queue = NULL;
    442 	MBLK_BAND_FLAG_WORD(mp) = 0;
    443 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
    444 out:
    445 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
    446 
    447 	return (mp);
    448 }
    449 
    450 /*
    451  * Allocate an mblk taking db_credp and db_cpid from the template.
    452  * Allow the cred to be NULL.
    453  */
    454 mblk_t *
    455 allocb_tmpl(size_t size, const mblk_t *tmpl)
    456 {
    457 	mblk_t *mp = allocb(size, 0);
    458 
    459 	if (mp != NULL) {
    460 		dblk_t *src = tmpl->b_datap;
    461 		dblk_t *dst = mp->b_datap;
    462 		cred_t *cr = src->db_credp;
    463 
    464 		if (cr != NULL)
    465 			crhold(dst->db_credp = cr);
    466 		dst->db_cpid = src->db_cpid;
    467 		dst->db_type = src->db_type;
    468 	}
    469 	return (mp);
    470 }
    471 
    472 mblk_t *
    473 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
    474 {
    475 	mblk_t *mp = allocb(size, 0);
    476 
    477 	ASSERT(cr != NULL);
    478 	if (mp != NULL) {
    479 		dblk_t *dbp = mp->b_datap;
    480 
    481 		crhold(dbp->db_credp = cr);
    482 		dbp->db_cpid = cpid;
    483 	}
    484 	return (mp);
    485 }
    486 
    487 mblk_t *
    488 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
    489 {
    490 	mblk_t *mp = allocb_wait(size, 0, flags, error);
    491 
    492 	ASSERT(cr != NULL);
    493 	if (mp != NULL) {
    494 		dblk_t *dbp = mp->b_datap;
    495 
    496 		crhold(dbp->db_credp = cr);
    497 		dbp->db_cpid = cpid;
    498 	}
    499 
    500 	return (mp);
    501 }
    502 
    503 /*
    504  * Extract the db_cred (and optionally db_cpid) from a message.
    505  * We find the first mblk which has a non-NULL db_cred and use that.
    506  * If none found we return NULL.
    507  * Does NOT get a hold on the cred.
    508  */
    509 cred_t *
    510 msg_getcred(const mblk_t *mp, pid_t *cpidp)
    511 {
    512 	cred_t *cr = NULL;
    513 	cred_t *cr2;
    514 
    515 	while (mp != NULL) {
    516 		dblk_t *dbp = mp->b_datap;
    517 
    518 		cr = dbp->db_credp;
    519 		if (cr == NULL) {
    520 			mp = mp->b_cont;
    521 			continue;
    522 		}
    523 		if (cpidp != NULL)
    524 			*cpidp = dbp->db_cpid;
    525 
    526 #ifdef DEBUG
    527 		/*
    528 		 * Normally there should at most one db_credp in a message.
    529 		 * But if there are multiple (as in the case of some M_IOC*
    530 		 * and some internal messages in TCP/IP bind logic) then
    531 		 * they must be identical in the normal case.
    532 		 * However, a socket can be shared between different uids
    533 		 * in which case data queued in TCP would be from different
    534 		 * creds. Thus we can only assert for the zoneid being the
    535 		 * same. Due to Multi-level Level Ports for TX, some
    536 		 * cred_t can have a NULL cr_zone, and we skip the comparison
    537 		 * in that case.
    538 		 */
    539 		cr2 = msg_getcred(mp->b_cont, NULL);
    540 		if (cr2 != NULL) {
    541 			DTRACE_PROBE2(msg__getcred,
    542 			    cred_t *, cr, cred_t *, cr2);
    543 			ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
    544 			    crgetzone(cr) == NULL ||
    545 			    crgetzone(cr2) == NULL);
    546 		}
    547 #endif
    548 		return (cr);
    549 	}
    550 	if (cpidp != NULL)
    551 		*cpidp = NOPID;
    552 	return (NULL);
    553 }
    554 
    555 /*
    556  * Variant of msg_getcred which, when a cred is found
    557  * 1. Returns with a hold on the cred
    558  * 2. Clears the first cred in the mblk.
    559  * This is more efficient to use than a msg_getcred() + crhold() when
    560  * the message is freed after the cred has been extracted.
    561  *
    562  * The caller is responsible for ensuring that there is no other reference
    563  * on the message since db_credp can not be cleared when there are other
    564  * references.
    565  */
    566 cred_t *
    567 msg_extractcred(mblk_t *mp, pid_t *cpidp)
    568 {
    569 	cred_t *cr = NULL;
    570 	cred_t *cr2;
    571 
    572 	while (mp != NULL) {
    573 		dblk_t *dbp = mp->b_datap;
    574 
    575 		cr = dbp->db_credp;
    576 		if (cr == NULL) {
    577 			mp = mp->b_cont;
    578 			continue;
    579 		}
    580 		ASSERT(dbp->db_ref == 1);
    581 		dbp->db_credp = NULL;
    582 		if (cpidp != NULL)
    583 			*cpidp = dbp->db_cpid;
    584 #ifdef DEBUG
    585 		/*
    586 		 * Normally there should at most one db_credp in a message.
    587 		 * But if there are multiple (as in the case of some M_IOC*
    588 		 * and some internal messages in TCP/IP bind logic) then
    589 		 * they must be identical in the normal case.
    590 		 * However, a socket can be shared between different uids
    591 		 * in which case data queued in TCP would be from different
    592 		 * creds. Thus we can only assert for the zoneid being the
    593 		 * same. Due to Multi-level Level Ports for TX, some
    594 		 * cred_t can have a NULL cr_zone, and we skip the comparison
    595 		 * in that case.
    596 		 */
    597 		cr2 = msg_getcred(mp->b_cont, NULL);
    598 		if (cr2 != NULL) {
    599 			DTRACE_PROBE2(msg__extractcred,
    600 			    cred_t *, cr, cred_t *, cr2);
    601 			ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
    602 			    crgetzone(cr) == NULL ||
    603 			    crgetzone(cr2) == NULL);
    604 		}
    605 #endif
    606 		return (cr);
    607 	}
    608 	return (NULL);
    609 }
    610 /*
    611  * Get the label for a message. Uses the first mblk in the message
    612  * which has a non-NULL db_credp.
    613  * Returns NULL if there is no credp.
    614  */
    615 extern struct ts_label_s *
    616 msg_getlabel(const mblk_t *mp)
    617 {
    618 	cred_t *cr = msg_getcred(mp, NULL);
    619 
    620 	if (cr == NULL)
    621 		return (NULL);
    622 
    623 	return (crgetlabel(cr));
    624 }
    625 
    626 void
    627 freeb(mblk_t *mp)
    628 {
    629 	dblk_t *dbp = mp->b_datap;
    630 
    631 	ASSERT(dbp->db_ref > 0);
    632 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
    633 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
    634 
    635 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
    636 
    637 	dbp->db_free(mp, dbp);
    638 }
    639 
    640 void
    641 freemsg(mblk_t *mp)
    642 {
    643 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
    644 	while (mp) {
    645 		dblk_t *dbp = mp->b_datap;
    646 		mblk_t *mp_cont = mp->b_cont;
    647 
    648 		ASSERT(dbp->db_ref > 0);
    649 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
    650 
    651 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
    652 
    653 		dbp->db_free(mp, dbp);
    654 		mp = mp_cont;
    655 	}
    656 }
    657 
    658 /*
    659  * Reallocate a block for another use.  Try hard to use the old block.
    660  * If the old data is wanted (copy), leave b_wptr at the end of the data,
    661  * otherwise return b_wptr = b_rptr.
    662  *
    663  * This routine is private and unstable.
    664  */
    665 mblk_t	*
    666 reallocb(mblk_t *mp, size_t size, uint_t copy)
    667 {
    668 	mblk_t		*mp1;
    669 	unsigned char	*old_rptr;
    670 	ptrdiff_t	cur_size;
    671 
    672 	if (mp == NULL)
    673 		return (allocb(size, BPRI_HI));
    674 
    675 	cur_size = mp->b_wptr - mp->b_rptr;
    676 	old_rptr = mp->b_rptr;
    677 
    678 	ASSERT(mp->b_datap->db_ref != 0);
    679 
    680 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
    681 		/*
    682 		 * If the data is wanted and it will fit where it is, no
    683 		 * work is required.
    684 		 */
    685 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
    686 			return (mp);
    687 
    688 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
    689 		mp1 = mp;
    690 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
    691 		/* XXX other mp state could be copied too, db_flags ... ? */
    692 		mp1->b_cont = mp->b_cont;
    693 	} else {
    694 		return (NULL);
    695 	}
    696 
    697 	if (copy) {
    698 		bcopy(old_rptr, mp1->b_rptr, cur_size);
    699 		mp1->b_wptr = mp1->b_rptr + cur_size;
    700 	}
    701 
    702 	if (mp != mp1)
    703 		freeb(mp);
    704 
    705 	return (mp1);
    706 }
    707 
    708 static void
    709 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
    710 {
    711 	ASSERT(dbp->db_mblk == mp);
    712 	if (dbp->db_fthdr != NULL)
    713 		str_ftfree(dbp);
    714 
    715 	/* set credp and projid to be 'unspecified' before returning to cache */
    716 	if (dbp->db_credp != NULL) {
    717 		crfree(dbp->db_credp);
    718 		dbp->db_credp = NULL;
    719 	}
    720 	dbp->db_cpid = -1;
    721 
    722 	/* Reset the struioflag and the checksum flag fields */
    723 	dbp->db_struioflag = 0;
    724 	dbp->db_struioun.cksum.flags = 0;
    725 
    726 	/* and the COOKED and/or UIOA flag(s) */
    727 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
    728 
    729 	kmem_cache_free(dbp->db_cache, dbp);
    730 }
    731 
    732 static void
    733 dblk_decref(mblk_t *mp, dblk_t *dbp)
    734 {
    735 	if (dbp->db_ref != 1) {
    736 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
    737 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
    738 		/*
    739 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
    740 		 * have a reference to the dblk, which means another thread
    741 		 * could free it.  Therefore we cannot examine the dblk to
    742 		 * determine whether ours was the last reference.  Instead,
    743 		 * we extract the new and minimum reference counts from rtfu.
    744 		 * Note that all we're really saying is "if (ref != refmin)".
    745 		 */
    746 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
    747 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
    748 			kmem_cache_free(mblk_cache, mp);
    749 			return;
    750 		}
    751 	}
    752 	dbp->db_mblk = mp;
    753 	dbp->db_free = dbp->db_lastfree;
    754 	dbp->db_lastfree(mp, dbp);
    755 }
    756 
    757 mblk_t *
    758 dupb(mblk_t *mp)
    759 {
    760 	dblk_t *dbp = mp->b_datap;
    761 	mblk_t *new_mp;
    762 	uint32_t oldrtfu, newrtfu;
    763 
    764 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
    765 		goto out;
    766 
    767 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
    768 	new_mp->b_rptr = mp->b_rptr;
    769 	new_mp->b_wptr = mp->b_wptr;
    770 	new_mp->b_datap = dbp;
    771 	new_mp->b_queue = NULL;
    772 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
    773 
    774 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
    775 
    776 	dbp->db_free = dblk_decref;
    777 	do {
    778 		ASSERT(dbp->db_ref > 0);
    779 		oldrtfu = DBLK_RTFU_WORD(dbp);
    780 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
    781 		/*
    782 		 * If db_ref is maxed out we can't dup this message anymore.
    783 		 */
    784 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
    785 			kmem_cache_free(mblk_cache, new_mp);
    786 			new_mp = NULL;
    787 			goto out;
    788 		}
    789 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
    790 
    791 out:
    792 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
    793 	return (new_mp);
    794 }
    795 
    796 static void
    797 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
    798 {
    799 	frtn_t *frp = dbp->db_frtnp;
    800 
    801 	ASSERT(dbp->db_mblk == mp);
    802 	frp->free_func(frp->free_arg);
    803 	if (dbp->db_fthdr != NULL)
    804 		str_ftfree(dbp);
    805 
    806 	/* set credp and projid to be 'unspecified' before returning to cache */
    807 	if (dbp->db_credp != NULL) {
    808 		crfree(dbp->db_credp);
    809 		dbp->db_credp = NULL;
    810 	}
    811 	dbp->db_cpid = -1;
    812 	dbp->db_struioflag = 0;
    813 	dbp->db_struioun.cksum.flags = 0;
    814 
    815 	kmem_cache_free(dbp->db_cache, dbp);
    816 }
    817 
    818 /*ARGSUSED*/
    819 static void
    820 frnop_func(void *arg)
    821 {
    822 }
    823 
    824 /*
    825  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
    826  */
    827 static mblk_t *
    828 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    829 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
    830 {
    831 	dblk_t *dbp;
    832 	mblk_t *mp;
    833 
    834 	ASSERT(base != NULL && frp != NULL);
    835 
    836 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
    837 		mp = NULL;
    838 		goto out;
    839 	}
    840 
    841 	mp = dbp->db_mblk;
    842 	dbp->db_base = base;
    843 	dbp->db_lim = base + size;
    844 	dbp->db_free = dbp->db_lastfree = lastfree;
    845 	dbp->db_frtnp = frp;
    846 	DBLK_RTFU_WORD(dbp) = db_rtfu;
    847 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
    848 	mp->b_rptr = mp->b_wptr = base;
    849 	mp->b_queue = NULL;
    850 	MBLK_BAND_FLAG_WORD(mp) = 0;
    851 
    852 out:
    853 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
    854 	return (mp);
    855 }
    856 
    857 /*ARGSUSED*/
    858 mblk_t *
    859 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
    860 {
    861 	mblk_t *mp;
    862 
    863 	/*
    864 	 * Note that this is structured to allow the common case (i.e.
    865 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
    866 	 * call optimization.
    867 	 */
    868 	if (!str_ftnever) {
    869 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
    870 		    frp, freebs_enqueue, KM_NOSLEEP);
    871 
    872 		if (mp != NULL)
    873 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
    874 		return (mp);
    875 	}
    876 
    877 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
    878 	    frp, freebs_enqueue, KM_NOSLEEP));
    879 }
    880 
    881 /*
    882  * Same as esballoc() but sleeps waiting for memory.
    883  */
    884 /*ARGSUSED*/
    885 mblk_t *
    886 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
    887 {
    888 	mblk_t *mp;
    889 
    890 	/*
    891 	 * Note that this is structured to allow the common case (i.e.
    892 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
    893 	 * call optimization.
    894 	 */
    895 	if (!str_ftnever) {
    896 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
    897 		    frp, freebs_enqueue, KM_SLEEP);
    898 
    899 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
    900 		return (mp);
    901 	}
    902 
    903 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
    904 	    frp, freebs_enqueue, KM_SLEEP));
    905 }
    906 
    907 /*ARGSUSED*/
    908 mblk_t *
    909 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
    910 {
    911 	mblk_t *mp;
    912 
    913 	/*
    914 	 * Note that this is structured to allow the common case (i.e.
    915 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
    916 	 * call optimization.
    917 	 */
    918 	if (!str_ftnever) {
    919 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
    920 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
    921 
    922 		if (mp != NULL)
    923 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
    924 		return (mp);
    925 	}
    926 
    927 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
    928 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
    929 }
    930 
    931 /*ARGSUSED*/
    932 mblk_t *
    933 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
    934 {
    935 	mblk_t *mp;
    936 
    937 	/*
    938 	 * Note that this is structured to allow the common case (i.e.
    939 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
    940 	 * call optimization.
    941 	 */
    942 	if (!str_ftnever) {
    943 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
    944 		    frp, freebs_enqueue, KM_NOSLEEP);
    945 
    946 		if (mp != NULL)
    947 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
    948 		return (mp);
    949 	}
    950 
    951 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
    952 	    frp, freebs_enqueue, KM_NOSLEEP));
    953 }
    954 
    955 /*ARGSUSED*/
    956 mblk_t *
    957 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
    958 {
    959 	mblk_t *mp;
    960 
    961 	/*
    962 	 * Note that this is structured to allow the common case (i.e.
    963 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
    964 	 * call optimization.
    965 	 */
    966 	if (!str_ftnever) {
    967 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
    968 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
    969 
    970 		if (mp != NULL)
    971 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
    972 		return (mp);
    973 	}
    974 
    975 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
    976 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
    977 }
    978 
    979 static void
    980 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
    981 {
    982 	bcache_t *bcp = dbp->db_cache;
    983 
    984 	ASSERT(dbp->db_mblk == mp);
    985 	if (dbp->db_fthdr != NULL)
    986 		str_ftfree(dbp);
    987 
    988 	/* set credp and projid to be 'unspecified' before returning to cache */
    989 	if (dbp->db_credp != NULL) {
    990 		crfree(dbp->db_credp);
    991 		dbp->db_credp = NULL;
    992 	}
    993 	dbp->db_cpid = -1;
    994 	dbp->db_struioflag = 0;
    995 	dbp->db_struioun.cksum.flags = 0;
    996 
    997 	mutex_enter(&bcp->mutex);
    998 	kmem_cache_free(bcp->dblk_cache, dbp);
    999 	bcp->alloc--;
   1000 
   1001 	if (bcp->alloc == 0 && bcp->destroy != 0) {
   1002 		kmem_cache_destroy(bcp->dblk_cache);
   1003 		kmem_cache_destroy(bcp->buffer_cache);
   1004 		mutex_exit(&bcp->mutex);
   1005 		mutex_destroy(&bcp->mutex);
   1006 		kmem_free(bcp, sizeof (bcache_t));
   1007 	} else {
   1008 		mutex_exit(&bcp->mutex);
   1009 	}
   1010 }
   1011 
   1012 bcache_t *
   1013 bcache_create(char *name, size_t size, uint_t align)
   1014 {
   1015 	bcache_t *bcp;
   1016 	char buffer[255];
   1017 
   1018 	ASSERT((align & (align - 1)) == 0);
   1019 
   1020 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
   1021 		return (NULL);
   1022 
   1023 	bcp->size = size;
   1024 	bcp->align = align;
   1025 	bcp->alloc = 0;
   1026 	bcp->destroy = 0;
   1027 
   1028 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
   1029 
   1030 	(void) sprintf(buffer, "%s_buffer_cache", name);
   1031 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
   1032 	    NULL, NULL, NULL, 0);
   1033 	(void) sprintf(buffer, "%s_dblk_cache", name);
   1034 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
   1035 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
   1036 	    NULL, (void *)bcp, NULL, 0);
   1037 
   1038 	return (bcp);
   1039 }
   1040 
   1041 void
   1042 bcache_destroy(bcache_t *bcp)
   1043 {
   1044 	ASSERT(bcp != NULL);
   1045 
   1046 	mutex_enter(&bcp->mutex);
   1047 	if (bcp->alloc == 0) {
   1048 		kmem_cache_destroy(bcp->dblk_cache);
   1049 		kmem_cache_destroy(bcp->buffer_cache);
   1050 		mutex_exit(&bcp->mutex);
   1051 		mutex_destroy(&bcp->mutex);
   1052 		kmem_free(bcp, sizeof (bcache_t));
   1053 	} else {
   1054 		bcp->destroy++;
   1055 		mutex_exit(&bcp->mutex);
   1056 	}
   1057 }
   1058 
   1059 /*ARGSUSED*/
   1060 mblk_t *
   1061 bcache_allocb(bcache_t *bcp, uint_t pri)
   1062 {
   1063 	dblk_t *dbp;
   1064 	mblk_t *mp = NULL;
   1065 
   1066 	ASSERT(bcp != NULL);
   1067 
   1068 	mutex_enter(&bcp->mutex);
   1069 	if (bcp->destroy != 0) {
   1070 		mutex_exit(&bcp->mutex);
   1071 		goto out;
   1072 	}
   1073 
   1074 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
   1075 		mutex_exit(&bcp->mutex);
   1076 		goto out;
   1077 	}
   1078 	bcp->alloc++;
   1079 	mutex_exit(&bcp->mutex);
   1080 
   1081 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
   1082 
   1083 	mp = dbp->db_mblk;
   1084 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
   1085 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
   1086 	mp->b_rptr = mp->b_wptr = dbp->db_base;
   1087 	mp->b_queue = NULL;
   1088 	MBLK_BAND_FLAG_WORD(mp) = 0;
   1089 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
   1090 out:
   1091 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
   1092 
   1093 	return (mp);
   1094 }
   1095 
   1096 static void
   1097 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
   1098 {
   1099 	ASSERT(dbp->db_mblk == mp);
   1100 	if (dbp->db_fthdr != NULL)
   1101 		str_ftfree(dbp);
   1102 
   1103 	/* set credp and projid to be 'unspecified' before returning to cache */
   1104 	if (dbp->db_credp != NULL) {
   1105 		crfree(dbp->db_credp);
   1106 		dbp->db_credp = NULL;
   1107 	}
   1108 	dbp->db_cpid = -1;
   1109 	dbp->db_struioflag = 0;
   1110 	dbp->db_struioun.cksum.flags = 0;
   1111 
   1112 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
   1113 	kmem_cache_free(dbp->db_cache, dbp);
   1114 }
   1115 
   1116 static mblk_t *
   1117 allocb_oversize(size_t size, int kmflags)
   1118 {
   1119 	mblk_t *mp;
   1120 	void *buf;
   1121 
   1122 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
   1123 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
   1124 		return (NULL);
   1125 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
   1126 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
   1127 		kmem_free(buf, size);
   1128 
   1129 	if (mp != NULL)
   1130 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
   1131 
   1132 	return (mp);
   1133 }
   1134 
   1135 mblk_t *
   1136 allocb_tryhard(size_t target_size)
   1137 {
   1138 	size_t size;
   1139 	mblk_t *bp;
   1140 
   1141 	for (size = target_size; size < target_size + 512;
   1142 	    size += DBLK_CACHE_ALIGN)
   1143 		if ((bp = allocb(size, BPRI_HI)) != NULL)
   1144 			return (bp);
   1145 	allocb_tryhard_fails++;
   1146 	return (NULL);
   1147 }
   1148 
   1149 /*
   1150  * This routine is consolidation private for STREAMS internal use
   1151  * This routine may only be called from sync routines (i.e., not
   1152  * from put or service procedures).  It is located here (rather
   1153  * than strsubr.c) so that we don't have to expose all of the
   1154  * allocb() implementation details in header files.
   1155  */
   1156 mblk_t *
   1157 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
   1158 {
   1159 	dblk_t *dbp;
   1160 	mblk_t *mp;
   1161 	size_t index;
   1162 
   1163 	index = (size -1) >> DBLK_SIZE_SHIFT;
   1164 
   1165 	if (flags & STR_NOSIG) {
   1166 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
   1167 			if (size != 0) {
   1168 				mp = allocb_oversize(size, KM_SLEEP);
   1169 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
   1170 				    (uintptr_t)mp);
   1171 				return (mp);
   1172 			}
   1173 			index = 0;
   1174 		}
   1175 
   1176 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
   1177 		mp = dbp->db_mblk;
   1178 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
   1179 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
   1180 		mp->b_rptr = mp->b_wptr = dbp->db_base;
   1181 		mp->b_queue = NULL;
   1182 		MBLK_BAND_FLAG_WORD(mp) = 0;
   1183 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
   1184 
   1185 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
   1186 
   1187 	} else {
   1188 		while ((mp = allocb(size, pri)) == NULL) {
   1189 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
   1190 				return (NULL);
   1191 		}
   1192 	}
   1193 
   1194 	return (mp);
   1195 }
   1196 
   1197 /*
   1198  * Call function 'func' with 'arg' when a class zero block can
   1199  * be allocated with priority 'pri'.
   1200  */
   1201 bufcall_id_t
   1202 esbbcall(uint_t pri, void (*func)(void *), void *arg)
   1203 {
   1204 	return (bufcall(1, pri, func, arg));
   1205 }
   1206 
   1207 /*
   1208  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
   1209  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
   1210  * This provides consistency for all internal allocators of ioctl.
   1211  */
   1212 mblk_t *
   1213 mkiocb(uint_t cmd)
   1214 {
   1215 	struct iocblk	*ioc;
   1216 	mblk_t		*mp;
   1217 
   1218 	/*
   1219 	 * Allocate enough space for any of the ioctl related messages.
   1220 	 */
   1221 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
   1222 		return (NULL);
   1223 
   1224 	bzero(mp->b_rptr, sizeof (union ioctypes));
   1225 
   1226 	/*
   1227 	 * Set the mblk_t information and ptrs correctly.
   1228 	 */
   1229 	mp->b_wptr += sizeof (struct iocblk);
   1230 	mp->b_datap->db_type = M_IOCTL;
   1231 
   1232 	/*
   1233 	 * Fill in the fields.
   1234 	 */
   1235 	ioc		= (struct iocblk *)mp->b_rptr;
   1236 	ioc->ioc_cmd	= cmd;
   1237 	ioc->ioc_cr	= kcred;
   1238 	ioc->ioc_id	= getiocseqno();
   1239 	ioc->ioc_flag	= IOC_NATIVE;
   1240 	return (mp);
   1241 }
   1242 
   1243 /*
   1244  * test if block of given size can be allocated with a request of
   1245  * the given priority.
   1246  * 'pri' is no longer used, but is retained for compatibility.
   1247  */
   1248 /* ARGSUSED */
   1249 int
   1250 testb(size_t size, uint_t pri)
   1251 {
   1252 	return ((size + sizeof (dblk_t)) <= kmem_avail());
   1253 }
   1254 
   1255 /*
   1256  * Call function 'func' with argument 'arg' when there is a reasonably
   1257  * good chance that a block of size 'size' can be allocated.
   1258  * 'pri' is no longer used, but is retained for compatibility.
   1259  */
   1260 /* ARGSUSED */
   1261 bufcall_id_t
   1262 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
   1263 {
   1264 	static long bid = 1;	/* always odd to save checking for zero */
   1265 	bufcall_id_t bc_id;
   1266 	struct strbufcall *bcp;
   1267 
   1268 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
   1269 		return (0);
   1270 
   1271 	bcp->bc_func = func;
   1272 	bcp->bc_arg = arg;
   1273 	bcp->bc_size = size;
   1274 	bcp->bc_next = NULL;
   1275 	bcp->bc_executor = NULL;
   1276 
   1277 	mutex_enter(&strbcall_lock);
   1278 	/*
   1279 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
   1280 	 * should be no references to bcp since it may be freed by
   1281 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
   1282 	 * the local var.
   1283 	 */
   1284 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
   1285 
   1286 	/*
   1287 	 * add newly allocated stream event to existing
   1288 	 * linked list of events.
   1289 	 */
   1290 	if (strbcalls.bc_head == NULL) {
   1291 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
   1292 	} else {
   1293 		strbcalls.bc_tail->bc_next = bcp;
   1294 		strbcalls.bc_tail = bcp;
   1295 	}
   1296 
   1297 	cv_signal(&strbcall_cv);
   1298 	mutex_exit(&strbcall_lock);
   1299 	return (bc_id);
   1300 }
   1301 
   1302 /*
   1303  * Cancel a bufcall request.
   1304  */
   1305 void
   1306 unbufcall(bufcall_id_t id)
   1307 {
   1308 	strbufcall_t *bcp, *pbcp;
   1309 
   1310 	mutex_enter(&strbcall_lock);
   1311 again:
   1312 	pbcp = NULL;
   1313 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
   1314 		if (id == bcp->bc_id)
   1315 			break;
   1316 		pbcp = bcp;
   1317 	}
   1318 	if (bcp) {
   1319 		if (bcp->bc_executor != NULL) {
   1320 			if (bcp->bc_executor != curthread) {
   1321 				cv_wait(&bcall_cv, &strbcall_lock);
   1322 				goto again;
   1323 			}
   1324 		} else {
   1325 			if (pbcp)
   1326 				pbcp->bc_next = bcp->bc_next;
   1327 			else
   1328 				strbcalls.bc_head = bcp->bc_next;
   1329 			if (bcp == strbcalls.bc_tail)
   1330 				strbcalls.bc_tail = pbcp;
   1331 			kmem_free(bcp, sizeof (strbufcall_t));
   1332 		}
   1333 	}
   1334 	mutex_exit(&strbcall_lock);
   1335 }
   1336 
   1337 /*
   1338  * Duplicate a message block by block (uses dupb), returning
   1339  * a pointer to the duplicate message.
   1340  * Returns a non-NULL value only if the entire message
   1341  * was dup'd.
   1342  */
   1343 mblk_t *
   1344 dupmsg(mblk_t *bp)
   1345 {
   1346 	mblk_t *head, *nbp;
   1347 
   1348 	if (!bp || !(nbp = head = dupb(bp)))
   1349 		return (NULL);
   1350 
   1351 	while (bp->b_cont) {
   1352 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
   1353 			freemsg(head);
   1354 			return (NULL);
   1355 		}
   1356 		nbp = nbp->b_cont;
   1357 		bp = bp->b_cont;
   1358 	}
   1359 	return (head);
   1360 }
   1361 
   1362 #define	DUPB_NOLOAN(bp) \
   1363 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
   1364 	copyb((bp)) : dupb((bp)))
   1365 
   1366 mblk_t *
   1367 dupmsg_noloan(mblk_t *bp)
   1368 {
   1369 	mblk_t *head, *nbp;
   1370 
   1371 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
   1372 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
   1373 		return (NULL);
   1374 
   1375 	while (bp->b_cont) {
   1376 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
   1377 			freemsg(head);
   1378 			return (NULL);
   1379 		}
   1380 		nbp = nbp->b_cont;
   1381 		bp = bp->b_cont;
   1382 	}
   1383 	return (head);
   1384 }
   1385 
   1386 /*
   1387  * Copy data from message and data block to newly allocated message and
   1388  * data block. Returns new message block pointer, or NULL if error.
   1389  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
   1390  * as in the original even when db_base is not word aligned. (bug 1052877)
   1391  */
   1392 mblk_t *
   1393 copyb(mblk_t *bp)
   1394 {
   1395 	mblk_t	*nbp;
   1396 	dblk_t	*dp, *ndp;
   1397 	uchar_t *base;
   1398 	size_t	size;
   1399 	size_t	unaligned;
   1400 
   1401 	ASSERT(bp->b_wptr >= bp->b_rptr);
   1402 
   1403 	dp = bp->b_datap;
   1404 	if (dp->db_fthdr != NULL)
   1405 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
   1406 
   1407 	/*
   1408 	 * Special handling for Multidata message; this should be
   1409 	 * removed once a copy-callback routine is made available.
   1410 	 */
   1411 	if (dp->db_type == M_MULTIDATA) {
   1412 		cred_t *cr;
   1413 
   1414 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
   1415 			return (NULL);
   1416 
   1417 		nbp->b_flag = bp->b_flag;
   1418 		nbp->b_band = bp->b_band;
   1419 		ndp = nbp->b_datap;
   1420 
   1421 		/* See comments below on potential issues. */
   1422 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
   1423 
   1424 		ASSERT(ndp->db_type == dp->db_type);
   1425 		cr = dp->db_credp;
   1426 		if (cr != NULL)
   1427 			crhold(ndp->db_credp = cr);
   1428 		ndp->db_cpid = dp->db_cpid;
   1429 		return (nbp);
   1430 	}
   1431 
   1432 	size = dp->db_lim - dp->db_base;
   1433 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
   1434 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
   1435 		return (NULL);
   1436 	nbp->b_flag = bp->b_flag;
   1437 	nbp->b_band = bp->b_band;
   1438 	ndp = nbp->b_datap;
   1439 
   1440 	/*
   1441 	 * Well, here is a potential issue.  If we are trying to
   1442 	 * trace a flow, and we copy the message, we might lose
   1443 	 * information about where this message might have been.
   1444 	 * So we should inherit the FT data.  On the other hand,
   1445 	 * a user might be interested only in alloc to free data.
   1446 	 * So I guess the real answer is to provide a tunable.
   1447 	 */
   1448 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
   1449 
   1450 	base = ndp->db_base + unaligned;
   1451 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
   1452 
   1453 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
   1454 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
   1455 
   1456 	return (nbp);
   1457 }
   1458 
   1459 /*
   1460  * Copy data from message to newly allocated message using new
   1461  * data blocks.  Returns a pointer to the new message, or NULL if error.
   1462  */
   1463 mblk_t *
   1464 copymsg(mblk_t *bp)
   1465 {
   1466 	mblk_t *head, *nbp;
   1467 
   1468 	if (!bp || !(nbp = head = copyb(bp)))
   1469 		return (NULL);
   1470 
   1471 	while (bp->b_cont) {
   1472 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
   1473 			freemsg(head);
   1474 			return (NULL);
   1475 		}
   1476 		nbp = nbp->b_cont;
   1477 		bp = bp->b_cont;
   1478 	}
   1479 	return (head);
   1480 }
   1481 
   1482 /*
   1483  * link a message block to tail of message
   1484  */
   1485 void
   1486 linkb(mblk_t *mp, mblk_t *bp)
   1487 {
   1488 	ASSERT(mp && bp);
   1489 
   1490 	for (; mp->b_cont; mp = mp->b_cont)
   1491 		;
   1492 	mp->b_cont = bp;
   1493 }
   1494 
   1495 /*
   1496  * unlink a message block from head of message
   1497  * return pointer to new message.
   1498  * NULL if message becomes empty.
   1499  */
   1500 mblk_t *
   1501 unlinkb(mblk_t *bp)
   1502 {
   1503 	mblk_t *bp1;
   1504 
   1505 	bp1 = bp->b_cont;
   1506 	bp->b_cont = NULL;
   1507 	return (bp1);
   1508 }
   1509 
   1510 /*
   1511  * remove a message block "bp" from message "mp"
   1512  *
   1513  * Return pointer to new message or NULL if no message remains.
   1514  * Return -1 if bp is not found in message.
   1515  */
   1516 mblk_t *
   1517 rmvb(mblk_t *mp, mblk_t *bp)
   1518 {
   1519 	mblk_t *tmp;
   1520 	mblk_t *lastp = NULL;
   1521 
   1522 	ASSERT(mp && bp);
   1523 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
   1524 		if (tmp == bp) {
   1525 			if (lastp)
   1526 				lastp->b_cont = tmp->b_cont;
   1527 			else
   1528 				mp = tmp->b_cont;
   1529 			tmp->b_cont = NULL;
   1530 			return (mp);
   1531 		}
   1532 		lastp = tmp;
   1533 	}
   1534 	return ((mblk_t *)-1);
   1535 }
   1536 
   1537 /*
   1538  * Concatenate and align first len bytes of common
   1539  * message type.  Len == -1, means concat everything.
   1540  * Returns 1 on success, 0 on failure
   1541  * After the pullup, mp points to the pulled up data.
   1542  */
   1543 int
   1544 pullupmsg(mblk_t *mp, ssize_t len)
   1545 {
   1546 	mblk_t *bp, *b_cont;
   1547 	dblk_t *dbp;
   1548 	ssize_t n;
   1549 
   1550 	ASSERT(mp->b_datap->db_ref > 0);
   1551 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
   1552 
   1553 	/*
   1554 	 * We won't handle Multidata message, since it contains
   1555 	 * metadata which this function has no knowledge of; we
   1556 	 * assert on DEBUG, and return failure otherwise.
   1557 	 */
   1558 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
   1559 	if (mp->b_datap->db_type == M_MULTIDATA)
   1560 		return (0);
   1561 
   1562 	if (len == -1) {
   1563 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
   1564 			return (1);
   1565 		len = xmsgsize(mp);
   1566 	} else {
   1567 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
   1568 		ASSERT(first_mblk_len >= 0);
   1569 		/*
   1570 		 * If the length is less than that of the first mblk,
   1571 		 * we want to pull up the message into an aligned mblk.
   1572 		 * Though not part of the spec, some callers assume it.
   1573 		 */
   1574 		if (len <= first_mblk_len) {
   1575 			if (str_aligned(mp->b_rptr))
   1576 				return (1);
   1577 			len = first_mblk_len;
   1578 		} else if (xmsgsize(mp) < len)
   1579 			return (0);
   1580 	}
   1581 
   1582 	if ((bp = allocb_tmpl(len, mp)) == NULL)
   1583 		return (0);
   1584 
   1585 	dbp = bp->b_datap;
   1586 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
   1587 	mp->b_datap = dbp;	/* ... and mp heads the new message */
   1588 	mp->b_datap->db_mblk = mp;
   1589 	bp->b_datap->db_mblk = bp;
   1590 	mp->b_rptr = mp->b_wptr = dbp->db_base;
   1591 
   1592 	do {
   1593 		ASSERT(bp->b_datap->db_ref > 0);
   1594 		ASSERT(bp->b_wptr >= bp->b_rptr);
   1595 		n = MIN(bp->b_wptr - bp->b_rptr, len);
   1596 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
   1597 		mp->b_wptr += n;
   1598 		bp->b_rptr += n;
   1599 		len -= n;
   1600 		if (bp->b_rptr != bp->b_wptr)
   1601 			break;
   1602 		b_cont = bp->b_cont;
   1603 		freeb(bp);
   1604 		bp = b_cont;
   1605 	} while (len && bp);
   1606 
   1607 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
   1608 
   1609 	return (1);
   1610 }
   1611 
   1612 /*
   1613  * Concatenate and align at least the first len bytes of common message
   1614  * type.  Len == -1 means concatenate everything.  The original message is
   1615  * unaltered.  Returns a pointer to a new message on success, otherwise
   1616  * returns NULL.
   1617  */
   1618 mblk_t *
   1619 msgpullup(mblk_t *mp, ssize_t len)
   1620 {
   1621 	mblk_t	*newmp;
   1622 	ssize_t	totlen;
   1623 	ssize_t	n;
   1624 
   1625 	/*
   1626 	 * We won't handle Multidata message, since it contains
   1627 	 * metadata which this function has no knowledge of; we
   1628 	 * assert on DEBUG, and return failure otherwise.
   1629 	 */
   1630 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
   1631 	if (mp->b_datap->db_type == M_MULTIDATA)
   1632 		return (NULL);
   1633 
   1634 	totlen = xmsgsize(mp);
   1635 
   1636 	if ((len > 0) && (len > totlen))
   1637 		return (NULL);
   1638 
   1639 	/*
   1640 	 * Copy all of the first msg type into one new mblk, then dupmsg
   1641 	 * and link the rest onto this.
   1642 	 */
   1643 
   1644 	len = totlen;
   1645 
   1646 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
   1647 		return (NULL);
   1648 
   1649 	newmp->b_flag = mp->b_flag;
   1650 	newmp->b_band = mp->b_band;
   1651 
   1652 	while (len > 0) {
   1653 		n = mp->b_wptr - mp->b_rptr;
   1654 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
   1655 		if (n > 0)
   1656 			bcopy(mp->b_rptr, newmp->b_wptr, n);
   1657 		newmp->b_wptr += n;
   1658 		len -= n;
   1659 		mp = mp->b_cont;
   1660 	}
   1661 
   1662 	if (mp != NULL) {
   1663 		newmp->b_cont = dupmsg(mp);
   1664 		if (newmp->b_cont == NULL) {
   1665 			freemsg(newmp);
   1666 			return (NULL);
   1667 		}
   1668 	}
   1669 
   1670 	return (newmp);
   1671 }
   1672 
   1673 /*
   1674  * Trim bytes from message
   1675  *  len > 0, trim from head
   1676  *  len < 0, trim from tail
   1677  * Returns 1 on success, 0 on failure.
   1678  */
   1679 int
   1680 adjmsg(mblk_t *mp, ssize_t len)
   1681 {
   1682 	mblk_t *bp;
   1683 	mblk_t *save_bp = NULL;
   1684 	mblk_t *prev_bp;
   1685 	mblk_t *bcont;
   1686 	unsigned char type;
   1687 	ssize_t n;
   1688 	int fromhead;
   1689 	int first;
   1690 
   1691 	ASSERT(mp != NULL);
   1692 	/*
   1693 	 * We won't handle Multidata message, since it contains
   1694 	 * metadata which this function has no knowledge of; we
   1695 	 * assert on DEBUG, and return failure otherwise.
   1696 	 */
   1697 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
   1698 	if (mp->b_datap->db_type == M_MULTIDATA)
   1699 		return (0);
   1700 
   1701 	if (len < 0) {
   1702 		fromhead = 0;
   1703 		len = -len;
   1704 	} else {
   1705 		fromhead = 1;
   1706 	}
   1707 
   1708 	if (xmsgsize(mp) < len)
   1709 		return (0);
   1710 
   1711 	if (fromhead) {
   1712 		first = 1;
   1713 		while (len) {
   1714 			ASSERT(mp->b_wptr >= mp->b_rptr);
   1715 			n = MIN(mp->b_wptr - mp->b_rptr, len);
   1716 			mp->b_rptr += n;
   1717 			len -= n;
   1718 
   1719 			/*
   1720 			 * If this is not the first zero length
   1721 			 * message remove it
   1722 			 */
   1723 			if (!first && (mp->b_wptr == mp->b_rptr)) {
   1724 				bcont = mp->b_cont;
   1725 				freeb(mp);
   1726 				mp = save_bp->b_cont = bcont;
   1727 			} else {
   1728 				save_bp = mp;
   1729 				mp = mp->b_cont;
   1730 			}
   1731 			first = 0;
   1732 		}
   1733 	} else {
   1734 		type = mp->b_datap->db_type;
   1735 		while (len) {
   1736 			bp = mp;
   1737 			save_bp = NULL;
   1738 
   1739 			/*
   1740 			 * Find the last message of same type
   1741 			 */
   1742 			while (bp && bp->b_datap->db_type == type) {
   1743 				ASSERT(bp->b_wptr >= bp->b_rptr);
   1744 				prev_bp = save_bp;
   1745 				save_bp = bp;
   1746 				bp = bp->b_cont;
   1747 			}
   1748 			if (save_bp == NULL)
   1749 				break;
   1750 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
   1751 			save_bp->b_wptr -= n;
   1752 			len -= n;
   1753 
   1754 			/*
   1755 			 * If this is not the first message
   1756 			 * and we have taken away everything
   1757 			 * from this message, remove it
   1758 			 */
   1759 
   1760 			if ((save_bp != mp) &&
   1761 			    (save_bp->b_wptr == save_bp->b_rptr)) {
   1762 				bcont = save_bp->b_cont;
   1763 				freeb(save_bp);
   1764 				prev_bp->b_cont = bcont;
   1765 			}
   1766 		}
   1767 	}
   1768 	return (1);
   1769 }
   1770 
   1771 /*
   1772  * get number of data bytes in message
   1773  */
   1774 size_t
   1775 msgdsize(mblk_t *bp)
   1776 {
   1777 	size_t count = 0;
   1778 
   1779 	for (; bp; bp = bp->b_cont)
   1780 		if (bp->b_datap->db_type == M_DATA) {
   1781 			ASSERT(bp->b_wptr >= bp->b_rptr);
   1782 			count += bp->b_wptr - bp->b_rptr;
   1783 		}
   1784 	return (count);
   1785 }
   1786 
   1787 /*
   1788  * Get a message off head of queue
   1789  *
   1790  * If queue has no buffers then mark queue
   1791  * with QWANTR. (queue wants to be read by
   1792  * someone when data becomes available)
   1793  *
   1794  * If there is something to take off then do so.
   1795  * If queue falls below hi water mark turn off QFULL
   1796  * flag.  Decrement weighted count of queue.
   1797  * Also turn off QWANTR because queue is being read.
   1798  *
   1799  * The queue count is maintained on a per-band basis.
   1800  * Priority band 0 (normal messages) uses q_count,
   1801  * q_lowat, etc.  Non-zero priority bands use the
   1802  * fields in their respective qband structures
   1803  * (qb_count, qb_lowat, etc.)  All messages appear
   1804  * on the same list, linked via their b_next pointers.
   1805  * q_first is the head of the list.  q_count does
   1806  * not reflect the size of all the messages on the
   1807  * queue.  It only reflects those messages in the
   1808  * normal band of flow.  The one exception to this
   1809  * deals with high priority messages.  They are in
   1810  * their own conceptual "band", but are accounted
   1811  * against q_count.
   1812  *
   1813  * If queue count is below the lo water mark and QWANTW
   1814  * is set, enable the closest backq which has a service
   1815  * procedure and turn off the QWANTW flag.
   1816  *
   1817  * getq could be built on top of rmvq, but isn't because
   1818  * of performance considerations.
   1819  *
   1820  * A note on the use of q_count and q_mblkcnt:
   1821  *   q_count is the traditional byte count for messages that
   1822  *   have been put on a queue.  Documentation tells us that
   1823  *   we shouldn't rely on that count, but some drivers/modules
   1824  *   do.  What was needed, however, is a mechanism to prevent
   1825  *   runaway streams from consuming all of the resources,
   1826  *   and particularly be able to flow control zero-length
   1827  *   messages.  q_mblkcnt is used for this purpose.  It
   1828  *   counts the number of mblk's that are being put on
   1829  *   the queue.  The intention here, is that each mblk should
   1830  *   contain one byte of data and, for the purpose of
   1831  *   flow-control, logically does.  A queue will become
   1832  *   full when EITHER of these values (q_count and q_mblkcnt)
   1833  *   reach the highwater mark.  It will clear when BOTH
   1834  *   of them drop below the highwater mark.  And it will
   1835  *   backenable when BOTH of them drop below the lowwater
   1836  *   mark.
   1837  *   With this algorithm, a driver/module might be able
   1838  *   to find a reasonably accurate q_count, and the
   1839  *   framework can still try and limit resource usage.
   1840  */
   1841 mblk_t *
   1842 getq(queue_t *q)
   1843 {
   1844 	mblk_t *bp;
   1845 	uchar_t band = 0;
   1846 
   1847 	bp = getq_noenab(q, 0);
   1848 	if (bp != NULL)
   1849 		band = bp->b_band;
   1850 
   1851 	/*
   1852 	 * Inlined from qbackenable().
   1853 	 * Quick check without holding the lock.
   1854 	 */
   1855 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
   1856 		return (bp);
   1857 
   1858 	qbackenable(q, band);
   1859 	return (bp);
   1860 }
   1861 
   1862 /*
   1863  * Calculate number of data bytes in a single data message block taking
   1864  * multidata messages into account.
   1865  */
   1866 
   1867 #define	ADD_MBLK_SIZE(mp, size) 					\
   1868 	if (DB_TYPE(mp) != M_MULTIDATA) {				\
   1869 		(size) += MBLKL(mp);					\
   1870 	} else {							\
   1871 		uint_t	pinuse;						\
   1872 									\
   1873 		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
   1874 		(size) += pinuse;					\
   1875 	}
   1876 
   1877 /*
   1878  * Returns the number of bytes in a message (a message is defined as a
   1879  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
   1880  * also return the number of distinct mblks in the message.
   1881  */
   1882 int
   1883 mp_cont_len(mblk_t *bp, int *mblkcnt)
   1884 {
   1885 	mblk_t	*mp;
   1886 	int	mblks = 0;
   1887 	int	bytes = 0;
   1888 
   1889 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
   1890 		ADD_MBLK_SIZE(mp, bytes);
   1891 		mblks++;
   1892 	}
   1893 
   1894 	if (mblkcnt != NULL)
   1895 		*mblkcnt = mblks;
   1896 
   1897 	return (bytes);
   1898 }
   1899 
   1900 /*
   1901  * Like getq() but does not backenable.  This is used by the stream
   1902  * head when a putback() is likely.  The caller must call qbackenable()
   1903  * after it is done with accessing the queue.
   1904  * The rbytes arguments to getq_noneab() allows callers to specify a
   1905  * the maximum number of bytes to return. If the current amount on the
   1906  * queue is less than this then the entire message will be returned.
   1907  * A value of 0 returns the entire message and is equivalent to the old
   1908  * default behaviour prior to the addition of the rbytes argument.
   1909  */
   1910 mblk_t *
   1911 getq_noenab(queue_t *q, ssize_t rbytes)
   1912 {
   1913 	mblk_t *bp, *mp1;
   1914 	mblk_t *mp2 = NULL;
   1915 	qband_t *qbp;
   1916 	kthread_id_t freezer;
   1917 	int	bytecnt = 0, mblkcnt = 0;
   1918 
   1919 	/* freezestr should allow its caller to call getq/putq */
   1920 	freezer = STREAM(q)->sd_freezer;
   1921 	if (freezer == curthread) {
   1922 		ASSERT(frozenstr(q));
   1923 		ASSERT(MUTEX_HELD(QLOCK(q)));
   1924 	} else
   1925 		mutex_enter(QLOCK(q));
   1926 
   1927 	if ((bp = q->q_first) == 0) {
   1928 		q->q_flag |= QWANTR;
   1929 	} else {
   1930 		/*
   1931 		 * If the caller supplied a byte threshold and there is
   1932 		 * more than this amount on the queue then break up the
   1933 		 * the message appropriately.  We can only safely do
   1934 		 * this for M_DATA messages.
   1935 		 */
   1936 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
   1937 		    (q->q_count > rbytes)) {
   1938 			/*
   1939 			 * Inline version of mp_cont_len() which terminates
   1940 			 * when we meet or exceed rbytes.
   1941 			 */
   1942 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
   1943 				mblkcnt++;
   1944 				ADD_MBLK_SIZE(mp1, bytecnt);
   1945 				if (bytecnt  >= rbytes)
   1946 					break;
   1947 			}
   1948 			/*
   1949 			 * We need to account for the following scenarios:
   1950 			 *
   1951 			 * 1) Too much data in the first message:
   1952 			 *	mp1 will be the mblk which puts us over our
   1953 			 *	byte limit.
   1954 			 * 2) Not enough data in the first message:
   1955 			 *	mp1 will be NULL.
   1956 			 * 3) Exactly the right amount of data contained within
   1957 			 *    whole mblks:
   1958 			 *	mp1->b_cont will be where we break the message.
   1959 			 */
   1960 			if (bytecnt > rbytes) {
   1961 				/*
   1962 				 * Dup/copy mp1 and put what we don't need
   1963 				 * back onto the queue. Adjust the read/write
   1964 				 * and continuation pointers appropriately
   1965 				 * and decrement the current mblk count to
   1966 				 * reflect we are putting an mblk back onto
   1967 				 * the queue.
   1968 				 * When adjusting the message pointers, it's
   1969 				 * OK to use the existing bytecnt and the
   1970 				 * requested amount (rbytes) to calculate the
   1971 				 * the new write offset (b_wptr) of what we
   1972 				 * are taking. However, we  cannot use these
   1973 				 * values when calculating the read offset of
   1974 				 * the mblk we are putting back on the queue.
   1975 				 * This is because the begining (b_rptr) of the
   1976 				 * mblk represents some arbitrary point within
   1977 				 * the message.
   1978 				 * It's simplest to do this by advancing b_rptr
   1979 				 * by the new length of mp1 as we don't have to
   1980 				 * remember any intermediate state.
   1981 				 */
   1982 				ASSERT(mp1 != NULL);
   1983 				mblkcnt--;
   1984 				if ((mp2 = dupb(mp1)) == NULL &&
   1985 				    (mp2 = copyb(mp1)) == NULL) {
   1986 					bytecnt = mblkcnt = 0;
   1987 					goto dup_failed;
   1988 				}
   1989 				mp2->b_cont = mp1->b_cont;
   1990 				mp1->b_wptr -= bytecnt - rbytes;
   1991 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
   1992 				mp1->b_cont = NULL;
   1993 				bytecnt = rbytes;
   1994 			} else {
   1995 				/*
   1996 				 * Either there is not enough data in the first
   1997 				 * message or there is no excess data to deal
   1998 				 * with. If mp1 is NULL, we are taking the
   1999 				 * whole message. No need to do anything.
   2000 				 * Otherwise we assign mp1->b_cont to mp2 as
   2001 				 * we will be putting this back onto the head of
   2002 				 * the queue.
   2003 				 */
   2004 				if (mp1 != NULL) {
   2005 					mp2 = mp1->b_cont;
   2006 					mp1->b_cont = NULL;
   2007 				}
   2008 			}
   2009 			/*
   2010 			 * If mp2 is not NULL then we have part of the message
   2011 			 * to put back onto the queue.
   2012 			 */
   2013 			if (mp2 != NULL) {
   2014 				if ((mp2->b_next = bp->b_next) == NULL)
   2015 					q->q_last = mp2;
   2016 				else
   2017 					bp->b_next->b_prev = mp2;
   2018 				q->q_first = mp2;
   2019 			} else {
   2020 				if ((q->q_first = bp->b_next) == NULL)
   2021 					q->q_last = NULL;
   2022 				else
   2023 					q->q_first->b_prev = NULL;
   2024 			}
   2025 		} else {
   2026 			/*
   2027 			 * Either no byte threshold was supplied, there is
   2028 			 * not enough on the queue or we failed to
   2029 			 * duplicate/copy a data block. In these cases we
   2030 			 * just take the entire first message.
   2031 			 */
   2032 dup_failed:
   2033 			bytecnt = mp_cont_len(bp, &mblkcnt);
   2034 			if ((q->q_first = bp->b_next) == NULL)
   2035 				q->q_last = NULL;
   2036 			else
   2037 				q->q_first->b_prev = NULL;
   2038 		}
   2039 		if (bp->b_band == 0) {
   2040 			q->q_count -= bytecnt;
   2041 			q->q_mblkcnt -= mblkcnt;
   2042 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
   2043 			    (q->q_mblkcnt < q->q_hiwat))) {
   2044 				q->q_flag &= ~QFULL;
   2045 			}
   2046 		} else {
   2047 			int i;
   2048 
   2049 			ASSERT(bp->b_band <= q->q_nband);
   2050 			ASSERT(q->q_bandp != NULL);
   2051 			ASSERT(MUTEX_HELD(QLOCK(q)));
   2052 			qbp = q->q_bandp;
   2053 			i = bp->b_band;
   2054 			while (--i > 0)
   2055 				qbp = qbp->qb_next;
   2056 			if (qbp->qb_first == qbp->qb_last) {
   2057 				qbp->qb_first = NULL;
   2058 				qbp->qb_last = NULL;
   2059 			} else {
   2060 				qbp->qb_first = bp->b_next;
   2061 			}
   2062 			qbp->qb_count -= bytecnt;
   2063 			qbp->qb_mblkcnt -= mblkcnt;
   2064 			if (qbp->qb_mblkcnt == 0 ||
   2065 			    ((qbp->qb_count < qbp->qb_hiwat) &&
   2066 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
   2067 				qbp->qb_flag &= ~QB_FULL;
   2068 			}
   2069 		}
   2070 		q->q_flag &= ~QWANTR;
   2071 		bp->b_next = NULL;
   2072 		bp->b_prev = NULL;
   2073 	}
   2074 	if (freezer != curthread)
   2075 		mutex_exit(QLOCK(q));
   2076 
   2077 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
   2078 
   2079 	return (bp);
   2080 }
   2081 
   2082 /*
   2083  * Determine if a backenable is needed after removing a message in the
   2084  * specified band.
   2085  * NOTE: This routine assumes that something like getq_noenab() has been
   2086  * already called.
   2087  *
   2088  * For the read side it is ok to hold sd_lock across calling this (and the
   2089  * stream head often does).
   2090  * But for the write side strwakeq might be invoked and it acquires sd_lock.
   2091  */
   2092 void
   2093 qbackenable(queue_t *q, uchar_t band)
   2094 {
   2095 	int backenab = 0;
   2096 	qband_t *qbp;
   2097 	kthread_id_t freezer;
   2098 
   2099 	ASSERT(q);
   2100 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
   2101 
   2102 	/*
   2103 	 * Quick check without holding the lock.
   2104 	 * OK since after getq() has lowered the q_count these flags
   2105 	 * would not change unless either the qbackenable() is done by
   2106 	 * another thread (which is ok) or the queue has gotten QFULL
   2107 	 * in which case another backenable will take place when the queue
   2108 	 * drops below q_lowat.
   2109 	 */
   2110 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
   2111 		return;
   2112 
   2113 	/* freezestr should allow its caller to call getq/putq */
   2114 	freezer = STREAM(q)->sd_freezer;
   2115 	if (freezer == curthread) {
   2116 		ASSERT(frozenstr(q));
   2117 		ASSERT(MUTEX_HELD(QLOCK(q)));
   2118 	} else
   2119 		mutex_enter(QLOCK(q));
   2120 
   2121 	if (band == 0) {
   2122 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
   2123 		    q->q_mblkcnt < q->q_lowat)) {
   2124 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
   2125 		}
   2126 	} else {
   2127 		int i;
   2128 
   2129 		ASSERT((unsigned)band <= q->q_nband);
   2130 		ASSERT(q->q_bandp != NULL);
   2131 
   2132 		qbp = q->q_bandp;
   2133 		i = band;
   2134 		while (--i > 0)
   2135 			qbp = qbp->qb_next;
   2136 
   2137 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
   2138 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
   2139 			backenab = qbp->qb_flag & QB_WANTW;
   2140 		}
   2141 	}
   2142 
   2143 	if (backenab == 0) {
   2144 		if (freezer != curthread)
   2145 			mutex_exit(QLOCK(q));
   2146 		return;
   2147 	}
   2148 
   2149 	/* Have to drop the lock across strwakeq and backenable */
   2150 	if (backenab & QWANTWSYNC)
   2151 		q->q_flag &= ~QWANTWSYNC;
   2152 	if (backenab & (QWANTW|QB_WANTW)) {
   2153 		if (band != 0)
   2154 			qbp->qb_flag &= ~QB_WANTW;
   2155 		else {
   2156 			q->q_flag &= ~QWANTW;
   2157 		}
   2158 	}
   2159 
   2160 	if (freezer != curthread)
   2161 		mutex_exit(QLOCK(q));
   2162 
   2163 	if (backenab & QWANTWSYNC)
   2164 		strwakeq(q, QWANTWSYNC);
   2165 	if (backenab & (QWANTW|QB_WANTW))
   2166 		backenable(q, band);
   2167 }
   2168 
   2169 /*
   2170  * Remove a message from a queue.  The queue count and other
   2171  * flow control parameters are adjusted and the back queue
   2172  * enabled if necessary.
   2173  *
   2174  * rmvq can be called with the stream frozen, but other utility functions
   2175  * holding QLOCK, and by streams modules without any locks/frozen.
   2176  */
   2177 void
   2178 rmvq(queue_t *q, mblk_t *mp)
   2179 {
   2180 	ASSERT(mp != NULL);
   2181 
   2182 	rmvq_noenab(q, mp);
   2183 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
   2184 		/*
   2185 		 * qbackenable can handle a frozen stream but not a "random"
   2186 		 * qlock being held. Drop lock across qbackenable.
   2187 		 */
   2188 		mutex_exit(QLOCK(q));
   2189 		qbackenable(q, mp->b_band);
   2190 		mutex_enter(QLOCK(q));
   2191 	} else {
   2192 		qbackenable(q, mp->b_band);
   2193 	}
   2194 }
   2195 
   2196 /*
   2197  * Like rmvq() but without any backenabling.
   2198  * This exists to handle SR_CONSOL_DATA in strrput().
   2199  */
   2200 void
   2201 rmvq_noenab(queue_t *q, mblk_t *mp)
   2202 {
   2203 	int i;
   2204 	qband_t *qbp = NULL;
   2205 	kthread_id_t freezer;
   2206 	int	bytecnt = 0, mblkcnt = 0;
   2207 
   2208 	freezer = STREAM(q)->sd_freezer;
   2209 	if (freezer == curthread) {
   2210 		ASSERT(frozenstr(q));
   2211 		ASSERT(MUTEX_HELD(QLOCK(q)));
   2212 	} else if (MUTEX_HELD(QLOCK(q))) {
   2213 		/* Don't drop lock on exit */
   2214 		freezer = curthread;
   2215 	} else
   2216 		mutex_enter(QLOCK(q));
   2217 
   2218 	ASSERT(mp->b_band <= q->q_nband);
   2219 	if (mp->b_band != 0) {		/* Adjust band pointers */
   2220 		ASSERT(q->q_bandp != NULL);
   2221 		qbp = q->q_bandp;
   2222 		i = mp->b_band;
   2223 		while (--i > 0)
   2224 			qbp = qbp->qb_next;
   2225 		if (mp == qbp->qb_first) {
   2226 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
   2227 				qbp->qb_first = mp->b_next;
   2228 			else
   2229 				qbp->qb_first = NULL;
   2230 		}
   2231 		if (mp == qbp->qb_last) {
   2232 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
   2233 				qbp->qb_last = mp->b_prev;
   2234 			else
   2235 				qbp->qb_last = NULL;
   2236 		}
   2237 	}
   2238 
   2239 	/*
   2240 	 * Remove the message from the list.
   2241 	 */
   2242 	if (mp->b_prev)
   2243 		mp->b_prev->b_next = mp->b_next;
   2244 	else
   2245 		q->q_first = mp->b_next;
   2246 	if (mp->b_next)
   2247 		mp->b_next->b_prev = mp->b_prev;
   2248 	else
   2249 		q->q_last = mp->b_prev;
   2250 	mp->b_next = NULL;
   2251 	mp->b_prev = NULL;
   2252 
   2253 	/* Get the size of the message for q_count accounting */
   2254 	bytecnt = mp_cont_len(mp, &mblkcnt);
   2255 
   2256 	if (mp->b_band == 0) {		/* Perform q_count accounting */
   2257 		q->q_count -= bytecnt;
   2258 		q->q_mblkcnt -= mblkcnt;
   2259 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
   2260 		    (q->q_mblkcnt < q->q_hiwat))) {
   2261 			q->q_flag &= ~QFULL;
   2262 		}
   2263 	} else {			/* Perform qb_count accounting */
   2264 		qbp->qb_count -= bytecnt;
   2265 		qbp->qb_mblkcnt -= mblkcnt;
   2266 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
   2267 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
   2268 			qbp->qb_flag &= ~QB_FULL;
   2269 		}
   2270 	}
   2271 	if (freezer != curthread)
   2272 		mutex_exit(QLOCK(q));
   2273 
   2274 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
   2275 }
   2276 
   2277 /*
   2278  * Empty a queue.
   2279  * If flag is set, remove all messages.  Otherwise, remove
   2280  * only non-control messages.  If queue falls below its low
   2281  * water mark, and QWANTW is set, enable the nearest upstream
   2282  * service procedure.
   2283  *
   2284  * Historical note: when merging the M_FLUSH code in strrput with this
   2285  * code one difference was discovered. flushq did not have a check
   2286  * for q_lowat == 0 in the backenabling test.
   2287  *
   2288  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
   2289  * if one exists on the queue.
   2290  */
   2291 void
   2292 flushq_common(queue_t *q, int flag, int pcproto_flag)
   2293 {
   2294 	mblk_t *mp, *nmp;
   2295 	qband_t *qbp;
   2296 	int backenab = 0;
   2297 	unsigned char bpri;
   2298 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
   2299 
   2300 	if (q->q_first == NULL)
   2301 		return;
   2302 
   2303 	mutex_enter(QLOCK(q));
   2304 	mp = q->q_first;
   2305 	q->q_first = NULL;
   2306 	q->q_last = NULL;
   2307 	q->q_count = 0;
   2308 	q->q_mblkcnt = 0;
   2309 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
   2310 		qbp->qb_first = NULL;
   2311 		qbp->qb_last = NULL;
   2312 		qbp->qb_count = 0;
   2313 		qbp->qb_mblkcnt = 0;
   2314 		qbp->qb_flag &= ~QB_FULL;
   2315 	}
   2316 	q->q_flag &= ~QFULL;
   2317 	mutex_exit(QLOCK(q));
   2318 	while (mp) {
   2319 		nmp = mp->b_next;
   2320 		mp->b_next = mp->b_prev = NULL;
   2321 
   2322 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
   2323 
   2324 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
   2325 			(void) putq(q, mp);
   2326 		else if (flag || datamsg(mp->b_datap->db_type))
   2327 			freemsg(mp);
   2328 		else
   2329 			(void) putq(q, mp);
   2330 		mp = nmp;
   2331 	}
   2332 	bpri = 1;
   2333 	mutex_enter(QLOCK(q));
   2334 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
   2335 		if ((qbp->qb_flag & QB_WANTW) &&
   2336 		    (((qbp->qb_count < qbp->qb_lowat) &&
   2337 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
   2338 		    qbp->qb_lowat == 0)) {
   2339 			qbp->qb_flag &= ~QB_WANTW;
   2340 			backenab = 1;
   2341 			qbf[bpri] = 1;
   2342 		} else
   2343 			qbf[bpri] = 0;
   2344 		bpri++;
   2345 	}
   2346 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
   2347 	if ((q->q_flag & QWANTW) &&
   2348 	    (((q->q_count < q->q_lowat) &&
   2349 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
   2350 		q->q_flag &= ~QWANTW;
   2351 		backenab = 1;
   2352 		qbf[0] = 1;
   2353 	} else
   2354 		qbf[0] = 0;
   2355 
   2356 	/*
   2357 	 * If any band can now be written to, and there is a writer
   2358 	 * for that band, then backenable the closest service procedure.
   2359 	 */
   2360 	if (backenab) {
   2361 		mutex_exit(QLOCK(q));
   2362 		for (bpri = q->q_nband; bpri != 0; bpri--)
   2363 			if (qbf[bpri])
   2364 				backenable(q, bpri);
   2365 		if (qbf[0])
   2366 			backenable(q, 0);
   2367 	} else
   2368 		mutex_exit(QLOCK(q));
   2369 }
   2370 
   2371 /*
   2372  * The real flushing takes place in flushq_common. This is done so that
   2373  * a flag which specifies whether or not M_PCPROTO messages should be flushed
   2374  * or not. Currently the only place that uses this flag is the stream head.
   2375  */
   2376 void
   2377 flushq(queue_t *q, int flag)
   2378 {
   2379 	flushq_common(q, flag, 0);
   2380 }
   2381 
   2382 /*
   2383  * Flush the queue of messages of the given priority band.
   2384  * There is some duplication of code between flushq and flushband.
   2385  * This is because we want to optimize the code as much as possible.
   2386  * The assumption is that there will be more messages in the normal
   2387  * (priority 0) band than in any other.
   2388  *
   2389  * Historical note: when merging the M_FLUSH code in strrput with this
   2390  * code one difference was discovered. flushband had an extra check for
   2391  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
   2392  * case. That check does not match the man page for flushband and was not
   2393  * in the strrput flush code hence it was removed.
   2394  */
   2395 void
   2396 flushband(queue_t *q, unsigned char pri, int flag)
   2397 {
   2398 	mblk_t *mp;
   2399 	mblk_t *nmp;
   2400 	mblk_t *last;
   2401 	qband_t *qbp;
   2402 	int band;
   2403 
   2404 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
   2405 	if (pri > q->q_nband) {
   2406 		return;
   2407 	}
   2408 	mutex_enter(QLOCK(q));
   2409 	if (pri == 0) {
   2410 		mp = q->q_first;
   2411 		q->q_first = NULL;
   2412 		q->q_last = NULL;
   2413 		q->q_count = 0;
   2414 		q->q_mblkcnt = 0;
   2415 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
   2416 			qbp->qb_first = NULL;
   2417 			qbp->qb_last = NULL;
   2418 			qbp->qb_count = 0;
   2419 			qbp->qb_mblkcnt = 0;
   2420 			qbp->qb_flag &= ~QB_FULL;
   2421 		}
   2422 		q->q_flag &= ~QFULL;
   2423 		mutex_exit(QLOCK(q));
   2424 		while (mp) {
   2425 			nmp = mp->b_next;
   2426 			mp->b_next = mp->b_prev = NULL;
   2427 			if ((mp->b_band == 0) &&
   2428 			    ((flag == FLUSHALL) ||
   2429 			    datamsg(mp->b_datap->db_type)))
   2430 				freemsg(mp);
   2431 			else
   2432 				(void) putq(q, mp);
   2433 			mp = nmp;
   2434 		}
   2435 		mutex_enter(QLOCK(q));
   2436 		if ((q->q_flag & QWANTW) &&
   2437 		    (((q->q_count < q->q_lowat) &&
   2438 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
   2439 			q->q_flag &= ~QWANTW;
   2440 			mutex_exit(QLOCK(q));
   2441 
   2442 			backenable(q, pri);
   2443 		} else
   2444 			mutex_exit(QLOCK(q));
   2445 	} else {	/* pri != 0 */
   2446 		boolean_t flushed = B_FALSE;
   2447 		band = pri;
   2448 
   2449 		ASSERT(MUTEX_HELD(QLOCK(q)));
   2450 		qbp = q->q_bandp;
   2451 		while (--band > 0)
   2452 			qbp = qbp->qb_next;
   2453 		mp = qbp->qb_first;
   2454 		if (mp == NULL) {
   2455 			mutex_exit(QLOCK(q));
   2456 			return;
   2457 		}
   2458 		last = qbp->qb_last->b_next;
   2459 		/*
   2460 		 * rmvq_noenab() and freemsg() are called for each mblk that
   2461 		 * meets the criteria.  The loop is executed until the last
   2462 		 * mblk has been processed.
   2463 		 */
   2464 		while (mp != last) {
   2465 			ASSERT(mp->b_band == pri);
   2466 			nmp = mp->b_next;
   2467 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
   2468 				rmvq_noenab(q, mp);
   2469 				freemsg(mp);
   2470 				flushed = B_TRUE;
   2471 			}
   2472 			mp = nmp;
   2473 		}
   2474 		mutex_exit(QLOCK(q));
   2475 
   2476 		/*
   2477 		 * If any mblk(s) has been freed, we know that qbackenable()
   2478 		 * will need to be called.
   2479 		 */
   2480 		if (flushed)
   2481 			qbackenable(q, pri);
   2482 	}
   2483 }
   2484 
   2485 /*
   2486  * Return 1 if the queue is not full.  If the queue is full, return
   2487  * 0 (may not put message) and set QWANTW flag (caller wants to write
   2488  * to the queue).
   2489  */
   2490 int
   2491 canput(queue_t *q)
   2492 {
   2493 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
   2494 
   2495 	/* this is for loopback transports, they should not do a canput */
   2496 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
   2497 
   2498 	/* Find next forward module that has a service procedure */
   2499 	q = q->q_nfsrv;
   2500 
   2501 	if (!(q->q_flag & QFULL)) {
   2502 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
   2503 		return (1);
   2504 	}
   2505 	mutex_enter(QLOCK(q));
   2506 	if (q->q_flag & QFULL) {
   2507 		q->q_flag |= QWANTW;
   2508 		mutex_exit(QLOCK(q));
   2509 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
   2510 		return (0);
   2511 	}
   2512 	mutex_exit(QLOCK(q));
   2513 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
   2514 	return (1);
   2515 }
   2516 
   2517 /*
   2518  * This is the new canput for use with priority bands.  Return 1 if the
   2519  * band is not full.  If the band is full, return 0 (may not put message)
   2520  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
   2521  * write to the queue).
   2522  */
   2523 int
   2524 bcanput(queue_t *q, unsigned char pri)
   2525 {
   2526 	qband_t *qbp;
   2527 
   2528 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
   2529 	if (!q)
   2530 		return (0);
   2531 
   2532 	/* Find next forward module that has a service procedure */
   2533 	q = q->q_nfsrv;
   2534 
   2535 	mutex_enter(QLOCK(q));
   2536 	if (pri == 0) {
   2537 		if (q->q_flag & QFULL) {
   2538 			q->q_flag |= QWANTW;
   2539 			mutex_exit(QLOCK(q));
   2540 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
   2541 			    "bcanput:%p %X %d", q, pri, 0);
   2542 			return (0);
   2543 		}
   2544 	} else {	/* pri != 0 */
   2545 		if (pri > q->q_nband) {
   2546 			/*
   2547 			 * No band exists yet, so return success.
   2548 			 */
   2549 			mutex_exit(QLOCK(q));
   2550 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
   2551 			    "bcanput:%p %X %d", q, pri, 1);
   2552 			return (1);
   2553 		}
   2554 		qbp = q->q_bandp;
   2555 		while (--pri)
   2556 			qbp = qbp->qb_next;
   2557 		if (qbp->qb_flag & QB_FULL) {
   2558 			qbp->qb_flag |= QB_WANTW;
   2559 			mutex_exit(QLOCK(q));
   2560 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
   2561 			    "bcanput:%p %X %d", q, pri, 0);
   2562 			return (0);
   2563 		}
   2564 	}
   2565 	mutex_exit(QLOCK(q));
   2566 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
   2567 	    "bcanput:%p %X %d", q, pri, 1);
   2568 	return (1);
   2569 }
   2570 
   2571 /*
   2572  * Put a message on a queue.
   2573  *
   2574  * Messages are enqueued on a priority basis.  The priority classes
   2575  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
   2576  * and B_NORMAL (type < QPCTL && band == 0).
   2577  *
   2578  * Add appropriate weighted data block sizes to queue count.
   2579  * If queue hits high water mark then set QFULL flag.
   2580  *
   2581  * If QNOENAB is not set (putq is allowed to enable the queue),
   2582  * enable the queue only if the message is PRIORITY,
   2583  * or the QWANTR flag is set (indicating that the service procedure
   2584  * is ready to read the queue.  This implies that a service
   2585  * procedure must NEVER put a high priority message back on its own
   2586  * queue, as this would result in an infinite loop (!).
   2587  */
   2588 int
   2589 putq(queue_t *q, mblk_t *bp)
   2590 {
   2591 	mblk_t *tmp;
   2592 	qband_t *qbp = NULL;
   2593 	int mcls = (int)queclass(bp);
   2594 	kthread_id_t freezer;
   2595 	int	bytecnt = 0, mblkcnt = 0;
   2596 
   2597 	freezer = STREAM(q)->sd_freezer;
   2598 	if (freezer == curthread) {
   2599 		ASSERT(frozenstr(q));
   2600 		ASSERT(MUTEX_HELD(QLOCK(q)));
   2601 	} else
   2602 		mutex_enter(QLOCK(q));
   2603 
   2604 	/*
   2605 	 * Make sanity checks and if qband structure is not yet
   2606 	 * allocated, do so.
   2607 	 */
   2608 	if (mcls == QPCTL) {
   2609 		if (bp->b_band != 0)
   2610 			bp->b_band = 0;		/* force to be correct */
   2611 	} else if (bp->b_band != 0) {
   2612 		int i;
   2613 		qband_t **qbpp;
   2614 
   2615 		if (bp->b_band > q->q_nband) {
   2616 
   2617 			/*
   2618 			 * The qband structure for this priority band is
   2619 			 * not on the queue yet, so we have to allocate
   2620 			 * one on the fly.  It would be wasteful to
   2621 			 * associate the qband structures with every
   2622 			 * queue when the queues are allocated.  This is
   2623 			 * because most queues will only need the normal
   2624 			 * band of flow which can be described entirely
   2625 			 * by the queue itself.
   2626 			 */
   2627 			qbpp = &q->q_bandp;
   2628 			while (*qbpp)
   2629 				qbpp = &(*qbpp)->qb_next;
   2630 			while (bp->b_band > q->q_nband) {
   2631 				if ((*qbpp = allocband()) == NULL) {
   2632 					if (freezer != curthread)
   2633 						mutex_exit(QLOCK(q));
   2634 					return (0);
   2635 				}
   2636 				(*qbpp)->qb_hiwat = q->q_hiwat;
   2637 				(*qbpp)->qb_lowat = q->q_lowat;
   2638 				q->q_nband++;
   2639 				qbpp = &(*qbpp)->qb_next;
   2640 			}
   2641 		}
   2642 		ASSERT(MUTEX_HELD(QLOCK(q)));
   2643 		qbp = q->q_bandp;
   2644 		i = bp->b_band;
   2645 		while (--i)
   2646 			qbp = qbp->qb_next;
   2647 	}
   2648 
   2649 	/*
   2650 	 * If queue is empty, add the message and initialize the pointers.
   2651 	 * Otherwise, adjust message pointers and queue pointers based on
   2652 	 * the type of the message and where it belongs on the queue.  Some
   2653 	 * code is duplicated to minimize the number of conditionals and
   2654 	 * hopefully minimize the amount of time this routine takes.
   2655 	 */
   2656 	if (!q->q_first) {
   2657 		bp->b_next = NULL;
   2658 		bp->b_prev = NULL;
   2659 		q->q_first = bp;
   2660 		q->q_last = bp;
   2661 		if (qbp) {
   2662 			qbp->qb_first = bp;
   2663 			qbp->qb_last = bp;
   2664 		}
   2665 	} else if (!qbp) {	/* bp->b_band == 0 */
   2666 
   2667 		/*
   2668 		 * If queue class of message is less than or equal to
   2669 		 * that of the last one on the queue, tack on to the end.
   2670 		 */
   2671 		tmp = q->q_last;
   2672 		if (mcls <= (int)queclass(tmp)) {
   2673 			bp->b_next = NULL;
   2674 			bp->b_prev = tmp;
   2675 			tmp->b_next = bp;
   2676 			q->q_last = bp;
   2677 		} else {
   2678 			tmp = q->q_first;
   2679 			while ((int)queclass(tmp) >= mcls)
   2680 				tmp = tmp->b_next;
   2681 
   2682 			/*
   2683 			 * Insert bp before tmp.
   2684 			 */
   2685 			bp->b_next = tmp;
   2686 			bp->b_prev = tmp->b_prev;
   2687 			if (tmp->b_prev)
   2688 				tmp->b_prev->b_next = bp;
   2689 			else
   2690 				q->q_first = bp;
   2691 			tmp->b_prev = bp;
   2692 		}
   2693 	} else {		/* bp->b_band != 0 */
   2694 		if (qbp->qb_first) {
   2695 			tmp = qbp->qb_last;
   2696 
   2697 			/*
   2698 			 * Insert bp after the last message in this band.
   2699 			 */
   2700 			bp->b_next = tmp->b_next;
   2701 			if (tmp->b_next)
   2702 				tmp->b_next->b_prev = bp;
   2703 			else
   2704 				q->q_last = bp;
   2705 			bp->b_prev = tmp;
   2706 			tmp->b_next = bp;
   2707 		} else {
   2708 			tmp = q->q_last;
   2709 			if ((mcls < (int)queclass(tmp)) ||
   2710 			    (bp->b_band <= tmp->b_band)) {
   2711 
   2712 				/*
   2713 				 * Tack bp on end of queue.
   2714 				 */
   2715 				bp->b_next = NULL;
   2716 				bp->b_prev = tmp;
   2717 				tmp->b_next = bp;
   2718 				q->q_last = bp;
   2719 			} else {
   2720 				tmp = q->q_first;
   2721 				while (tmp->b_datap->db_type >= QPCTL)
   2722 					tmp = tmp->b_next;
   2723 				while (tmp->b_band >= bp->b_band)
   2724 					tmp = tmp->b_next;
   2725 
   2726 				/*
   2727 				 * Insert bp before tmp.
   2728 				 */
   2729 				bp->b_next = tmp;
   2730 				bp->b_prev = tmp->b_prev;
   2731 				if (tmp->b_prev)
   2732 					tmp->b_prev->b_next = bp;
   2733 				else
   2734 					q->q_first = bp;
   2735 				tmp->b_prev = bp;
   2736 			}
   2737 			qbp->qb_first = bp;
   2738 		}
   2739 		qbp->qb_last = bp;
   2740 	}
   2741 
   2742 	/* Get message byte count for q_count accounting */
   2743 	bytecnt = mp_cont_len(bp, &mblkcnt);
   2744 
   2745 	if (qbp) {
   2746 		qbp->qb_count += bytecnt;
   2747 		qbp->qb_mblkcnt += mblkcnt;
   2748 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
   2749 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
   2750 			qbp->qb_flag |= QB_FULL;
   2751 		}
   2752 	} else {
   2753 		q->q_count += bytecnt;
   2754 		q->q_mblkcnt += mblkcnt;
   2755 		if ((q->q_count >= q->q_hiwat) ||
   2756 		    (q->q_mblkcnt >= q->q_hiwat)) {
   2757 			q->q_flag |= QFULL;
   2758 		}
   2759 	}
   2760 
   2761 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
   2762 
   2763 	if ((mcls > QNORM) ||
   2764 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
   2765 		qenable_locked(q);
   2766 	ASSERT(MUTEX_HELD(QLOCK(q)));
   2767 	if (freezer != curthread)
   2768 		mutex_exit(QLOCK(q));
   2769 
   2770 	return (1);
   2771 }
   2772 
   2773 /*
   2774  * Put stuff back at beginning of Q according to priority order.
   2775  * See comment on putq above for details.
   2776  */
   2777 int
   2778 putbq(queue_t *q, mblk_t *bp)
   2779 {
   2780 	mblk_t *tmp;
   2781 	qband_t *qbp = NULL;
   2782 	int mcls = (int)queclass(bp);
   2783 	kthread_id_t freezer;
   2784 	int	bytecnt = 0, mblkcnt = 0;
   2785 
   2786 	ASSERT(q && bp);
   2787 	ASSERT(bp->b_next == NULL);
   2788 	freezer = STREAM(q)->sd_freezer;
   2789 	if (freezer == curthread) {
   2790 		ASSERT(frozenstr(q));
   2791 		ASSERT(MUTEX_HELD(QLOCK(q)));
   2792 	} else
   2793 		mutex_enter(QLOCK(q));
   2794 
   2795 	/*
   2796 	 * Make sanity checks and if qband structure is not yet
   2797 	 * allocated, do so.
   2798 	 */
   2799 	if (mcls == QPCTL) {
   2800 		if (bp->b_band != 0)
   2801 			bp->b_band = 0;		/* force to be correct */
   2802 	} else if (bp->b_band != 0) {
   2803 		int i;
   2804 		qband_t **qbpp;
   2805 
   2806 		if (bp->b_band > q->q_nband) {
   2807 			qbpp = &q->q_bandp;
   2808 			while (*qbpp)
   2809 				qbpp = &(*qbpp)->qb_next;
   2810 			while (bp->b_band > q->q_nband) {
   2811 				if ((*qbpp = allocband()) == NULL) {
   2812 					if (freezer != curthread)
   2813 						mutex_exit(QLOCK(q));
   2814 					return (0);
   2815 				}
   2816 				(*qbpp)->qb_hiwat = q->q_hiwat;
   2817 				(*qbpp)->qb_lowat = q->q_lowat;
   2818 				q->q_nband++;
   2819 				qbpp = &(*qbpp)->qb_next;
   2820 			}
   2821 		}
   2822 		qbp = q->q_bandp;
   2823 		i = bp->b_band;
   2824 		while (--i)
   2825 			qbp = qbp->qb_next;
   2826 	}
   2827 
   2828 	/*
   2829 	 * If queue is empty or if message is high priority,
   2830 	 * place on the front of the queue.
   2831 	 */
   2832 	tmp = q->q_first;
   2833 	if ((!tmp) || (mcls == QPCTL)) {
   2834 		bp->b_next = tmp;
   2835 		if (tmp)
   2836 			tmp->b_prev = bp;
   2837 		else
   2838 			q->q_last = bp;
   2839 		q->q_first = bp;
   2840 		bp->b_prev = NULL;
   2841 		if (qbp) {
   2842 			qbp->qb_first = bp;
   2843 			qbp->qb_last = bp;
   2844 		}
   2845 	} else if (qbp) {	/* bp->b_band != 0 */
   2846 		tmp = qbp->qb_first;
   2847 		if (tmp) {
   2848 
   2849 			/*
   2850 			 * Insert bp before the first message in this band.
   2851 			 */
   2852 			bp->b_next = tmp;
   2853 			bp->b_prev = tmp->b_prev;
   2854 			if (tmp->b_prev)
   2855 				tmp->b_prev->b_next = bp;
   2856 			else
   2857 				q->q_first = bp;
   2858 			tmp->b_prev = bp;
   2859 		} else {
   2860 			tmp = q->q_last;
   2861 			if ((mcls < (int)queclass(tmp)) ||
   2862 			    (bp->b_band < tmp->b_band)) {
   2863 
   2864 				/*
   2865 				 * Tack bp on end of queue.
   2866 				 */
   2867 				bp->b_next = NULL;
   2868 				bp->b_prev = tmp;
   2869 				tmp->b_next = bp;
   2870 				q->q_last = bp;
   2871 			} else {
   2872 				tmp = q->q_first;
   2873 				while (tmp->b_datap->db_type >= QPCTL)
   2874 					tmp = tmp->b_next;
   2875 				while (tmp->b_band > bp->b_band)
   2876 					tmp = tmp->b_next;
   2877 
   2878 				/*
   2879 				 * Insert bp before tmp.
   2880 				 */
   2881 				bp->b_next = tmp;
   2882 				bp->b_prev = tmp->b_prev;
   2883 				if (tmp->b_prev)
   2884 					tmp->b_prev->b_next = bp;
   2885 				else
   2886 					q->q_first = bp;
   2887 				tmp->b_prev = bp;
   2888 			}
   2889 			qbp->qb_last = bp;
   2890 		}
   2891 		qbp->qb_first = bp;
   2892 	} else {		/* bp->b_band == 0 && !QPCTL */
   2893 
   2894 		/*
   2895 		 * If the queue class or band is less than that of the last
   2896 		 * message on the queue, tack bp on the end of the queue.
   2897 		 */
   2898 		tmp = q->q_last;
   2899 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
   2900 			bp->b_next = NULL;
   2901 			bp->b_prev = tmp;
   2902 			tmp->b_next = bp;
   2903 			q->q_last = bp;
   2904 		} else {
   2905 			tmp = q->q_first;
   2906 			while (tmp->b_datap->db_type >= QPCTL)
   2907 				tmp = tmp->b_next;
   2908 			while (tmp->b_band > bp->b_band)
   2909 				tmp = tmp->b_next;
   2910 
   2911 			/*
   2912 			 * Insert bp before tmp.
   2913 			 */
   2914 			bp->b_next = tmp;
   2915 			bp->b_prev = tmp->b_prev;
   2916 			if (tmp->b_prev)
   2917 				tmp->b_prev->b_next = bp;
   2918 			else
   2919 				q->q_first = bp;
   2920 			tmp->b_prev = bp;
   2921 		}
   2922 	}
   2923 
   2924 	/* Get message byte count for q_count accounting */
   2925 	bytecnt = mp_cont_len(bp, &mblkcnt);
   2926 
   2927 	if (qbp) {
   2928 		qbp->qb_count += bytecnt;
   2929 		qbp->qb_mblkcnt += mblkcnt;
   2930 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
   2931 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
   2932 			qbp->qb_flag |= QB_FULL;
   2933 		}
   2934 	} else {
   2935 		q->q_count += bytecnt;
   2936 		q->q_mblkcnt += mblkcnt;
   2937 		if ((q->q_count >= q->q_hiwat) ||
   2938 		    (q->q_mblkcnt >= q->q_hiwat)) {
   2939 			q->q_flag |= QFULL;
   2940 		}
   2941 	}
   2942 
   2943 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
   2944 
   2945 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
   2946 		qenable_locked(q);
   2947 	ASSERT(MUTEX_HELD(QLOCK(q)));
   2948 	if (freezer != curthread)
   2949 		mutex_exit(QLOCK(q));
   2950 
   2951 	return (1);
   2952 }
   2953 
   2954 /*
   2955  * Insert a message before an existing message on the queue.  If the
   2956  * existing message is NULL, the new messages is placed on the end of
   2957  * the queue.  The queue class of the new message is ignored.  However,
   2958  * the priority band of the new message must adhere to the following
   2959  * ordering:
   2960  *
   2961  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
   2962  *
   2963  * All flow control parameters are updated.
   2964  *
   2965  * insq can be called with the stream frozen, but other utility functions
   2966  * holding QLOCK, and by streams modules without any locks/frozen.
   2967  */
   2968 int
   2969 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
   2970 {
   2971 	mblk_t *tmp;
   2972 	qband_t *qbp = NULL;
   2973 	int mcls = (int)queclass(mp);
   2974 	kthread_id_t freezer;
   2975 	int	bytecnt = 0, mblkcnt = 0;
   2976 
   2977 	freezer = STREAM(q)->sd_freezer;
   2978 	if (freezer == curthread) {
   2979 		ASSERT(frozenstr(q));
   2980 		ASSERT(MUTEX_HELD(QLOCK(q)));
   2981 	} else if (MUTEX_HELD(QLOCK(q))) {
   2982 		/* Don't drop lock on exit */
   2983 		freezer = curthread;
   2984 	} else
   2985 		mutex_enter(QLOCK(q));
   2986 
   2987 	if (mcls == QPCTL) {
   2988 		if (mp->b_band != 0)
   2989 			mp->b_band = 0;		/* force to be correct */
   2990 		if (emp && emp->b_prev &&
   2991 		    (emp->b_prev->b_datap->db_type < QPCTL))
   2992 			goto badord;
   2993 	}
   2994 	if (emp) {
   2995 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
   2996 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
   2997 		    (emp->b_prev->b_band < mp->b_band))) {
   2998 			goto badord;
   2999 		}
   3000 	} else {
   3001 		tmp = q->q_last;
   3002 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
   3003 badord:
   3004 			cmn_err(CE_WARN,
   3005 			    "insq: attempt to insert message out of order "
   3006 			    "on q %p", (void *)q);
   3007 			if (freezer != curthread)
   3008 				mutex_exit(QLOCK(q));
   3009 			return (0);
   3010 		}
   3011 	}
   3012 
   3013 	if (mp->b_band != 0) {
   3014 		int i;
   3015 		qband_t **qbpp;
   3016 
   3017 		if (mp->b_band > q->q_nband) {
   3018 			qbpp = &q->q_bandp;
   3019 			while (*qbpp)
   3020 				qbpp = &(*qbpp)->qb_next;
   3021 			while (mp->b_band > q->q_nband) {
   3022 				if ((*qbpp = allocband()) == NULL) {
   3023 					if (freezer != curthread)
   3024 						mutex_exit(QLOCK(q));
   3025 					return (0);
   3026 				}
   3027 				(*qbpp)->qb_hiwat = q->q_hiwat;
   3028 				(*qbpp)->qb_lowat = q->q_lowat;
   3029 				q->q_nband++;
   3030 				qbpp = &(*qbpp)->qb_next;
   3031 			}
   3032 		}
   3033 		qbp = q->q_bandp;
   3034 		i = mp->b_band;
   3035 		while (--i)
   3036 			qbp = qbp->qb_next;
   3037 	}
   3038 
   3039 	if ((mp->b_next = emp) != NULL) {
   3040 		if ((mp->b_prev = emp->b_prev) != NULL)
   3041 			emp->b_prev->b_next = mp;
   3042 		else
   3043 			q->q_first = mp;
   3044 		emp->b_prev = mp;
   3045 	} else {
   3046 		if ((mp->b_prev = q->q_last) != NULL)
   3047 			q->q_last->b_next = mp;
   3048 		else
   3049 			q->q_first = mp;
   3050 		q->q_last = mp;
   3051 	}
   3052 
   3053 	/* Get mblk and byte count for q_count accounting */
   3054 	bytecnt = mp_cont_len(mp, &mblkcnt);
   3055 
   3056 	if (qbp) {	/* adjust qband pointers and count */
   3057 		if (!qbp->qb_first) {
   3058 			qbp->qb_first = mp;
   3059 			qbp->qb_last = mp;
   3060 		} else {
   3061 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
   3062 			    (mp->b_prev->b_band != mp->b_band)))
   3063 				qbp->qb_first = mp;
   3064 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
   3065 			    (mp->b_next->b_band != mp->b_band)))
   3066 				qbp->qb_last = mp;
   3067 		}
   3068 		qbp->qb_count += bytecnt;
   3069 		qbp->qb_mblkcnt += mblkcnt;
   3070 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
   3071 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
   3072 			qbp->qb_flag |= QB_FULL;
   3073 		}
   3074 	} else {
   3075 		q->q_count += bytecnt;
   3076 		q->q_mblkcnt += mblkcnt;
   3077 		if ((q->q_count >= q->q_hiwat) ||
   3078 		    (q->q_mblkcnt >= q->q_hiwat)) {
   3079 			q->q_flag |= QFULL;
   3080 		}
   3081 	}
   3082 
   3083 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
   3084 
   3085 	if (canenable(q) && (q->q_flag & QWANTR))
   3086 		qenable_locked(q);
   3087 
   3088 	ASSERT(MUTEX_HELD(QLOCK(q)));
   3089 	if (freezer != curthread)
   3090 		mutex_exit(QLOCK(q));
   3091 
   3092 	return (1);
   3093 }
   3094 
   3095 /*
   3096  * Create and put a control message on queue.
   3097  */
   3098 int
   3099 putctl(queue_t *q, int type)
   3100 {
   3101 	mblk_t *bp;
   3102 
   3103 	if ((datamsg(type) && (type != M_DELAY)) ||
   3104 	    (bp = allocb_tryhard(0)) == NULL)
   3105 		return (0);
   3106 	bp->b_datap->db_type = (unsigned char) type;
   3107 
   3108 	put(q, bp);
   3109 
   3110 	return (1);
   3111 }
   3112 
   3113 /*
   3114  * Control message with a single-byte parameter
   3115  */
   3116 int
   3117 putctl1(queue_t *q, int type, int param)
   3118 {
   3119 	mblk_t *bp;
   3120 
   3121 	if ((datamsg(type) && (type != M_DELAY)) ||
   3122 	    (bp = allocb_tryhard(1)) == NULL)
   3123 		return (0);
   3124 	bp->b_datap->db_type = (unsigned char)type;
   3125 	*bp->b_wptr++ = (unsigned char)param;
   3126 
   3127 	put(q, bp);
   3128 
   3129 	return (1);
   3130 }
   3131 
   3132 int
   3133 putnextctl1(queue_t *q, int type, int param)
   3134 {
   3135 	mblk_t *bp;
   3136 
   3137 	if ((datamsg(type) && (type != M_DELAY)) ||
   3138 	    ((bp = allocb_tryhard(1)) == NULL))
   3139 		return (0);
   3140 
   3141 	bp->b_datap->db_type = (unsigned char)type;
   3142 	*bp->b_wptr++ = (unsigned char)param;
   3143 
   3144 	putnext(q, bp);
   3145 
   3146 	return (1);
   3147 }
   3148 
   3149 int
   3150 putnextctl(queue_t *q, int type)
   3151 {
   3152 	mblk_t *bp;
   3153 
   3154 	if ((datamsg(type) && (type != M_DELAY)) ||
   3155 	    ((bp = allocb_tryhard(0)) == NULL))
   3156 		return (0);
   3157 	bp->b_datap->db_type = (unsigned char)type;
   3158 
   3159 	putnext(q, bp);
   3160 
   3161 	return (1);
   3162 }
   3163 
   3164 /*
   3165  * Return the queue upstream from this one
   3166  */
   3167 queue_t *
   3168 backq(queue_t *q)
   3169 {
   3170 	q = _OTHERQ(q);
   3171 	if (q->q_next) {
   3172 		q = q->q_next;
   3173 		return (_OTHERQ(q));
   3174 	}
   3175 	return (NULL);
   3176 }
   3177 
   3178 /*
   3179  * Send a block back up the queue in reverse from this
   3180  * one (e.g. to respond to ioctls)
   3181  */
   3182 void
   3183 qreply(queue_t *q, mblk_t *bp)
   3184 {
   3185 	ASSERT(q && bp);
   3186 
   3187 	putnext(_OTHERQ(q), bp);
   3188 }
   3189 
   3190 /*
   3191  * Streams Queue Scheduling
   3192  *
   3193  * Queues are enabled through qenable() when they have messages to
   3194  * process.  They are serviced by queuerun(), which runs each enabled
   3195  * queue's service procedure.  The call to queuerun() is processor
   3196  * dependent - the general principle is that it be run whenever a queue
   3197  * is enabled but before returning to user level.  For system calls,
   3198  * the function runqueues() is called if their action causes a queue
   3199  * to be enabled.  For device interrupts, queuerun() should be
   3200  * called before returning from the last level of interrupt.  Beyond
   3201  * this, no timing assumptions should be made about queue scheduling.
   3202  */
   3203 
   3204 /*
   3205  * Enable a queue: put it on list of those whose service procedures are
   3206  * ready to run and set up the scheduling mechanism.
   3207  * The broadcast is done outside the mutex -> to avoid the woken thread
   3208  * from contending with the mutex. This is OK 'cos the queue has been
   3209  * enqueued on the runlist and flagged safely at this point.
   3210  */
   3211 void
   3212 qenable(queue_t *q)
   3213 {
   3214 	mutex_enter(QLOCK(q));
   3215 	qenable_locked(q);
   3216 	mutex_exit(QLOCK(q));
   3217 }
   3218 /*
   3219  * Return number of messages on queue
   3220  */
   3221 int
   3222 qsize(queue_t *qp)
   3223 {
   3224 	int count = 0;
   3225 	mblk_t *mp;
   3226 
   3227 	mutex_enter(QLOCK(qp));
   3228 	for (mp = qp->q_first; mp; mp = mp->b_next)
   3229 		count++;
   3230 	mutex_exit(QLOCK(qp));
   3231 	return (count);
   3232 }
   3233 
   3234 /*
   3235  * noenable - set queue so that putq() will not enable it.
   3236  * enableok - set queue so that putq() can enable it.
   3237  */
   3238 void
   3239 noenable(queue_t *q)
   3240 {
   3241 	mutex_enter(QLOCK(q));
   3242 	q->q_flag |= QNOENB;
   3243 	mutex_exit(QLOCK(q));
   3244 }
   3245 
   3246 void
   3247 enableok(queue_t *q)
   3248 {
   3249 	mutex_enter(QLOCK(q));
   3250 	q->q_flag &= ~QNOENB;
   3251 	mutex_exit(QLOCK(q));
   3252 }
   3253 
   3254 /*
   3255  * Set queue fields.
   3256  */
   3257 int
   3258 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
   3259 {
   3260 	qband_t *qbp = NULL;
   3261 	queue_t	*wrq;
   3262 	int error = 0;
   3263 	kthread_id_t freezer;
   3264 
   3265 	freezer = STREAM(q)->sd_freezer;
   3266 	if (freezer == curthread) {
   3267 		ASSERT(frozenstr(q));
   3268 		ASSERT(MUTEX_HELD(QLOCK(q)));
   3269 	} else
   3270 		mutex_enter(QLOCK(q));
   3271 
   3272 	if (what >= QBAD) {
   3273 		error = EINVAL;
   3274 		goto done;
   3275 	}
   3276 	if (pri != 0) {
   3277 		int i;
   3278 		qband_t **qbpp;
   3279 
   3280 		if (pri > q->q_nband) {
   3281 			qbpp = &q->q_bandp;
   3282 			while (*qbpp)
   3283 				qbpp = &(*qbpp)->qb_next;
   3284 			while (pri > q->q_nband) {
   3285 				if ((*qbpp = allocband()) == NULL) {
   3286 					error = EAGAIN;
   3287 					goto done;
   3288 				}
   3289 				(*qbpp)->qb_hiwat = q->q_hiwat;
   3290 				(*qbpp)->qb_lowat = q->q_lowat;
   3291 				q->q_nband++;
   3292 				qbpp = &(*qbpp)->qb_next;
   3293 			}
   3294 		}
   3295 		qbp = q->q_bandp;
   3296 		i = pri;
   3297 		while (--i)
   3298 			qbp = qbp->qb_next;
   3299 	}
   3300 	switch (what) {
   3301 
   3302 	case QHIWAT:
   3303 		if (qbp)
   3304 			qbp->qb_hiwat = (size_t)val;
   3305 		else
   3306 			q->q_hiwat = (size_t)val;
   3307 		break;
   3308 
   3309 	case QLOWAT:
   3310 		if (qbp)
   3311 			qbp->qb_lowat = (size_t)val;
   3312 		else
   3313 			q->q_lowat = (size_t)val;
   3314 		break;
   3315 
   3316 	case QMAXPSZ:
   3317 		if (qbp)
   3318 			error = EINVAL;
   3319 		else
   3320 			q->q_maxpsz = (ssize_t)val;
   3321 
   3322 		/*
   3323 		 * Performance concern, strwrite looks at the module below
   3324 		 * the stream head for the maxpsz each time it does a write
   3325 		 * we now cache it at the stream head.  Check to see if this
   3326 		 * queue is sitting directly below the stream head.
   3327 		 */
   3328 		wrq = STREAM(q)->sd_wrq;
   3329 		if (q != wrq->q_next)
   3330 			break;
   3331 
   3332 		/*
   3333 		 * If the stream is not frozen drop the current QLOCK and
   3334 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
   3335 		 */
   3336 		if (freezer != curthread) {
   3337 			mutex_exit(QLOCK(q));
   3338 			mutex_enter(QLOCK(wrq));
   3339 		}
   3340 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
   3341 
   3342 		if (strmsgsz != 0) {
   3343 			if (val == INFPSZ)
   3344 				val = strmsgsz;
   3345 			else  {
   3346 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
   3347 					val = MIN(PIPE_BUF, val);
   3348 				else
   3349 					val = MIN(strmsgsz, val);
   3350 			}
   3351 		}
   3352 		STREAM(q)->sd_qn_maxpsz = val;
   3353 		if (freezer != curthread) {
   3354 			mutex_exit(QLOCK(wrq));
   3355 			mutex_enter(QLOCK(q));
   3356 		}
   3357 		break;
   3358 
   3359 	case QMINPSZ:
   3360 		if (qbp)
   3361 			error = EINVAL;
   3362 		else
   3363 			q->q_minpsz = (ssize_t)val;
   3364 
   3365 		/*
   3366 		 * Performance concern, strwrite looks at the module below
   3367 		 * the stream head for the maxpsz each time it does a write
   3368 		 * we now cache it at the stream head.  Check to see if this
   3369 		 * queue is sitting directly below the stream head.
   3370 		 */
   3371 		wrq = STREAM(q)->sd_wrq;
   3372 		if (q != wrq->q_next)
   3373 			break;
   3374 
   3375 		/*
   3376 		 * If the stream is not frozen drop the current QLOCK and
   3377 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
   3378 		 */
   3379 		if (freezer != curthread) {
   3380 			mutex_exit(QLOCK(q));
   3381 			mutex_enter(QLOCK(wrq));
   3382 		}
   3383 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
   3384 
   3385 		if (freezer != curthread) {
   3386 			mutex_exit(QLOCK(wrq));
   3387 			mutex_enter(QLOCK(q));
   3388 		}
   3389 		break;
   3390 
   3391 	case QSTRUIOT:
   3392 		if (qbp)
   3393 			error = EINVAL;
   3394 		else
   3395 			q->q_struiot = (ushort_t)val;
   3396 		break;
   3397 
   3398 	case QCOUNT:
   3399 	case QFIRST:
   3400 	case QLAST:
   3401 	case QFLAG:
   3402 		error = EPERM;
   3403 		break;
   3404 
   3405 	default:
   3406 		error = EINVAL;
   3407 		break;
   3408 	}
   3409 done:
   3410 	if (freezer != curthread)
   3411 		mutex_exit(QLOCK(q));
   3412 	return (error);
   3413 }
   3414 
   3415 /*
   3416  * Get queue fields.
   3417  */
   3418 int
   3419 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
   3420 {
   3421 	qband_t 	*qbp = NULL;
   3422 	int 		error = 0;
   3423 	kthread_id_t 	freezer;
   3424 
   3425 	freezer = STREAM(q)->sd_freezer;
   3426 	if (freezer == curthread) {
   3427 		ASSERT(frozenstr(q));
   3428 		ASSERT(MUTEX_HELD(QLOCK(q)));
   3429 	} else
   3430 		mutex_enter(QLOCK(q));
   3431 	if (what >= QBAD) {
   3432 		error = EINVAL;
   3433 		goto done;
   3434 	}
   3435 	if (pri != 0) {
   3436 		int i;
   3437 		qband_t **qbpp;
   3438 
   3439 		if (pri > q->q_nband) {
   3440 			qbpp = &q->q_bandp;
   3441 			while (*qbpp)
   3442 				qbpp = &(*qbpp)->qb_next;
   3443 			while (pri > q->q_nband) {
   3444 				if ((*qbpp = allocband()) == NULL) {
   3445 					error = EAGAIN;
   3446 					goto done;
   3447 				}
   3448 				(*qbpp)->qb_hiwat = q->q_hiwat;
   3449 				(*qbpp)->qb_lowat = q->q_lowat;
   3450 				q->q_nband++;
   3451 				qbpp = &(*qbpp)->qb_next;
   3452 			}
   3453 		}
   3454 		qbp = q->q_bandp;
   3455 		i = pri;
   3456 		while (--i)
   3457 			qbp = qbp->qb_next;
   3458 	}
   3459 	switch (what) {
   3460 	case QHIWAT:
   3461 		if (qbp)
   3462 			*(size_t *)valp = qbp->qb_hiwat;
   3463 		else
   3464 			*(size_t *)valp = q->q_hiwat;
   3465 		break;
   3466 
   3467 	case QLOWAT:
   3468 		if (qbp)
   3469 			*(size_t *)valp = qbp->qb_lowat;
   3470 		else
   3471 			*(size_t *)valp = q->q_lowat;
   3472 		break;
   3473 
   3474 	case QMAXPSZ:
   3475 		if (qbp)
   3476 			error = EINVAL;
   3477 		else
   3478 			*(ssize_t *)valp = q->q_maxpsz;
   3479 		break;
   3480 
   3481 	case QMINPSZ:
   3482 		if (qbp)
   3483 			error = EINVAL;
   3484 		else
   3485 			*(ssize_t *)valp = q->q_minpsz;
   3486 		break;
   3487 
   3488 	case QCOUNT:
   3489 		if (qbp)
   3490 			*(size_t *)valp = qbp->qb_count;
   3491 		else
   3492 			*(size_t *)valp = q->q_count;
   3493 		break;
   3494 
   3495 	case QFIRST:
   3496 		if (qbp)
   3497 			*(mblk_t **)valp = qbp->qb_first;
   3498 		else
   3499 			*(mblk_t **)valp = q->q_first;
   3500 		break;
   3501 
   3502 	case QLAST:
   3503 		if (qbp)
   3504 			*(mblk_t **)valp = qbp->qb_last;
   3505 		else
   3506 			*(mblk_t **)valp = q->q_last;
   3507 		break;
   3508 
   3509 	case QFLAG:
   3510 		if (qbp)
   3511 			*(uint_t *)valp = qbp->qb_flag;
   3512 		else
   3513 			*(uint_t *)valp = q->q_flag;
   3514 		break;
   3515 
   3516 	case QSTRUIOT:
   3517 		if (qbp)
   3518 			error = EINVAL;
   3519 		else
   3520 			*(short *)valp = q->q_struiot;
   3521 		break;
   3522 
   3523 	default:
   3524 		error = EINVAL;
   3525 		break;
   3526 	}
   3527 done:
   3528 	if (freezer != curthread)
   3529 		mutex_exit(QLOCK(q));
   3530 	return (error);
   3531 }
   3532 
   3533 /*
   3534  * Function awakes all in cvwait/sigwait/pollwait, on one of:
   3535  *	QWANTWSYNC or QWANTR or QWANTW,
   3536  *
   3537  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
   3538  *	 deferred wakeup will be done. Also if strpoll() in progress then a
   3539  *	 deferred pollwakeup will be done.
   3540  */
   3541 void
   3542 strwakeq(queue_t *q, int flag)
   3543 {
   3544 	stdata_t 	*stp = STREAM(q);
   3545 	pollhead_t 	*pl;
   3546 
   3547 	mutex_enter(&stp->sd_lock);
   3548 	pl = &stp->sd_pollist;
   3549 	if (flag & QWANTWSYNC) {
   3550 		ASSERT(!(q->q_flag & QREADR));
   3551 		if (stp->sd_flag & WSLEEP) {
   3552 			stp->sd_flag &= ~WSLEEP;
   3553 			cv_broadcast(&stp->sd_wrq->q_wait);
   3554 		} else {
   3555 			stp->sd_wakeq |= WSLEEP;
   3556 		}
   3557 
   3558 		mutex_exit(&stp->sd_lock);
   3559 		pollwakeup(pl, POLLWRNORM);
   3560 		mutex_enter(&stp->sd_lock);
   3561 
   3562 		if (stp->sd_sigflags & S_WRNORM)
   3563 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
   3564 	} else if (flag & QWANTR) {
   3565 		if (stp->sd_flag & RSLEEP) {
   3566 			stp->sd_flag &= ~RSLEEP;
   3567 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
   3568 		} else {
   3569 			stp->sd_wakeq |= RSLEEP;
   3570 		}
   3571 
   3572 		mutex_exit(&stp->sd_lock);
   3573 		pollwakeup(pl, POLLIN | POLLRDNORM);
   3574 		mutex_enter(&stp->sd_lock);
   3575 
   3576 		{
   3577 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
   3578 
   3579 			if (events)
   3580 				strsendsig(stp->sd_siglist, events, 0, 0);
   3581 		}
   3582 	} else {
   3583 		if (stp->sd_flag & WSLEEP) {
   3584 			stp->sd_flag &= ~WSLEEP;
   3585 			cv_broadcast(&stp->sd_wrq->q_wait);
   3586 		}
   3587 
   3588 		mutex_exit(&stp->sd_lock);
   3589 		pollwakeup(pl, POLLWRNORM);
   3590 		mutex_enter(&stp->sd_lock);
   3591 
   3592 		if (stp->sd_sigflags & S_WRNORM)
   3593 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
   3594 	}
   3595 	mutex_exit(&stp->sd_lock);
   3596 }
   3597 
   3598 int
   3599 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
   3600 {
   3601 	stdata_t *stp = STREAM(q);
   3602 	int typ  = STRUIOT_STANDARD;
   3603 	uio_t	 *uiop = &dp->d_uio;
   3604 	dblk_t	 *dbp;
   3605 	ssize_t	 uiocnt;
   3606 	ssize_t	 cnt;
   3607 	unsigned char *ptr;
   3608 	ssize_t	 resid;
   3609 	int	 error = 0;
   3610 	on_trap_data_t otd;
   3611 	queue_t	*stwrq;
   3612 
   3613 	/*
   3614 	 * Plumbing may change while taking the type so store the
   3615 	 * queue in a temporary variable. It doesn't matter even
   3616 	 * if the we take the type from the previous plumbing,
   3617 	 * that's because if the plumbing has changed when we were
   3618 	 * holding the queue in a temporary variable, we can continue
   3619 	 * processing the message the way it would have been processed
   3620 	 * in the old plumbing, without any side effects but a bit
   3621 	 * extra processing for partial ip header checksum.
   3622 	 *
   3623 	 * This has been done to avoid holding the sd_lock which is
   3624 	 * very hot.
   3625 	 */
   3626 
   3627 	stwrq = stp->sd_struiowrq;
   3628 	if (stwrq)
   3629 		typ = stwrq->q_struiot;
   3630 
   3631 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
   3632 		dbp = mp->b_datap;
   3633 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
   3634 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
   3635 		cnt = MIN(uiocnt, uiop->uio_resid);
   3636 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
   3637 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
   3638 			/*
   3639 			 * Either this mblk has already been processed
   3640 			 * or there is no more room in this mblk (?).
   3641 			 */
   3642 			continue;
   3643 		}
   3644 		switch (typ) {
   3645 		case STRUIOT_STANDARD:
   3646 			if (noblock) {
   3647 				if (on_trap(&otd, OT_DATA_ACCESS)) {
   3648 					no_trap();
   3649 					error = EWOULDBLOCK;
   3650 					goto out;
   3651 				}
   3652 			}
   3653 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
   3654 				if (noblock)
   3655 					no_trap();
   3656 				goto out;
   3657 			}
   3658 			if (noblock)
   3659 				no_trap();
   3660 			break;
   3661 
   3662 		default:
   3663 			error = EIO;
   3664 			goto out;
   3665 		}
   3666 		dbp->db_struioflag |= STRUIO_DONE;
   3667 		dbp->db_cksumstuff += cnt;
   3668 	}
   3669 out:
   3670 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
   3671 		/*
   3672 		 * A fault has occured and some bytes were moved to the
   3673 		 * current mblk, the uio_t has already been updated by
   3674 		 * the appropriate uio routine, so also update the mblk
   3675 		 * to reflect this in case this same mblk chain is used
   3676 		 * again (after the fault has been handled).
   3677 		 */
   3678 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
   3679 		if (uiocnt >= resid)
   3680 			dbp->db_cksumstuff += resid;
   3681 	}
   3682 	return (error);
   3683 }
   3684 
   3685 /*
   3686  * Try to enter queue synchronously. Any attempt to enter a closing queue will
   3687  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
   3688  * that removeq() will not try to close the queue while a thread is inside the
   3689  * queue.
   3690  */
   3691 static boolean_t
   3692 rwnext_enter(queue_t *qp)
   3693 {
   3694 	mutex_enter(QLOCK(qp));
   3695 	if (qp->q_flag & QWCLOSE) {
   3696 		mutex_exit(QLOCK(qp));
   3697 		return (B_FALSE);
   3698 	}
   3699 	qp->q_rwcnt++;
   3700 	ASSERT(qp->q_rwcnt != 0);
   3701 	mutex_exit(QLOCK(qp));
   3702 	return (B_TRUE);
   3703 }
   3704 
   3705 /*
   3706  * Decrease the count of threads running in sync stream queue and wake up any
   3707  * threads blocked in removeq().
   3708  */
   3709 static void
   3710 rwnext_exit(queue_t *qp)
   3711 {
   3712 	mutex_enter(QLOCK(qp));
   3713 	qp->q_rwcnt--;
   3714 	if (qp->q_flag & QWANTRMQSYNC) {
   3715 		qp->q_flag &= ~QWANTRMQSYNC;
   3716 		cv_broadcast(&qp->q_wait);
   3717 	}
   3718 	mutex_exit(QLOCK(qp));
   3719 }
   3720 
   3721 /*
   3722  * The purpose of rwnext() is to call the rw procedure of the next
   3723  * (downstream) modules queue.
   3724  *
   3725  * treated as put entrypoint for perimeter syncronization.
   3726  *
   3727  * There's no need to grab sq_putlocks here (which only exist for CIPUT
   3728  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
   3729  * not matter if any regular put entrypoints have been already entered. We
   3730  * can't increment one of the sq_putcounts (instead of sq_count) because
   3731  * qwait_rw won't know which counter to decrement.
   3732  *
   3733  * It would be reasonable to add the lockless FASTPUT logic.
   3734  */
   3735 int
   3736 rwnext(queue_t *qp, struiod_t *dp)
   3737 {
   3738 	queue_t		*nqp;
   3739 	syncq_t		*sq;
   3740 	uint16_t	count;
   3741 	uint16_t	flags;
   3742 	struct qinit	*qi;
   3743 	int		(*proc)();
   3744 	struct stdata	*stp;
   3745 	int		isread;
   3746 	int		rval;
   3747 
   3748 	stp = STREAM(qp);
   3749 	/*
   3750 	 * Prevent q_next from changing by holding sd_lock until acquiring
   3751 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
   3752 	 * already have sd_lock acquired. In either case sd_lock is always
   3753 	 * released after acquiring SQLOCK.
   3754 	 *
   3755 	 * The streamhead read-side holding sd_lock when calling rwnext is
   3756 	 * required to prevent a race condition were M_DATA mblks flowing
   3757 	 * up the read-side of the stream could be bypassed by a rwnext()
   3758 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
   3759 	 */
   3760 	if ((nqp = _WR(qp)) == qp) {
   3761 		isread = 0;
   3762 		mutex_enter(&stp->sd_lock);
   3763 		qp = nqp->q_next;
   3764 	} else {
   3765 		isread = 1;
   3766 		if (nqp != stp->sd_wrq)
   3767 			/* Not streamhead */
   3768 			mutex_enter(&stp->sd_lock);
   3769 		qp = _RD(nqp->q_next);
   3770 	}
   3771 	qi = qp->q_qinfo;
   3772 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
   3773 		/*
   3774 		 * Not a synchronous module or no r/w procedure for this
   3775 		 * queue, so just return EINVAL and let the caller handle it.
   3776 		 */
   3777 		mutex_exit(&stp->sd_lock);
   3778 		return (EINVAL);
   3779 	}
   3780 
   3781 	if (rwnext_enter(qp) == B_FALSE) {
   3782 		mutex_exit(&stp->sd_lock);
   3783 		return (EINVAL);
   3784 	}
   3785 
   3786 	sq = qp->q_syncq;
   3787 	mutex_enter(SQLOCK(sq));
   3788 	mutex_exit(&stp->sd_lock);
   3789 	count = sq->sq_count;
   3790 	flags = sq->sq_flags;
   3791 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
   3792 
   3793 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
   3794 		/*
   3795 		 * if this queue is being closed, return.
   3796 		 */
   3797 		if (qp->q_flag & QWCLOSE) {
   3798 			mutex_exit(SQLOCK(sq));
   3799 			rwnext_exit(qp);
   3800 			return (EINVAL);
   3801 		}
   3802 
   3803 		/*
   3804 		 * Wait until we can enter the inner perimeter.
   3805 		 */
   3806 		sq->sq_flags = flags | SQ_WANTWAKEUP;
   3807 		cv_wait(&sq->sq_wait, SQLOCK(sq));
   3808 		count = sq->sq_count;
   3809 		flags = sq->sq_flags;
   3810 	}
   3811 
   3812 	if (isread == 0 && stp->sd_struiowrq == NULL ||
   3813 	    isread == 1 && stp->sd_struiordq == NULL) {
   3814 		/*
   3815 		 * Stream plumbing changed while waiting for inner perimeter
   3816 		 * so just return EINVAL and let the caller handle it.
   3817 		 */
   3818 		mutex_exit(SQLOCK(sq));
   3819 		rwnext_exit(qp);
   3820 		return (EINVAL);
   3821 	}
   3822 	if (!(flags & SQ_CIPUT))
   3823 		sq->sq_flags = flags | SQ_EXCL;
   3824 	sq->sq_count = count + 1;
   3825 	ASSERT(sq->sq_count != 0);		/* Wraparound */
   3826 	/*
   3827 	 * Note: The only message ordering guarantee that rwnext() makes is
   3828 	 *	 for the write queue flow-control case. All others (r/w queue
   3829 	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
   3830 	 *	 the queue's rw procedure. This could be genralized here buy
   3831 	 *	 running the queue's service procedure, but that wouldn't be
   3832 	 *	 the most efficent for all cases.
   3833 	 */
   3834 	mutex_exit(SQLOCK(sq));
   3835 	if (! isread && (qp->q_flag & QFULL)) {
   3836 		/*
   3837 		 * Write queue may be flow controlled. If so,
   3838 		 * mark the queue for wakeup when it's not.
   3839 		 */
   3840 		mutex_enter(QLOCK(qp));
   3841 		if (qp->q_flag & QFULL) {
   3842 			qp->q_flag |= QWANTWSYNC;
   3843 			mutex_exit(QLOCK(qp));
   3844 			rval = EWOULDBLOCK;
   3845 			goto out;
   3846 		}
   3847 		mutex_exit(QLOCK(qp));
   3848 	}
   3849 
   3850 	if (! isread && dp->d_mp)
   3851 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
   3852 		    dp->d_mp->b_datap->db_base);
   3853 
   3854 	rval = (*proc)(qp, dp);
   3855 
   3856 	if (isread && dp->d_mp)
   3857 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
   3858 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
   3859 out:
   3860 	/*
   3861 	 * The queue is protected from being freed by sq_count, so it is
   3862 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
   3863 	 */
   3864 	rwnext_exit(qp);
   3865 
   3866 	mutex_enter(SQLOCK(sq));
   3867 	flags = sq->sq_flags;
   3868 	ASSERT(sq->sq_count != 0);
   3869 	sq->sq_count--;
   3870 	if (flags & SQ_TAIL) {
   3871 		putnext_tail(sq, qp, flags);
   3872 		/*
   3873 		 * The only purpose of this ASSERT is to preserve calling stack
   3874 		 * in DEBUG kernel.
   3875 		 */
   3876 		ASSERT(flags & SQ_TAIL);
   3877 		return (rval);
   3878 	}
   3879 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
   3880 	/*
   3881 	 * Safe to always drop SQ_EXCL:
   3882 	 *	Not SQ_CIPUT means we set SQ_EXCL above
   3883 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
   3884 	 *	did a qwriter(INNER) in which case nobody else
   3885 	 *	is in the inner perimeter and we are exiting.
   3886 	 *
   3887 	 * I would like to make the following assertion:
   3888 	 *
   3889 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
   3890 	 * 	sq->sq_count == 0);
   3891 	 *
   3892 	 * which indicates that if we are both putshared and exclusive,
   3893 	 * we became exclusive while executing the putproc, and the only
   3894 	 * claim on the syncq was the one we dropped a few lines above.
   3895 	 * But other threads that enter putnext while the syncq is exclusive
   3896 	 * need to make a claim as they may need to drop SQLOCK in the
   3897 	 * has_writers case to avoid deadlocks.  If these threads are
   3898 	 * delayed or preempted, it is possible that the writer thread can
   3899 	 * find out that there are other claims making the (sq_count == 0)
   3900 	 * test invalid.
   3901 	 */
   3902 
   3903 	sq->sq_flags = flags & ~SQ_EXCL;
   3904 	if (sq->sq_flags & SQ_WANTWAKEUP) {
   3905 		sq->sq_flags &= ~SQ_WANTWAKEUP;
   3906 		cv_broadcast(&sq->sq_wait);
   3907 	}
   3908 	mutex_exit(SQLOCK(sq));
   3909 	return (rval);
   3910 }
   3911 
   3912 /*
   3913  * The purpose of infonext() is to call the info procedure of the next
   3914  * (downstream) modules queue.
   3915  *
   3916  * treated as put entrypoint for perimeter syncronization.
   3917  *
   3918  * There's no need to grab sq_putlocks here (which only exist for CIPUT
   3919  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
   3920  * it does not matter if any regular put entrypoints have been already
   3921  * entered.
   3922  */
   3923 int
   3924 infonext(queue_t *qp, infod_t *idp)
   3925 {
   3926 	queue_t		*nqp;
   3927 	syncq_t		*sq;
   3928 	uint16_t	count;
   3929 	uint16_t 	flags;
   3930 	struct qinit	*qi;
   3931 	int		(*proc)();
   3932 	struct stdata	*stp;
   3933 	int		rval;
   3934 
   3935 	stp = STREAM(qp);
   3936 	/*
   3937 	 * Prevent q_next from changing by holding sd_lock until
   3938 	 * acquiring SQLOCK.
   3939 	 */
   3940 	mutex_enter(&stp->sd_lock);
   3941 	if ((nqp = _WR(qp)) == qp) {
   3942 		qp = nqp->q_next;
   3943 	} else {
   3944 		qp = _RD(nqp->q_next);
   3945 	}
   3946 	qi = qp->q_qinfo;
   3947 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
   3948 		mutex_exit(&stp->sd_lock);
   3949 		return (EINVAL);
   3950 	}
   3951 	sq = qp->q_syncq;
   3952 	mutex_enter(SQLOCK(sq));
   3953 	mutex_exit(&stp->sd_lock);
   3954 	count = sq->sq_count;
   3955 	flags = sq->sq_flags;
   3956 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
   3957 
   3958 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
   3959 		/*
   3960 		 * Wait until we can enter the inner perimeter.
   3961 		 */
   3962 		sq->sq_flags = flags | SQ_WANTWAKEUP;
   3963 		cv_wait(&sq->sq_wait, SQLOCK(sq));
   3964 		count = sq->sq_count;
   3965 		flags = sq->sq_flags;
   3966 	}
   3967 
   3968 	if (! (flags & SQ_CIPUT))
   3969 		sq->sq_flags = flags | SQ_EXCL;
   3970 	sq->sq_count = count + 1;
   3971 	ASSERT(sq->sq_count != 0);		/* Wraparound */
   3972 	mutex_exit(SQLOCK(sq));
   3973 
   3974 	rval = (*proc)(qp, idp);
   3975 
   3976 	mutex_enter(SQLOCK(sq));
   3977 	flags = sq->sq_flags;
   3978 	ASSERT(sq->sq_count != 0);
   3979 	sq->sq_count--;
   3980 	if (flags & SQ_TAIL) {
   3981 		putnext_tail(sq, qp, flags);
   3982 		/*
   3983 		 * The only purpose of this ASSERT is to preserve calling stack
   3984 		 * in DEBUG kernel.
   3985 		 */
   3986 		ASSERT(flags & SQ_TAIL);
   3987 		return (rval);
   3988 	}
   3989 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
   3990 /*
   3991  * XXXX
   3992  * I am not certain the next comment is correct here.  I need to consider
   3993  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
   3994  * might cause other problems.  It just might be safer to drop it if
   3995  * !SQ_CIPUT because that is when we set it.
   3996  */
   3997 	/*
   3998 	 * Safe to always drop SQ_EXCL:
   3999 	 *	Not SQ_CIPUT means we set SQ_EXCL above
   4000 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
   4001 	 *	did a qwriter(INNER) in which case nobody else
   4002 	 *	is in the inner perimeter and we are exiting.
   4003 	 *
   4004 	 * I would like to make the following assertion:
   4005 	 *
   4006 	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
   4007 	 *	sq->sq_count == 0);
   4008 	 *
   4009 	 * which indicates that if we are both putshared and exclusive,
   4010 	 * we became exclusive while executing the putproc, and the only
   4011 	 * claim on the syncq was the one we dropped a few lines above.
   4012 	 * But other threads that enter putnext while the syncq is exclusive
   4013 	 * need to make a claim as they may need to drop SQLOCK in the
   4014 	 * has_writers case to avoid deadlocks.  If these threads are
   4015 	 * delayed or preempted, it is possible that the writer thread can
   4016 	 * find out that there are other claims making the (sq_count == 0)
   4017 	 * test invalid.
   4018 	 */
   4019 
   4020 	sq->sq_flags = flags & ~SQ_EXCL;
   4021 	mutex_exit(SQLOCK(sq));
   4022 	return (rval);
   4023 }
   4024 
   4025 /*
   4026  * Return nonzero if the queue is responsible for struio(), else return 0.
   4027  */
   4028 int
   4029 isuioq(queue_t *q)
   4030 {
   4031 	if (q->q_flag & QREADR)
   4032 		return (STREAM(q)->sd_struiordq == q);
   4033 	else
   4034 		return (STREAM(q)->sd_struiowrq == q);
   4035 }
   4036 
   4037 #if defined(__sparc)
   4038 int disable_putlocks = 0;
   4039 #else
   4040 int disable_putlocks = 1;
   4041 #endif
   4042 
   4043 /*
   4044  * called by create_putlock.
   4045  */
   4046 static void
   4047 create_syncq_putlocks(queue_t *q)
   4048 {
   4049 	syncq_t	*sq = q->q_syncq;
   4050 	ciputctrl_t *cip;
   4051 	int i;
   4052 
   4053 	ASSERT(sq != NULL);
   4054 
   4055 	ASSERT(disable_putlocks == 0);
   4056 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
   4057 	ASSERT(ciputctrl_cache != NULL);
   4058 
   4059 	if (!(sq->sq_type & SQ_CIPUT))
   4060 		return;
   4061 
   4062 	for (i = 0; i <= 1; i++) {
   4063 		if (sq->sq_ciputctrl == NULL) {
   4064 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
   4065 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
   4066 			mutex_enter(SQLOCK(sq));
   4067 			if (sq->sq_ciputctrl != NULL) {
   4068 				mutex_exit(SQLOCK(sq));
   4069 				kmem_cache_free(ciputctrl_cache, cip);
   4070 			} else {
   4071 				ASSERT(sq-