Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 /* Copyright (c) 1990 Mentat Inc. */
     26 
     27 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
     28 /*	  All Rights Reserved  	*/
     29 
     30 #pragma ident	"@(#)rpcmod.c	1.115	07/12/05 SMI"
     31 
     32 /*
     33  * Kernel RPC filtering module
     34  */
     35 
     36 #include <sys/param.h>
     37 #include <sys/types.h>
     38 #include <sys/stream.h>
     39 #include <sys/stropts.h>
     40 #include <sys/tihdr.h>
     41 #include <sys/timod.h>
     42 #include <sys/tiuser.h>
     43 #include <sys/debug.h>
     44 #include <sys/signal.h>
     45 #include <sys/pcb.h>
     46 #include <sys/user.h>
     47 #include <sys/errno.h>
     48 #include <sys/cred.h>
     49 #include <sys/policy.h>
     50 #include <sys/inline.h>
     51 #include <sys/cmn_err.h>
     52 #include <sys/kmem.h>
     53 #include <sys/file.h>
     54 #include <sys/sysmacros.h>
     55 #include <sys/systm.h>
     56 #include <sys/t_lock.h>
     57 #include <sys/ddi.h>
     58 #include <sys/vtrace.h>
     59 #include <sys/callb.h>
     60 #include <sys/strsun.h>
     61 
     62 #include <sys/strlog.h>
     63 #include <rpc/rpc_com.h>
     64 #include <inet/common.h>
     65 #include <rpc/types.h>
     66 #include <sys/time.h>
     67 #include <rpc/xdr.h>
     68 #include <rpc/auth.h>
     69 #include <rpc/clnt.h>
     70 #include <rpc/rpc_msg.h>
     71 #include <rpc/clnt.h>
     72 #include <rpc/svc.h>
     73 #include <rpc/rpcsys.h>
     74 #include <rpc/rpc_rdma.h>
     75 
     76 /*
     77  * This is the loadable module wrapper.
     78  */
     79 #include <sys/conf.h>
     80 #include <sys/modctl.h>
     81 #include <sys/syscall.h>
     82 
     83 extern struct streamtab rpcinfo;
     84 
/*
 * STREAMS module switch entry: registers this module under the name
 * "rpcmod" with rpcinfo as its streamtab.
 */
static struct fmodsw fsw = {
	"rpcmod",
	&rpcinfo,
	D_NEW|D_MP,
};
     90 
/*
 * Module linkage information for the kernel: the STREAMS module linkage
 * record, which ties the fsw switch entry to mod_strmodops.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "rpc interface str mod", &fsw
};
     98 
/*
 * System call entry for the RPC system call; rpcsys is the handler.
 */
static struct sysent rpcsysent = {
	2,	/* presumably the argument count -- confirm against sysent */
	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
	rpcsys
};
    107 
/*
 * System call linkage record: binds rpcsysent to mod_syscallops.
 */
static struct modlsys modlsys = {
	&mod_syscallops,
	"RPC syscall",
	&rpcsysent
};
    113 
#ifdef _SYSCALL32_IMPL
/*
 * Second system call linkage record, only built when _SYSCALL32_IMPL is
 * defined: binds the same rpcsysent to mod_syscallops32.
 */
static struct modlsys modlsys32 = {
	&mod_syscallops32,
	"32-bit RPC syscall",
	&rpcsysent
};
#endif /* _SYSCALL32_IMPL */
    121 
/*
 * Top-level linkage handed to mod_install(), mod_info() and
 * ldi_ident_from_mod(): the system call linkage record(s) followed by the
 * STREAMS module linkage record, terminated by NULL.
 */
static struct modlinkage modlinkage = {
	MODREV_1,
	{
		&modlsys,
#ifdef _SYSCALL32_IMPL
		&modlsys32,
#endif
		&modlstrmod,
		NULL
	}
};
    133 
    134 int
    135 _init(void)
    136 {
    137 	int error = 0;
    138 	callb_id_t cid;
    139 	int status;
    140 
    141 	svc_init();
    142 	clnt_init();
    143 	cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");
    144 
    145 	if (error = mod_install(&modlinkage)) {
    146 		/*
    147 		 * Could not install module, cleanup previous
    148 		 * initialization work.
    149 		 */
    150 		clnt_fini();
    151 		if (cid != NULL)
    152 			(void) callb_delete(cid);
    153 
    154 		return (error);
    155 	}
    156 
    157 	/*
    158 	 * Load up the RDMA plugins and initialize the stats. Even if the
    159 	 * plugins loadup fails, but rpcmod was successfully installed the
    160 	 * counters still get initialized.
    161 	 */
    162 	rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
    163 	mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);
    164 	mt_kstat_init();
    165 
    166 	/*
    167 	 * Get our identification into ldi.  This is used for loading
    168 	 * other modules, e.g. rpcib.
    169 	 */
    170 	status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
    171 	if (status != 0) {
    172 		cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
    173 		rpcmod_li = NULL;
    174 	}
    175 
    176 	return (error);
    177 }
    178 
    179 /*
    180  * The unload entry point fails, because we advertise entry points into
    181  * rpcmod from the rest of kRPC: rpcmod_release().
    182  */
    183 int
    184 _fini(void)
    185 {
    186 	return (EBUSY);
    187 }
    188 
    189 int
    190 _info(struct modinfo *modinfop)
    191 {
    192 	return (mod_info(&modlinkage, modinfop));
    193 }
    194 
    195 extern int nulldev();
    196 
    197 #define	RPCMOD_ID	2049
    198 
    199 int rmm_open(), rmm_close();
    200 
    201 /*
    202  * To save instructions, since STREAMS ignores the return value
    203  * from these functions, they are defined as void here. Kind of icky, but...
    204  */
    205 void rmm_rput(queue_t *, mblk_t *);
    206 void rmm_wput(queue_t *, mblk_t *);
    207 void rmm_rsrv(queue_t *);
    208 void rmm_wsrv(queue_t *);
    209 
    210 int rpcmodopen(), rpcmodclose();
    211 void rpcmodrput(), rpcmodwput();
    212 void rpcmodrsrv(), rpcmodwsrv();
    213 
    214 static	void	rpcmodwput_other(queue_t *, mblk_t *);
    215 static	int	mir_close(queue_t *q);
    216 static	int	mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
    217 		    cred_t *credp);
    218 static	void	mir_rput(queue_t *q, mblk_t *mp);
    219 static	void	mir_rsrv(queue_t *q);
    220 static	void	mir_wput(queue_t *q, mblk_t *mp);
    221 static	void	mir_wsrv(queue_t *q);
    222 
    223 static struct module_info rpcmod_info =
    224 	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
    225 
/*
 * Read-side queue initialization.  The put and service slots hold the
 * rmm_* dispatchers, which forward to the transport-specific routine found
 * through q->q_ptr.  The dispatchers are declared void, hence the casts to
 * the int-returning function pointer type.
 *
 * Note that only xprt_cots_ops supplies a read service routine; the
 * xo_rsrv slot of xprt_clts_ops is NULL.
 */
static struct qinit rpcmodrinit = {
	(int (*)())rmm_rput,
	(int (*)())rmm_rsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};
    238 
/*
 * Write-side queue initialization.  The put and service slots hold the
 * rmm_* dispatchers, which forward to the transport-specific routine found
 * through q->q_ptr.
 *
 * For the connectionless routines (rpcmodwput/rpcmodwsrv), messages are
 * only queued when canputnext() fails, so the write service procedure
 * exists to synchronize with flow control rather than to queue data as a
 * matter of course.
 */
static struct qinit rpcmodwinit = {
	(int (*)())rmm_wput,
	(int (*)())rmm_wsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};
/* Streamtab referenced by fsw: read and write qinit, remaining slots NULL. */
struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
    254 
/*
 * Per-transport-style operations vector.  The rmm_* entry points call
 * through the matching member; a pointer to one of these is stored as the
 * second member of every structure that q->q_ptr may point at.
 */
struct xprt_style_ops {
	int (*xo_open)();	/* open (module push) */
	int (*xo_close)();	/* close (module pop) */
	void (*xo_wput)();	/* write put procedure */
	void (*xo_wsrv)();	/* write service procedure */
	void (*xo_rput)();	/* read put procedure */
	void (*xo_rsrv)();	/* read service procedure */
};
    263 
/*
 * Operations installed by rmm_open() when the transport reports T_CLTS.
 * There is no read service routine for this style.
 */
static struct xprt_style_ops xprt_clts_ops = {
	rpcmodopen,
	rpcmodclose,
	rpcmodwput,
	rpcmodwsrv,
	rpcmodrput,
	NULL
};
    272 
/*
 * Operations installed by rmm_open() for every service type other than
 * T_CLTS (the mir_* routines).
 */
static struct xprt_style_ops xprt_cots_ops = {
	mir_open,
	mir_close,
	mir_wput,
	mir_wsrv,
	mir_rput,
	mir_rsrv
};
    281 
/*
 * Per rpcmod "slot" data structure used by the rpcmod* routines; q->q_ptr
 * points to one of these once rpcmodopen() has succeeded.
 *
 * The rmm_* dispatchers fetch the ops vector by casting q->q_ptr to
 * struct temp_slot, so rm_ops has to stay the second member, directly
 * after the cell pointer, just as in struct temp_slot and mir_t.
 */
struct rpcm {
	void		*rm_krpc_cell;	/* Reserved for use by KRPC */
	struct		xprt_style_ops	*rm_ops;
	int		rm_type;	/* Client or server side stream */
#define	RM_CLOSING	0x1		/* somebody is trying to close slot */
	uint_t		rm_state;	/* state of the slot. see above */
	uint_t		rm_ref;		/* cnt of external references to slot */
	kmutex_t	rm_lock;	/* mutex protecting above fields */
	kcondvar_t	rm_cwait;	/* condition for closing */
	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
};
    296 
/*
 * Temporary slot that rmm_open() keeps on its own stack and hangs off
 * q->q_ptr while it waits for the transport's T_INFO_ACK.  The leading
 * members (cell, ops, type) mirror the start of struct rpcm and mir_t.
 */
struct temp_slot {
	void *cell;
	struct xprt_style_ops *ops;	/* set to &tmpops by rmm_open() */
	int type;
	mblk_t *info_ack;		/* T_INFO_ACK stored by tmp_rput() */
	kmutex_t lock;			/* held around info_ack updates */
	kcondvar_t wait;		/* signalled when info_ack is set */
};
    305 
/*
 * Per-stream state for the mir_* (xprt_cots_ops) routines.  As with
 * struct rpcm and struct temp_slot, the ops vector pointer is the second
 * member so that the rmm_* dispatchers can locate it.
 */
typedef struct mir_s {
	void	*mir_krpc_cell;	/* Reserved for KRPC use. This field */
					/* must be first in the structure. */
	struct xprt_style_ops	*rm_ops;
	int	mir_type;		/* Client or server side stream */

	mblk_t	*mir_head_mp;		/* RPC msg in progress */
		/*
		 * mir_head_mp points to the first mblk being collected in
		 * the current RPC message.  Record headers are removed
		 * before data is linked into mir_head_mp.
		 */
	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
		/*
		 * mir_tail_mp points to the last mblk in the message
		 * chain starting at mir_head_mp.  It is only valid
		 * if mir_head_mp is non-NULL and is used to add new
		 * data blocks to the end of chain quickly.
		 */

	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
		/*
		 * mir_frag_len starts at -4 for beginning of each fragment.
		 * When this length is negative, it indicates the number of
		 * bytes that rpcmod needs to complete the record marker
		 * header.  When it is positive or zero, it holds the number
		 * of bytes that have arrived for the current fragment and
		 * are held in mir_head_mp.
		 */

	int32_t	mir_frag_header;
		/*
		 * Fragment header as collected for the current fragment.
		 * It holds the last-fragment indicator and the number
		 * of bytes in the fragment.
		 */

	unsigned int
		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
		mir_hold_inbound : 1,	/* Hold inbound messages on server */
					/* side until outbound flow control */
					/* is relieved. */
		mir_closing : 1,	/* The stream is being closed */
		mir_inrservice : 1,	/* data queued or rd srv proc running */
		mir_inwservice : 1,	/* data queued or wr srv proc running */
		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
		/*
		 * On client streams, mir_clntreq is 0 or 1; it is set
		 * to 1 whenever a new request is sent out (mir_wput)
		 * and cleared when the timer fires (mir_timer).  If
		 * the timer fires with this value equal to 0, then the
		 * stream is considered idle and KRPC is notified.
		 */
		mir_clntreq : 1,
		/*
		 * On server streams, stop accepting messages
		 */
		mir_svc_no_more_msgs : 1,
		mir_listen_stream : 1,	/* listen end point */
		mir_unused : 1,	/* no longer used */
		mir_timer_call : 1,
		mir_junk_fill_thru_bit_31 : 21;	/* pads the word to 32 bits */

	int	mir_setup_complete;	/* server has initialized everything */
	timeout_id_t mir_timer_id;	/* Timer for idle checks */
	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
		/*
		 * This value is copied from clnt_idle_timeout or
		 * svc_idle_timeout during the appropriate ioctl.
		 * Kept in milliseconds
		 */
	clock_t	mir_use_timestamp;	/* updated on client with each use */
		/*
		 * This value is set to lbolt
		 * every time a client stream sends or receives data.
		 * Even if the timer message arrives, we don't shutdown
		 * client unless:
		 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
		 * This value is kept in HZ.
		 */

	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
		/*
		 * This pointer is set to &clnt_max_msg_size or
		 * &svc_max_msg_size during the appropriate ioctl.
		 */
	zoneid_t mir_zoneid;	/* zone which pushed rpcmod */
	/* Server-side fields. */
	int	mir_ref_cnt;		/* Reference count: server side only */
					/* counts the number of references */
					/* that a kernel RPC server thread */
					/* (see svc_run()) has on this rpcmod */
					/* slot. Effectively, it is the */
					/* number of unprocessed messages */
					/* that have been passed up to the */
					/* KRPC layer */

	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
					/* T_DISCON_IND */

	/*
	 * these fields are for both client and server, but for debugging,
	 * it is easier to have these last in the structure.
	 */
	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
	kcondvar_t	mir_condvar;	/* synchronization. */
	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
} mir_t;
    414 
void tmp_rput(queue_t *q, mblk_t *mp);

/*
 * Operations vector installed in the temp_slot while rmm_open() waits for
 * the transport's T_INFO_ACK: writes are passed straight to putnext(),
 * reads go to tmp_rput().  The open, close and service slots are NULL.
 */
struct xprt_style_ops tmpops = {
	NULL,
	NULL,
	putnext,
	NULL,
	tmp_rput,
	NULL
};
    425 
    426 void
    427 tmp_rput(queue_t *q, mblk_t *mp)
    428 {
    429 	struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
    430 	struct T_info_ack *pptr;
    431 
    432 	switch (mp->b_datap->db_type) {
    433 	case M_PCPROTO:
    434 		pptr = (struct T_info_ack *)mp->b_rptr;
    435 		switch (pptr->PRIM_type) {
    436 		case T_INFO_ACK:
    437 			mutex_enter(&t->lock);
    438 			t->info_ack = mp;
    439 			cv_signal(&t->wait);
    440 			mutex_exit(&t->lock);
    441 			return;
    442 		default:
    443 			break;
    444 		}
    445 	default:
    446 		break;
    447 	}
    448 
    449 	/*
    450 	 * Not an info-ack, so free it. This is ok because we should
    451 	 * not be receiving data until the open finishes: rpcmod
    452 	 * is pushed well before the end-point is bound to an address.
    453 	 */
    454 	freemsg(mp);
    455 }
    456 
    457 int
    458 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
    459 {
    460 	mblk_t *bp;
    461 	struct temp_slot ts, *t;
    462 	struct T_info_ack *pptr;
    463 	int error = 0;
    464 
    465 	ASSERT(q != NULL);
    466 	/*
    467 	 * Check for re-opens.
    468 	 */
    469 	if (q->q_ptr) {
    470 		TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
    471 		    "rpcmodopen_end:(%s)", "q->qptr");
    472 		return (0);
    473 	}
    474 
    475 	t = &ts;
    476 	bzero(t, sizeof (*t));
    477 	q->q_ptr = (void *)t;
    478 	WR(q)->q_ptr = (void *)t;
    479 
    480 	/*
    481 	 * Allocate the required messages upfront.
    482 	 */
    483 	if ((bp = allocb(sizeof (struct T_info_req) +
    484 	    sizeof (struct T_info_ack), BPRI_LO)) == (mblk_t *)NULL) {
    485 		return (ENOBUFS);
    486 	}
    487 
    488 	mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
    489 	cv_init(&t->wait, NULL, CV_DEFAULT, NULL);
    490 
    491 	t->ops = &tmpops;
    492 
    493 	qprocson(q);
    494 	bp->b_datap->db_type = M_PCPROTO;
    495 	*(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
    496 	bp->b_wptr += sizeof (struct T_info_req);
    497 	putnext(WR(q), bp);
    498 
    499 	mutex_enter(&t->lock);
    500 	while (t->info_ack == NULL) {
    501 		if (cv_wait_sig(&t->wait, &t->lock) == 0) {
    502 			error = EINTR;
    503 			break;
    504 		}
    505 	}
    506 	mutex_exit(&t->lock);
    507 
    508 	if (error)
    509 		goto out;
    510 
    511 	pptr = (struct T_info_ack *)t->info_ack->b_rptr;
    512 
    513 	if (pptr->SERV_type == T_CLTS) {
    514 		if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
    515 			((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
    516 	} else {
    517 		if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
    518 			((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
    519 	}
    520 
    521 out:
    522 	if (error)
    523 		qprocsoff(q);
    524 
    525 	freemsg(t->info_ack);
    526 	mutex_destroy(&t->lock);
    527 	cv_destroy(&t->wait);
    528 
    529 	return (error);
    530 }
    531 
    532 void
    533 rmm_rput(queue_t *q, mblk_t  *mp)
    534 {
    535 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
    536 }
    537 
    538 void
    539 rmm_rsrv(queue_t *q)
    540 {
    541 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
    542 }
    543 
    544 void
    545 rmm_wput(queue_t *q, mblk_t *mp)
    546 {
    547 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
    548 }
    549 
    550 void
    551 rmm_wsrv(queue_t *q)
    552 {
    553 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
    554 }
    555 
    556 int
    557 rmm_close(queue_t *q, int flag, cred_t *crp)
    558 {
    559 	return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
    560 }
    561 
    562 /*
    563  * rpcmodopen -	open routine gets called when the module gets pushed
    564  *		onto the stream.
    565  */
    566 /*ARGSUSED*/
    567 int
    568 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
    569 {
    570 	struct rpcm *rmp;
    571 
    572 	extern void (*rpc_rele)(queue_t *, mblk_t *);
    573 	static void rpcmod_release(queue_t *, mblk_t *);
    574 
    575 	TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
    576 
    577 	/*
    578 	 * Initialize entry points to release a rpcmod slot (and an input
    579 	 * message if supplied) and to send an output message to the module
    580 	 * below rpcmod.
    581 	 */
    582 	if (rpc_rele == NULL)
    583 		rpc_rele = rpcmod_release;
    584 
    585 	/*
    586 	 * Only sufficiently privileged users can use this module, and it
    587 	 * is assumed that they will use this module properly, and NOT send
    588 	 * bulk data from downstream.
    589 	 */
    590 	if (secpolicy_rpcmod_open(crp) != 0)
    591 		return (EPERM);
    592 
    593 	/*
    594 	 * Allocate slot data structure.
    595 	 */
    596 	rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);
    597 
    598 	mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
    599 	cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
    600 	rmp->rm_zoneid = rpc_zoneid();
    601 	/*
    602 	 * slot type will be set by kRPC client and server ioctl's
    603 	 */
    604 	rmp->rm_type = 0;
    605 
    606 	q->q_ptr = (void *)rmp;
    607 	WR(q)->q_ptr = (void *)rmp;
    608 
    609 	TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
    610 	return (0);
    611 }
    612 
    613 /*
    614  * rpcmodclose - This routine gets called when the module gets popped
    615  * off of the stream.
    616  */
    617 /*ARGSUSED*/
    618 int
    619 rpcmodclose(queue_t *q, int flag, cred_t *crp)
    620 {
    621 	struct rpcm *rmp;
    622 
    623 	ASSERT(q != NULL);
    624 	rmp = (struct rpcm *)q->q_ptr;
    625 
    626 	/*
    627 	 * Mark our state as closing.
    628 	 */
    629 	mutex_enter(&rmp->rm_lock);
    630 	rmp->rm_state |= RM_CLOSING;
    631 
    632 	/*
    633 	 * Check and see if there are any messages on the queue.  If so, send
    634 	 * the messages, regardless whether the downstream module is ready to
    635 	 * accept data.
    636 	 */
    637 	if (rmp->rm_type == RPC_SERVER) {
    638 		flushq(q, FLUSHDATA);
    639 
    640 		qenable(WR(q));
    641 
    642 		if (rmp->rm_ref) {
    643 			mutex_exit(&rmp->rm_lock);
    644 			/*
    645 			 * call into SVC to clean the queue
    646 			 */
    647 			svc_queueclean(q);
    648 			mutex_enter(&rmp->rm_lock);
    649 
    650 			/*
    651 			 * Block while there are kRPC threads with a reference
    652 			 * to this message.
    653 			 */
    654 			while (rmp->rm_ref)
    655 				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
    656 		}
    657 
    658 		mutex_exit(&rmp->rm_lock);
    659 
    660 		/*
    661 		 * It is now safe to remove this queue from the stream. No kRPC
    662 		 * threads have a reference to the stream, and none ever will,
    663 		 * because RM_CLOSING is set.
    664 		 */
    665 		qprocsoff(q);
    666 
    667 		/* Notify kRPC that this stream is going away. */
    668 		svc_queueclose(q);
    669 	} else {
    670 		mutex_exit(&rmp->rm_lock);
    671 		qprocsoff(q);
    672 	}
    673 
    674 	q->q_ptr = NULL;
    675 	WR(q)->q_ptr = NULL;
    676 	mutex_destroy(&rmp->rm_lock);
    677 	cv_destroy(&rmp->rm_cwait);
    678 	kmem_free(rmp, sizeof (*rmp));
    679 	return (0);
    680 }
    681 
#ifdef	DEBUG
/*
 * DEBUG-only knobs consulted by rpcmodrput():
 *   rpcmod_send_msg_up  - while > 0, a copy of each incoming message is
 *			   also passed upstream; decremented per copy sent.
 *   rpcmod_send_uderr   - while > 0, a T_UDERROR_IND is fabricated from
 *			   each incoming T_UNITDATA_IND and passed upstream;
 *			   decremented per indication sent.
 *   rpcmod_send_dup     - when non-zero, server requests are duplicated
 *			   whenever rpcmod_send_dup_cnt % rpcmod_send_dup is
 *			   non-zero.
 *   rpcmod_send_dup_cnt - running count used with rpcmod_send_dup.
 */
int	rpcmod_send_msg_up = 0;
int	rpcmod_send_uderr = 0;
int	rpcmod_send_dup = 0;
int	rpcmod_send_dup_cnt = 0;
#endif
    688 
/*
 * rpcmodrput -	Module read put procedure.  This is called from
 *		the module, driver, or stream head downstream.
 *
 * Messages are dropped until the slot type has been set by ioctl.
 * T_UNITDATA_IND messages are handed to the kRPC client dispatcher or
 * queued to the kRPC server depending on the slot type, T_UDERROR_IND
 * messages are freed, and everything else is passed upstream.
 */
void
rpcmodrput(queue_t *q, mblk_t *mp)
{
	struct rpcm *rmp;
	union T_primitives *pptr;
	int hdrsz;

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	/* Slot type not yet set by the RPC_CLIENT/RPC_SERVER ioctl: drop. */
	if (rmp->rm_type == 0) {
		freemsg(mp);
		return;
	}

#ifdef DEBUG
	/* Fault injection: also pass a copy of the message upstream. */
	if (rpcmod_send_msg_up > 0) {
		mblk_t *nmp = copymsg(mp);
		if (nmp) {
			putnext(q, nmp);
			rpcmod_send_msg_up--;
		}
	}
	/*
	 * Fault injection: fabricate a T_UDERROR_IND (ENETDOWN) from the
	 * incoming T_UNITDATA_IND and pass it upstream.
	 */
	if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
		mblk_t *nmp;
		struct T_unitdata_ind *data;
		struct T_uderror_ind *ud;
		int d;
		data = (struct T_unitdata_ind *)mp->b_rptr;
		if (data->PRIM_type == T_UNITDATA_IND) {
			/* d: size difference between the two headers */
			d = sizeof (*ud) - sizeof (*data);
			nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
			if (nmp) {
				ud = (struct T_uderror_ind *)nmp->b_rptr;
				ud->PRIM_type = T_UDERROR_IND;
				ud->DEST_length = data->SRC_length;
				ud->DEST_offset = data->SRC_offset + d;
				ud->OPT_length = data->OPT_length;
				ud->OPT_offset = data->OPT_offset + d;
				ud->ERROR_type = ENETDOWN;
				if (data->SRC_length) {
					bcopy(mp->b_rptr +
					    data->SRC_offset,
					    nmp->b_rptr +
					    ud->DEST_offset,
					    data->SRC_length);
				}
				if (data->OPT_length) {
					bcopy(mp->b_rptr +
					    data->OPT_offset,
					    nmp->b_rptr +
					    ud->OPT_offset,
					    data->OPT_length);
				}
				nmp->b_wptr += d;
				nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
				nmp->b_datap->db_type = M_PROTO;
				putnext(q, nmp);
				rpcmod_send_uderr--;
			}
		}
	}
#endif
	switch (mp->b_datap->db_type) {
	default:
		putnext(q, mp);
		break;

	case M_PROTO:
	case M_PCPROTO:
		ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
		pptr = (union T_primitives *)mp->b_rptr;

		/*
		 * Forward this message to krpc if it is data.
		 */
		if (pptr->type == T_UNITDATA_IND) {
			mblk_t *nmp;

			/*
			 * Check if the module is being popped; if so the
			 * message is just passed upstream.
			 */
			mutex_enter(&rmp->rm_lock);
			if (rmp->rm_state & RM_CLOSING) {
				mutex_exit(&rmp->rm_lock);
				putnext(q, mp);
				break;
			}

			switch (rmp->rm_type) {
			case RPC_CLIENT:
				mutex_exit(&rmp->rm_lock);
				hdrsz = mp->b_wptr - mp->b_rptr;

				/*
				 * Make sure the header is sane.
				 */
				if (hdrsz < TUNITDATAINDSZ ||
				    hdrsz < (pptr->unitdata_ind.OPT_length +
				    pptr->unitdata_ind.OPT_offset) ||
				    hdrsz < (pptr->unitdata_ind.SRC_length +
				    pptr->unitdata_ind.SRC_offset)) {
					freemsg(mp);
					return;
				}

				/*
				 * Call clnt_clts_dispatch_notify, so that it
				 * can pass the message to the proper caller.
				 * Don't discard the header just yet since the
				 * client may need the sender's address.
				 */
				clnt_clts_dispatch_notify(mp, hdrsz,
				    rmp->rm_zoneid);
				return;
			case RPC_SERVER:
				/*
				 * rm_krpc_cell is exclusively used by the kRPC
				 * CLTS server
				 */
				if (rmp->rm_krpc_cell) {
#ifdef DEBUG
					/*
					 * Test duplicate request cache and
					 * rm_ref count handling by sending a
					 * duplicate every so often, if
					 * desired.
					 */
					if (rpcmod_send_dup &&
					    rpcmod_send_dup_cnt++ %
					    rpcmod_send_dup)
						nmp = copymsg(mp);
					else
						nmp = NULL;
#endif
					/*
					 * Raise the reference count on this
					 * module to prevent it from being
					 * popped before krpc generates the
					 * reply.
					 */
					rmp->rm_ref++;
					mutex_exit(&rmp->rm_lock);

					/*
					 * Submit the message to krpc.
					 */
					svc_queuereq(q, mp);
#ifdef DEBUG
					/*
					 * Send duplicate if we created one.
					 */
					if (nmp) {
						mutex_enter(&rmp->rm_lock);
						rmp->rm_ref++;
						mutex_exit(&rmp->rm_lock);
						svc_queuereq(q, nmp);
					}
#endif
				} else {
					/* No kRPC cell attached: drop. */
					mutex_exit(&rmp->rm_lock);
					freemsg(mp);
				}
				return;
			default:
				/* Neither client nor server: drop. */
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			} /* end switch(rmp->rm_type) */
		} else if (pptr->type == T_UDERROR_IND) {
			mutex_enter(&rmp->rm_lock);
			hdrsz = mp->b_wptr - mp->b_rptr;

			/*
			 * Make sure the header is sane
			 */
			if (hdrsz < TUDERRORINDSZ ||
			    hdrsz < (pptr->uderror_ind.OPT_length +
			    pptr->uderror_ind.OPT_offset) ||
			    hdrsz < (pptr->uderror_ind.DEST_length +
			    pptr->uderror_ind.DEST_offset)) {
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			}

			/*
			 * In the case where a unit data error has been
			 * received, all we need to do is clear the message from
			 * the queue.
			 */
			mutex_exit(&rmp->rm_lock);
			freemsg(mp);
			RPCLOG(32, "rpcmodrput: unitdata error received at "
			    "%ld\n", gethrestime_sec());
			return;
		} /* end else if (pptr->type == T_UDERROR_IND) */

		/* Any other TPI primitive is passed upstream untouched. */
		putnext(q, mp);
		break;
	} /* end switch (mp->b_datap->db_type) */

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
	    "rpcmodrput_end:");
	/*
	 * Return codes are not looked at by the STREAMS framework.
	 */
}
    903 
    904 /*
    905  * write put procedure
    906  */
    907 void
    908 rpcmodwput(queue_t *q, mblk_t *mp)
    909 {
    910 	struct rpcm	*rmp;
    911 
    912 	ASSERT(q != NULL);
    913 
    914 	switch (mp->b_datap->db_type) {
    915 		case M_PROTO:
    916 		case M_PCPROTO:
    917 			break;
    918 		default:
    919 			rpcmodwput_other(q, mp);
    920 			return;
    921 	}
    922 
    923 	/*
    924 	 * Check to see if we can send the message downstream.
    925 	 */
    926 	if (canputnext(q)) {
    927 		putnext(q, mp);
    928 		return;
    929 	}
    930 
    931 	rmp = (struct rpcm *)q->q_ptr;
    932 	ASSERT(rmp != NULL);
    933 
    934 	/*
    935 	 * The first canputnext failed.  Try again except this time with the
    936 	 * lock held, so that we can check the state of the stream to see if
    937 	 * it is closing.  If either of these conditions evaluate to true
    938 	 * then send the meesage.
    939 	 */
    940 	mutex_enter(&rmp->rm_lock);
    941 	if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
    942 		mutex_exit(&rmp->rm_lock);
    943 		putnext(q, mp);
    944 	} else {
    945 		/*
    946 		 * canputnext failed again and the stream is not closing.
    947 		 * Place the message on the queue and let the service
    948 		 * procedure handle the message.
    949 		 */
    950 		mutex_exit(&rmp->rm_lock);
    951 		(void) putq(q, mp);
    952 	}
    953 }
    954 
    955 static void
    956 rpcmodwput_other(queue_t *q, mblk_t *mp)
    957 {
    958 	struct rpcm	*rmp;
    959 	struct iocblk	*iocp;
    960 
    961 	rmp = (struct rpcm *)q->q_ptr;
    962 	ASSERT(rmp != NULL);
    963 
    964 	switch (mp->b_datap->db_type) {
    965 		case M_IOCTL:
    966 			iocp = (struct iocblk *)mp->b_rptr;
    967 			ASSERT(iocp != NULL);
    968 			switch (iocp->ioc_cmd) {
    969 				case RPC_CLIENT:
    970 				case RPC_SERVER:
    971 					mutex_enter(&rmp->rm_lock);
    972 					rmp->rm_type = iocp->ioc_cmd;
    973 					mutex_exit(&rmp->rm_lock);
    974 					mp->b_datap->db_type = M_IOCACK;
    975 					qreply(q, mp);
    976 					return;
    977 				default:
    978 				/*
    979 				 * pass the ioctl downstream and hope someone
    980 				 * down there knows how to handle it.
    981 				 */
    982 					putnext(q, mp);
    983 					return;
    984 			}
    985 		default:
    986 			break;
    987 	}
    988 	/*
    989 	 * This is something we definitely do not know how to handle, just
    990 	 * pass the message downstream
    991 	 */
    992 	putnext(q, mp);
    993 }
    994 
    995 /*
    996  * Module write service procedure. This is called by downstream modules
    997  * for back enabling during flow control.
    998  */
    999 void
   1000 rpcmodwsrv(queue_t *q)
   1001 {
   1002 	struct rpcm	*rmp;
   1003 	mblk_t		*mp = NULL;
   1004 
   1005 	rmp = (struct rpcm *)q->q_ptr;
   1006 	ASSERT(rmp != NULL);
   1007 
   1008 	/*
   1009 	 * Get messages that may be queued and send them down stream
   1010 	 */
   1011 	while ((mp = getq(q)) != NULL) {
   1012 		/*
   1013 		 * Optimize the service procedure for the server-side, by
   1014 		 * avoiding a call to canputnext().
   1015 		 */
   1016 		if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
   1017 			putnext(q, mp);
   1018 			continue;
   1019 		}
   1020 		(void) putbq(q, mp);
   1021 		return;
   1022 	}
   1023 }
   1024 
   1025 static void
   1026 rpcmod_release(queue_t *q, mblk_t *bp)
   1027 {
   1028 	struct rpcm *rmp;
   1029 
   1030 	/*
   1031 	 * For now, just free the message.
   1032 	 */
   1033 	if (bp)
   1034 		freemsg(bp);
   1035 	rmp = (struct rpcm *)q->q_ptr;
   1036 
   1037 	mutex_enter(&rmp->rm_lock);
   1038 	rmp->rm_ref--;
   1039 
   1040 	if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
   1041 		cv_broadcast(&rmp->rm_cwait);
   1042 	}
   1043 
   1044 	mutex_exit(&rmp->rm_lock);
   1045 }
   1046 
   1047 /*
   1048  * This part of rpcmod is pushed on a connection-oriented transport for use
   1049  * by RPC.  It serves to bypass the Stream head, implements
   1050  * the record marking protocol, and dispatches incoming RPC messages.
   1051  */
   1052 
   1053 /* Default idle timer values */
   1054 #define	MIR_CLNT_IDLE_TIMEOUT	(5 * (60 * 1000L))	/* 5 minutes */
   1055 #define	MIR_SVC_IDLE_TIMEOUT	(6 * (60 * 1000L))	/* 6 minutes */
   1056 #define	MIR_SVC_ORDREL_TIMEOUT	(10 * (60 * 1000L))	/* 10 minutes */
   1057 #define	MIR_LASTFRAG	0x80000000	/* Record marker */
   1058 
   1059 #define	DLEN(mp) (mp->b_cont ? msgdsize(mp) : (mp->b_wptr - mp->b_rptr))
   1060 
   1061 #define	MIR_SVC_QUIESCED(mir)	\
   1062 	(mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)
   1063 
   1064 #define	MIR_CLEAR_INRSRV(mir_ptr)	{	\
   1065 	(mir_ptr)->mir_inrservice = 0;	\
   1066 	if ((mir_ptr)->mir_type == RPC_SERVER &&	\
   1067 		(mir_ptr)->mir_closing)	\
   1068 		cv_signal(&(mir_ptr)->mir_condvar);	\
   1069 }
   1070 
   1071 /*
   1072  * Don't block service procedure (and mir_close) if
   1073  * we are in the process of closing.
   1074  */
   1075 #define	MIR_WCANPUTNEXT(mir_ptr, write_q)	\
   1076 	(canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
   1077 
   1078 static int	mir_clnt_dup_request(queue_t *q, mblk_t *mp);
   1079 static void	mir_rput_proto(queue_t *q, mblk_t *mp);
   1080 static int	mir_svc_policy_notify(queue_t *q, int event);
   1081 static void	mir_svc_release(queue_t *wq, mblk_t *mp);
   1082 static void	mir_svc_start(queue_t *wq);
   1083 static void	mir_svc_idle_start(queue_t *, mir_t *);
   1084 static void	mir_svc_idle_stop(queue_t *, mir_t *);
   1085 static void	mir_svc_start_close(queue_t *, mir_t *);
   1086 static void	mir_clnt_idle_do_stop(queue_t *);
   1087 static void	mir_clnt_idle_stop(queue_t *, mir_t *);
   1088 static void	mir_clnt_idle_start(queue_t *, mir_t *);
   1089 static void	mir_wput(queue_t *q, mblk_t *mp);
   1090 static void	mir_wput_other(queue_t *q, mblk_t *mp);
   1091 static void	mir_wsrv(queue_t *q);
   1092 static	void	mir_disconnect(queue_t *, mir_t *ir);
   1093 static	int	mir_check_len(queue_t *, int32_t, mblk_t *);
   1094 static	void	mir_timer(void *);
   1095 
   1096 extern void	(*mir_rele)(queue_t *, mblk_t *);
   1097 extern void	(*mir_start)(queue_t *);
   1098 extern void	(*clnt_stop_idle)(queue_t *);
   1099 
   1100 clock_t	clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
   1101 clock_t	svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
   1102 
   1103 /*
   1104  * Timeout for subsequent notifications of idle connection.  This is
   1105  * typically used to clean up after a wedged orderly release.
   1106  */
   1107 clock_t	svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */
   1108 
   1109 extern	uint_t	*clnt_max_msg_sizep;
   1110 extern	uint_t	*svc_max_msg_sizep;
   1111 uint_t	clnt_max_msg_size = RPC_MAXDATASIZE;
   1112 uint_t	svc_max_msg_size = RPC_MAXDATASIZE;
   1113 uint_t	mir_krpc_cell_null;
   1114 
   1115 static void
   1116 mir_timer_stop(mir_t *mir)
   1117 {
   1118 	timeout_id_t tid;
   1119 
   1120 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1121 
   1122 	/*
   1123 	 * Since the mir_mutex lock needs to be released to call
   1124 	 * untimeout(), we need to make sure that no other thread
   1125 	 * can start/stop the timer (changing mir_timer_id) during
   1126 	 * that time.  The mir_timer_call bit and the mir_timer_cv
   1127 	 * condition variable are used to synchronize this.  Setting
   1128 	 * mir_timer_call also tells mir_timer() (refer to the comments
   1129 	 * in mir_timer()) that it does not need to do anything.
   1130 	 */
   1131 	while (mir->mir_timer_call)
   1132 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
   1133 	mir->mir_timer_call = B_TRUE;
   1134 
   1135 	if ((tid = mir->mir_timer_id) != 0) {
   1136 		mir->mir_timer_id = 0;
   1137 		mutex_exit(&mir->mir_mutex);
   1138 		(void) untimeout(tid);
   1139 		mutex_enter(&mir->mir_mutex);
   1140 	}
   1141 	mir->mir_timer_call = B_FALSE;
   1142 	cv_broadcast(&mir->mir_timer_cv);
   1143 }
   1144 
   1145 static void
   1146 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
   1147 {
   1148 	timeout_id_t tid;
   1149 
   1150 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1151 
   1152 	while (mir->mir_timer_call)
   1153 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
   1154 	mir->mir_timer_call = B_TRUE;
   1155 
   1156 	if ((tid = mir->mir_timer_id) != 0) {
   1157 		mutex_exit(&mir->mir_mutex);
   1158 		(void) untimeout(tid);
   1159 		mutex_enter(&mir->mir_mutex);
   1160 	}
   1161 	/* Only start the timer when it is not closing. */
   1162 	if (!mir->mir_closing) {
   1163 		mir->mir_timer_id = timeout(mir_timer, q,
   1164 		    MSEC_TO_TICK(intrvl));
   1165 	}
   1166 	mir->mir_timer_call = B_FALSE;
   1167 	cv_broadcast(&mir->mir_timer_cv);
   1168 }
   1169 
   1170 static int
   1171 mir_clnt_dup_request(queue_t *q, mblk_t *mp)
   1172 {
   1173 	mblk_t  *mp1;
   1174 	uint32_t  new_xid;
   1175 	uint32_t  old_xid;
   1176 
   1177 	ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
   1178 	new_xid = BE32_TO_U32(&mp->b_rptr[4]);
   1179 	/*
   1180 	 * This loop is a bit tacky -- it walks the STREAMS list of
   1181 	 * flow-controlled messages.
   1182 	 */
   1183 	if ((mp1 = q->q_first) != NULL) {
   1184 		do {
   1185 			old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
   1186 			if (new_xid == old_xid)
   1187 				return (1);
   1188 		} while ((mp1 = mp1->b_next) != NULL);
   1189 	}
   1190 	return (0);
   1191 }
   1192 
   1193 static int
   1194 mir_close(queue_t *q)
   1195 {
   1196 	mir_t	*mir = q->q_ptr;
   1197 	mblk_t	*mp;
   1198 	bool_t queue_cleaned = FALSE;
   1199 
   1200 	RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
   1201 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   1202 	mutex_enter(&mir->mir_mutex);
   1203 	if ((mp = mir->mir_head_mp) != NULL) {
   1204 		mir->mir_head_mp = NULL;
   1205 		mir->mir_tail_mp = NULL;
   1206 		freemsg(mp);
   1207 	}
   1208 	/*
   1209 	 * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
   1210 	 * is TRUE.  And mir_timer_start() won't start the timer again.
   1211 	 */
   1212 	mir->mir_closing = B_TRUE;
   1213 	mir_timer_stop(mir);
   1214 
   1215 	if (mir->mir_type == RPC_SERVER) {
   1216 		flushq(q, FLUSHDATA);	/* Ditch anything waiting on read q */
   1217 
   1218 		/*
   1219 		 * This will prevent more requests from arriving and
   1220 		 * will force rpcmod to ignore flow control.
   1221 		 */
   1222 		mir_svc_start_close(WR(q), mir);
   1223 
   1224 		while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {
   1225 
   1226 			if (mir->mir_ref_cnt && !mir->mir_inrservice &&
   1227 			    (queue_cleaned == FALSE)) {
   1228 				/*
   1229 				 * call into SVC to clean the queue
   1230 				 */
   1231 				mutex_exit(&mir->mir_mutex);
   1232 				svc_queueclean(q);
   1233 				queue_cleaned = TRUE;
   1234 				mutex_enter(&mir->mir_mutex);
   1235 				continue;
   1236 			}
   1237 
   1238 			/*
   1239 			 * Bugid 1253810 - Force the write service
   1240 			 * procedure to send its messages, regardless
   1241 			 * whether the downstream  module is ready
   1242 			 * to accept data.
   1243 			 */
   1244 			if (mir->mir_inwservice == 1)
   1245 				qenable(WR(q));
   1246 
   1247 			cv_wait(&mir->mir_condvar, &mir->mir_mutex);
   1248 		}
   1249 
   1250 		mutex_exit(&mir->mir_mutex);
   1251 		qprocsoff(q);
   1252 
   1253 		/* Notify KRPC that this stream is going away. */
   1254 		svc_queueclose(q);
   1255 	} else {
   1256 		mutex_exit(&mir->mir_mutex);
   1257 		qprocsoff(q);
   1258 	}
   1259 
   1260 	mutex_destroy(&mir->mir_mutex);
   1261 	cv_destroy(&mir->mir_condvar);
   1262 	cv_destroy(&mir->mir_timer_cv);
   1263 	kmem_free(mir, sizeof (mir_t));
   1264 	return (0);
   1265 }
   1266 
   1267 /*
   1268  * This is server side only (RPC_SERVER).
   1269  *
   1270  * Exit idle mode.
   1271  */
   1272 static void
   1273 mir_svc_idle_stop(queue_t *q, mir_t *mir)
   1274 {
   1275 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1276 	ASSERT((q->q_flag & QREADR) == 0);
   1277 	ASSERT(mir->mir_type == RPC_SERVER);
   1278 	RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);
   1279 
   1280 	mir_timer_stop(mir);
   1281 }
   1282 
   1283 /*
   1284  * This is server side only (RPC_SERVER).
   1285  *
   1286  * Start idle processing, which will include setting idle timer if the
   1287  * stream is not being closed.
   1288  */
   1289 static void
   1290 mir_svc_idle_start(queue_t *q, mir_t *mir)
   1291 {
   1292 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1293 	ASSERT((q->q_flag & QREADR) == 0);
   1294 	ASSERT(mir->mir_type == RPC_SERVER);
   1295 	RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);
   1296 
   1297 	/*
   1298 	 * Don't re-start idle timer if we are closing queues.
   1299 	 */
   1300 	if (mir->mir_closing) {
   1301 		RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
   1302 		    (void *)q);
   1303 
   1304 		/*
   1305 		 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
   1306 		 * is true.  When it is true, and we are in the process of
   1307 		 * closing the stream, signal any thread waiting in
   1308 		 * mir_close().
   1309 		 */
   1310 		if (mir->mir_inwservice == 0)
   1311 			cv_signal(&mir->mir_condvar);
   1312 
   1313 	} else {
   1314 		RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
   1315 		    mir->mir_ordrel_pending ? "ordrel" : "normal");
   1316 		/*
   1317 		 * Normal condition, start the idle timer.  If an orderly
   1318 		 * release has been sent, set the timeout to wait for the
   1319 		 * client to close its side of the connection.  Otherwise,
   1320 		 * use the normal idle timeout.
   1321 		 */
   1322 		mir_timer_start(q, mir, mir->mir_ordrel_pending ?
   1323 		    svc_ordrel_timeout : mir->mir_idle_timeout);
   1324 	}
   1325 }
   1326 
   1327 /* ARGSUSED */
   1328 static int
   1329 mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
   1330 {
   1331 	mir_t	*mir;
   1332 
   1333 	RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
   1334 	/* Set variables used directly by KRPC. */
   1335 	if (!mir_rele)
   1336 		mir_rele = mir_svc_release;
   1337 	if (!mir_start)
   1338 		mir_start = mir_svc_start;
   1339 	if (!clnt_stop_idle)
   1340 		clnt_stop_idle = mir_clnt_idle_do_stop;
   1341 	if (!clnt_max_msg_sizep)
   1342 		clnt_max_msg_sizep = &clnt_max_msg_size;
   1343 	if (!svc_max_msg_sizep)
   1344 		svc_max_msg_sizep = &svc_max_msg_size;
   1345 
   1346 	/* Allocate a zero'ed out mir structure for this stream. */
   1347 	mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);
   1348 
   1349 	/*
   1350 	 * We set hold inbound here so that incoming messages will
   1351 	 * be held on the read-side queue until the stream is completely
   1352 	 * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
   1353 	 * the ioctl processing, the flag is cleared and any messages that
   1354 	 * arrived between the open and the ioctl are delivered to KRPC.
   1355 	 *
   1356 	 * Early data should never arrive on a client stream since
   1357 	 * servers only respond to our requests and we do not send any.
   1358 	 * until after the stream is initialized.  Early data is
   1359 	 * very common on a server stream where the client will start
   1360 	 * sending data as soon as the connection is made (and this
   1361 	 * is especially true with TCP where the protocol accepts the
   1362 	 * connection before nfsd or KRPC is notified about it).
   1363 	 */
   1364 
   1365 	mir->mir_hold_inbound = 1;
   1366 
   1367 	/*
   1368 	 * Start the record marker looking for a 4-byte header.  When
   1369 	 * this length is negative, it indicates that rpcmod is looking
   1370 	 * for bytes to consume for the record marker header.  When it
   1371 	 * is positive, it holds the number of bytes that have arrived
   1372 	 * for the current fragment and are being held in mir_header_mp.
   1373 	 */
   1374 
   1375 	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   1376 
   1377 	mir->mir_zoneid = rpc_zoneid();
   1378 	mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
   1379 	cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
   1380 	cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);
   1381 
   1382 	q->q_ptr = (char *)mir;
   1383 	WR(q)->q_ptr = (char *)mir;
   1384 
   1385 	/*
   1386 	 * We noenable the read-side queue because we don't want it
   1387 	 * automatically enabled by putq.  We enable it explicitly
   1388 	 * in mir_wsrv when appropriate. (See additional comments on
   1389 	 * flow control at the beginning of mir_rsrv.)
   1390 	 */
   1391 	noenable(q);
   1392 
   1393 	qprocson(q);
   1394 	return (0);
   1395 }
   1396 
   1397 /*
   1398  * Read-side put routine for both the client and server side.  Does the
   1399  * record marking for incoming RPC messages, and when complete, dispatches
   1400  * the message to either the client or server.
   1401  */
   1402 static void
   1403 mir_rput(queue_t *q, mblk_t *mp)
   1404 {
   1405 	int	excess;
   1406 	int32_t	frag_len, frag_header;
   1407 	mblk_t	*cont_mp, *head_mp, *tail_mp, *mp1;
   1408 	mir_t	*mir = q->q_ptr;
   1409 	boolean_t stop_timer = B_FALSE;
   1410 
   1411 	ASSERT(mir != NULL);
   1412 
   1413 	/*
   1414 	 * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
   1415 	 * with the corresponding ioctl, then don't accept
   1416 	 * any inbound data.  This should never happen for streams
   1417 	 * created by nfsd or client-side KRPC because they are careful
   1418 	 * to set the mode of the stream before doing anything else.
   1419 	 */
   1420 	if (mir->mir_type == 0) {
   1421 		freemsg(mp);
   1422 		return;
   1423 	}
   1424 
   1425 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   1426 
   1427 	switch (mp->b_datap->db_type) {
   1428 	case M_DATA:
   1429 		break;
   1430 	case M_PROTO:
   1431 	case M_PCPROTO:
   1432 		if (MBLKL(mp) < sizeof (t_scalar_t)) {
   1433 			RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
   1434 			    (int)MBLKL(mp));
   1435 			freemsg(mp);
   1436 			return;
   1437 		}
   1438 		if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
   1439 			mir_rput_proto(q, mp);
   1440 			return;
   1441 		}
   1442 
   1443 		/* Throw away the T_DATA_IND block and continue with data. */
   1444 		mp1 = mp;
   1445 		mp = mp->b_cont;
   1446 		freeb(mp1);
   1447 		break;
   1448 	case M_SETOPTS:
   1449 		/*
   1450 		 * If a module on the stream is trying set the Stream head's
   1451 		 * high water mark, then set our hiwater to the requested
   1452 		 * value.  We are the "stream head" for all inbound
   1453 		 * data messages since messages are passed directly to KRPC.
   1454 		 */
   1455 		if (MBLKL(mp) >= sizeof (struct stroptions)) {
   1456 			struct stroptions	*stropts;
   1457 
   1458 			stropts = (struct stroptions *)mp->b_rptr;
   1459 			if ((stropts->so_flags & SO_HIWAT) &&
   1460 			    !(stropts->so_flags & SO_BAND)) {
   1461 				(void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
   1462 			}
   1463 		}
   1464 		putnext(q, mp);
   1465 		return;
   1466 	case M_FLUSH:
   1467 		RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
   1468 		RPCLOG(32, "on q 0x%p\n", (void *)q);
   1469 		putnext(q, mp);
   1470 		return;
   1471 	default:
   1472 		putnext(q, mp);
   1473 		return;
   1474 	}
   1475 
   1476 	mutex_enter(&mir->mir_mutex);
   1477 
   1478 	/*
   1479 	 * If this connection is closing, don't accept any new messages.
   1480 	 */
   1481 	if (mir->mir_svc_no_more_msgs) {
   1482 		ASSERT(mir->mir_type == RPC_SERVER);
   1483 		mutex_exit(&mir->mir_mutex);
   1484 		freemsg(mp);
   1485 		return;
   1486 	}
   1487 
   1488 	/* Get local copies for quicker access. */
   1489 	frag_len = mir->mir_frag_len;
   1490 	frag_header = mir->mir_frag_header;
   1491 	head_mp = mir->mir_head_mp;
   1492 	tail_mp = mir->mir_tail_mp;
   1493 
   1494 	/* Loop, processing each message block in the mp chain separately. */
   1495 	do {
   1496 		cont_mp = mp->b_cont;
   1497 		mp->b_cont = NULL;
   1498 
   1499 		/*
   1500 		 * If frag_len is negative, we're still in the process of
   1501 		 * building frag_header -- try to complete it with this mblk.
   1502 		 */
   1503 		while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
   1504 			frag_len++;
   1505 			frag_header <<= 8;
   1506 			frag_header += *mp->b_rptr++;
   1507 		}
   1508 
   1509 		if (MBLKL(mp) == 0) {
   1510 			/*
   1511 			 * This was either a zero-length mblk or we consumed
   1512 			 * it while trying to complete the fragment header.
   1513 			 * In either case, free it and move on.
   1514 			 */
   1515 			freeb(mp);
   1516 			continue;
   1517 		}
   1518 
   1519 		ASSERT(frag_len >= 0);
   1520 
   1521 		/*
   1522 		 * Now frag_header has the number of bytes in this fragment
   1523 		 * and we're just waiting to collect them all.  Chain our
   1524 		 * latest mblk onto the list and see if we now have enough
   1525 		 * bytes to complete the fragment.
   1526 		 */
   1527 		if (head_mp == NULL) {
   1528 			ASSERT(tail_mp == NULL);
   1529 			head_mp = tail_mp = mp;
   1530 		} else {
   1531 			tail_mp->b_cont = mp;
   1532 			tail_mp = mp;
   1533 		}
   1534 
   1535 		frag_len += MBLKL(mp);
   1536 		excess = frag_len - (frag_header & ~MIR_LASTFRAG);
   1537 		if (excess < 0) {
   1538 			/*
   1539 			 * We still haven't received enough data to complete
   1540 			 * the fragment, so continue on to the next mblk.
   1541 			 */
   1542 			continue;
   1543 		}
   1544 
   1545 		/*
   1546 		 * We've got a complete fragment.  If there are excess bytes,
   1547 		 * then they're part of the next fragment's header (of either
   1548 		 * this RPC message or the next RPC message).  Split that part
   1549 		 * into its own mblk so that we can safely freeb() it when
   1550 		 * building frag_header above.
   1551 		 */
   1552 		if (excess > 0) {
   1553 			if ((mp1 = dupb(mp)) == NULL &&
   1554 			    (mp1 = copyb(mp)) == NULL) {
   1555 				freemsg(head_mp);
   1556 				freemsg(cont_mp);
   1557 				RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
   1558 				mir->mir_frag_header = 0;
   1559 				mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   1560 				mir->mir_head_mp = NULL;
   1561 				mir->mir_tail_mp = NULL;
   1562 				mir_disconnect(q, mir);	/* drops mir_mutex */
   1563 				return;
   1564 			}
   1565 
   1566 			/*
   1567 			 * Relink the message chain so that the next mblk is
   1568 			 * the next fragment header, followed by the rest of
   1569 			 * the message chain.
   1570 			 */
   1571 			mp1->b_cont = cont_mp;
   1572 			cont_mp = mp1;
   1573 
   1574 			/*
   1575 			 * Data in the new mblk begins at the next fragment,
   1576 			 * and data in the old mblk ends at the next fragment.
   1577 			 */
   1578 			mp1->b_rptr = mp1->b_wptr - excess;
   1579 			mp->b_wptr -= excess;
   1580 		}
   1581 
   1582 		/*
   1583 		 * Reset frag_len and frag_header for the next fragment.
   1584 		 */
   1585 		frag_len = -(int32_t)sizeof (uint32_t);
   1586 		if (!(frag_header & MIR_LASTFRAG)) {
   1587 			/*
   1588 			 * The current fragment is complete, but more
   1589 			 * fragments need to be processed before we can
   1590 			 * pass along the RPC message headed at head_mp.
   1591 			 */
   1592 			frag_header = 0;
   1593 			continue;
   1594 		}
   1595 		frag_header = 0;
   1596 
   1597 		/*
   1598 		 * We've got a complete RPC message; pass it to the
   1599 		 * appropriate consumer.
   1600 		 */
   1601 		switch (mir->mir_type) {
   1602 		case RPC_CLIENT:
   1603 			if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
   1604 				/*
   1605 				 * Mark this stream as active.  This marker
   1606 				 * is used in mir_timer().
   1607 				 */
   1608 				mir->mir_clntreq = 1;
   1609 				mir->mir_use_timestamp = lbolt;
   1610 			} else {
   1611 				freemsg(head_mp);
   1612 			}
   1613 			break;
   1614 
   1615 		case RPC_SERVER:
   1616 			/*
   1617 			 * Check for flow control before passing the
   1618 			 * message to KRPC.
   1619 			 */
   1620 			if (!mir->mir_hold_inbound) {
   1621 				if (mir->mir_krpc_cell) {
   1622 					/*
   1623 					 * If the reference count is 0
   1624 					 * (not including this request),
   1625 					 * then the stream is transitioning
   1626 					 * from idle to non-idle.  In this case,
   1627 					 * we cancel the idle timer.
   1628 					 */
   1629 					if (mir->mir_ref_cnt++ == 0)
   1630 						stop_timer = B_TRUE;
   1631 					if (mir_check_len(q,
   1632 					    (int32_t)msgdsize(mp), mp))
   1633 						return;
   1634 					svc_queuereq(q, head_mp); /* to KRPC */
   1635 				} else {
   1636 					/*
   1637 					 * Count # of times this happens. Should
   1638 					 * be never, but experience shows
   1639 					 * otherwise.
   1640 					 */
   1641 					mir_krpc_cell_null++;
   1642 					freemsg(head_mp);
   1643 				}
   1644 			} else {
   1645 				/*
   1646 				 * If the outbound side of the stream is
   1647 				 * flow controlled, then hold this message
   1648 				 * until client catches up. mir_hold_inbound
   1649 				 * is set in mir_wput and cleared in mir_wsrv.
   1650 				 */
   1651 				(void) putq(q, head_mp);
   1652 				mir->mir_inrservice = B_TRUE;
   1653 			}
   1654 			break;
   1655 		default:
   1656 			RPCLOG(1, "mir_rput: unknown mir_type %d\n",
   1657 			    mir->mir_type);
   1658 			freemsg(head_mp);
   1659 			break;
   1660 		}
   1661 
   1662 		/*
   1663 		 * Reset the chain since we're starting on a new RPC message.
   1664 		 */
   1665 		head_mp = tail_mp = NULL;
   1666 	} while ((mp = cont_mp) != NULL);
   1667 
   1668 	/*
   1669 	 * Sanity check the message length; if it's too large mir_check_len()
   1670 	 * will shutdown the connection, drop mir_mutex, and return non-zero.
   1671 	 */
   1672 	if (head_mp != NULL && mir->mir_setup_complete &&
   1673 	    mir_check_len(q, frag_len, head_mp))
   1674 		return;
   1675 
   1676 	/* Save our local copies back in the mir structure. */
   1677 	mir->mir_frag_header = frag_header;
   1678 	mir->mir_frag_len = frag_len;
   1679 	mir->mir_head_mp = head_mp;
   1680 	mir->mir_tail_mp = tail_mp;
   1681 
   1682 	/*
   1683 	 * The timer is stopped after the whole message chain is processed.
   1684 	 * The reason is that stopping the timer releases the mir_mutex
   1685 	 * lock temporarily.  This means that the request can be serviced
   1686 	 * while we are still processing the message chain.  This is not
   1687 	 * good.  So we stop the timer here instead.
   1688 	 *
   1689 	 * Note that if the timer fires before we stop it, it will not
   1690 	 * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
   1691 	 * will just return.
   1692 	 */
   1693 	if (stop_timer) {
   1694 		RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
   1695 		    "ref cnt going to non zero\n", (void *)WR(q));
   1696 		mir_svc_idle_stop(WR(q), mir);
   1697 	}
   1698 	mutex_exit(&mir->mir_mutex);
   1699 }
   1700 
   1701 static void
   1702 mir_rput_proto(queue_t *q, mblk_t *mp)
   1703 {
   1704 	mir_t	*mir = (mir_t *)q->q_ptr;
   1705 	uint32_t	type;
   1706 	uint32_t reason = 0;
   1707 
   1708 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   1709 
   1710 	type = ((union T_primitives *)mp->b_rptr)->type;
   1711 	switch (mir->mir_type) {
   1712 	case RPC_CLIENT:
   1713 		switch (type) {
   1714 		case T_DISCON_IND:
   1715 			reason = ((struct T_discon_ind *)
   1716 			    (mp->b_rptr))->DISCON_reason;
   1717 			/*FALLTHROUGH*/
   1718 		case T_ORDREL_IND:
   1719 			mutex_enter(&mir->mir_mutex);
   1720 			if (mir->mir_head_mp) {
   1721 				freemsg(mir->mir_head_mp);
   1722 				mir->mir_head_mp = (mblk_t *)0;
   1723 				mir->mir_tail_mp = (mblk_t *)0;
   1724 			}
   1725 			/*
   1726 			 * We are disconnecting, but not necessarily
   1727 			 * closing. By not closing, we will fail to
   1728 			 * pick up a possibly changed global timeout value,
   1729 			 * unless we store it now.
   1730 			 */
   1731 			mir->mir_idle_timeout = clnt_idle_timeout;
   1732 			mir_clnt_idle_stop(WR(q), mir);
   1733 
   1734 			/*
   1735 			 * Even though we are unconnected, we still
   1736 			 * leave the idle timer going on the client. The
   1737 			 * reason for is that if we've disconnected due
   1738 			 * to a server-side disconnect, reset, or connection
   1739 			 * timeout, there is a possibility the client may
   1740 			 * retry the RPC request. This retry needs to done on
   1741 			 * the same bound address for the server to interpret
   1742 			 * it as such. However, we don't want
   1743 			 * to wait forever for that possibility. If the
   1744 			 * end-point stays unconnected for mir_idle_timeout
   1745 			 * units of time, then that is a signal to the
   1746 			 * connection manager to give up waiting for the
   1747 			 * application (eg. NFS) to send a retry.
   1748 			 */
   1749 			mir_clnt_idle_start(WR(q), mir);
   1750 			mutex_exit(&mir->mir_mutex);
   1751 			clnt_dispatch_notifyall(WR(q), type, reason);
   1752 			freemsg(mp);
   1753 			return;
   1754 		case T_ERROR_ACK:
   1755 		{
   1756 			struct T_error_ack	*terror;
   1757 
   1758 			terror = (struct T_error_ack *)mp->b_rptr;
   1759 			RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
   1760 			    (void *)q);
   1761 			RPCLOG(1, " ERROR_prim: %s,",
   1762 			    rpc_tpiprim2name(terror->ERROR_prim));
   1763 			RPCLOG(1, " TLI_error: %s,",
   1764 			    rpc_tpierr2name(terror->TLI_error));
   1765 			RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
   1766 			if (terror->ERROR_prim == T_DISCON_REQ)  {
   1767 				clnt_dispatch_notifyall(WR(q), type, reason);
   1768 				freemsg(mp);
   1769 				return;
   1770 			} else {
   1771 				if (clnt_dispatch_notifyconn(WR(q), mp))
   1772 					return;
   1773 			}
   1774 			break;
   1775 		}
   1776 		case T_OK_ACK:
   1777 		{
   1778 			struct T_ok_ack	*tok = (struct T_ok_ack *)mp->b_rptr;
   1779 
   1780 			if (tok->CORRECT_prim == T_DISCON_REQ) {
   1781 				clnt_dispatch_notifyall(WR(q), type, reason);
   1782 				freemsg(mp);
   1783 				return;
   1784 			} else {
   1785 				if (clnt_dispatch_notifyconn(WR(q), mp))
   1786 					return;
   1787 			}
   1788 			break;
   1789 		}
   1790 		case T_CONN_CON:
   1791 		case T_INFO_ACK:
   1792 		case T_OPTMGMT_ACK:
   1793 			if (clnt_dispatch_notifyconn(WR(q), mp))
   1794 				return;
   1795 			break;
   1796 		case T_BIND_ACK:
   1797 			break;
   1798 		default:
   1799 			RPCLOG(1, "mir_rput: unexpected message %d "
   1800 			    "for KRPC client\n",
   1801 			    ((union T_primitives *)mp->b_rptr)->type);
   1802 			break;
   1803 		}
   1804 		break;
   1805 
   1806 	case RPC_SERVER:
   1807 		switch (type) {
   1808 		case T_BIND_ACK:
   1809 		{
   1810 			struct T_bind_ack	*tbind;
   1811 
   1812 			/*
   1813 			 * If this is a listening stream, then shut
   1814 			 * off the idle timer.
   1815 			 */
   1816 			tbind = (struct T_bind_ack *)mp->b_rptr;
   1817 			if (tbind->CONIND_number > 0) {
   1818 				mutex_enter(&mir->mir_mutex);
   1819 				mir_svc_idle_stop(WR(q), mir);
   1820 
   1821 				/*
   1822 				 * mark this as a listen endpoint
   1823 				 * for special handling.
   1824 				 */
   1825 
   1826 				mir->mir_listen_stream = 1;
   1827 				mutex_exit(&mir->mir_mutex);
   1828 			}
   1829 			break;
   1830 		}
   1831 		case T_DISCON_IND:
   1832 		case T_ORDREL_IND:
   1833 			RPCLOG(16, "mir_rput_proto: got %s indication\n",
   1834 			    type == T_DISCON_IND ? "disconnect"
   1835 			    : "orderly release");
   1836 
   1837 			/*
   1838 			 * For listen endpoint just pass
   1839 			 * on the message.
   1840 			 */
   1841 
   1842 			if (mir->mir_listen_stream)
   1843 				break;
   1844 
   1845 			mutex_enter(&mir->mir_mutex);
   1846 
   1847 			/*
   1848 			 * If client wants to break off connection, record
   1849 			 * that fact.
   1850 			 */
   1851 			mir_svc_start_close(WR(q), mir);
   1852 
   1853 			/*
   1854 			 * If we are idle, then send the orderly release
   1855 			 * or disconnect indication to nfsd.
   1856 			 */
   1857 			if (MIR_SVC_QUIESCED(mir)) {
   1858 				mutex_exit(&mir->mir_mutex);
   1859 				break;
   1860 			}
   1861 
   1862 			RPCLOG(16, "mir_rput_proto: not idle, so "
   1863 			    "disconnect/ord rel indication not passed "
   1864 			    "upstream on 0x%p\n", (void *)q);
   1865 
   1866 			/*
   1867 			 * Hold the indication until we get idle
   1868 			 * If there already is an indication stored,
   1869 			 * replace it if the new one is a disconnect. The
   1870 			 * reasoning is that disconnection takes less time
   1871 			 * to process, and once a client decides to
   1872 			 * disconnect, we should do that.
   1873 			 */
   1874 			if (mir->mir_svc_pend_mp) {
   1875 				if (type == T_DISCON_IND) {
   1876 					RPCLOG(16, "mir_rput_proto: replacing"
   1877 					    " held disconnect/ord rel"
   1878 					    " indication with disconnect on"
   1879 					    " 0x%p\n", (void *)q);
   1880 
   1881 					freemsg(mir->mir_svc_pend_mp);
   1882 					mir->mir_svc_pend_mp = mp;
   1883 				} else {
   1884 					RPCLOG(16, "mir_rput_proto: already "
   1885 					    "held a disconnect/ord rel "
   1886 					    "indication. freeing ord rel "
   1887 					    "ind on 0x%p\n", (void *)q);
   1888 					freemsg(mp);
   1889 				}
   1890 			} else
   1891 				mir->mir_svc_pend_mp = mp;
   1892 
   1893 			mutex_exit(&mir->mir_mutex);
   1894 			return;
   1895 
   1896 		default:
   1897 			/* nfsd handles server-side non-data messages. */
   1898 			break;
   1899 		}
   1900 		break;
   1901 
   1902 	default:
   1903 		break;
   1904 	}
   1905 
   1906 	putnext(q, mp);
   1907 }
   1908 
   1909 /*
   1910  * The server-side read queues are used to hold inbound messages while
   1911  * outbound flow control is exerted.  When outbound flow control is
   1912  * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
   1913  * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
   1914  *
 * For the server side, we have two types of messages queued. The first type
 * are messages that are ready to be XDR decoded and then sent to the
 * RPC program's dispatch routine. The second type are "raw" messages that
 * haven't been processed, i.e. assembled from RPC record fragments into
 * full requests. The only time we will see the second type of message
 * queued is if we have a memory allocation failure while processing
 * a raw message. The field mir_first_non_processed_mblk will mark the
 * first such raw message. So the flow for server side is:
   1923  *
   1924  *	- send processed queued messages to kRPC until we run out or find
   1925  *	  one that needs additional processing because we were short on memory
   1926  *	  earlier
   1927  *	- process a message that was deferred because of lack of
   1928  *	  memory
   1929  *	- continue processing messages until the queue empties or we
   1930  *	  have to stop because of lack of memory
   1931  *	- during each of the above phase, if the queue is empty and
   1932  *	  there are no pending messages that were passed to the RPC
   1933  *	  layer, send upstream the pending disconnect/ordrel indication if
   1934  *	  there is one
   1935  *
   1936  * The read-side queue is also enabled by a bufcall callback if dupmsg
   1937  * fails in mir_rput.
   1938  */
   1939 static void
   1940 mir_rsrv(queue_t *q)
   1941 {
   1942 	mir_t	*mir;
   1943 	mblk_t	*mp;
   1944 	mblk_t	*cmp = NULL;
   1945 	boolean_t stop_timer = B_FALSE;
   1946 
   1947 	mir = (mir_t *)q->q_ptr;
   1948 	mutex_enter(&mir->mir_mutex);
   1949 
   1950 	mp = NULL;
   1951 	switch (mir->mir_type) {
   1952 	case RPC_SERVER:
   1953 		if (mir->mir_ref_cnt == 0)
   1954 			mir->mir_hold_inbound = 0;
   1955 		if (mir->mir_hold_inbound) {
   1956 
   1957 			ASSERT(cmp == NULL);
   1958 			if (q->q_first == NULL) {
   1959 
   1960 				MIR_CLEAR_INRSRV(mir);
   1961 
   1962 				if (MIR_SVC_QUIESCED(mir)) {
   1963 					cmp = mir->mir_svc_pend_mp;
   1964 					mir->mir_svc_pend_mp = NULL;
   1965 				}
   1966 			}
   1967 
   1968 			mutex_exit(&mir->mir_mutex);
   1969 
   1970 			if (cmp != NULL) {
   1971 				RPCLOG(16, "mir_rsrv: line %d: sending a held "
   1972 				    "disconnect/ord rel indication upstream\n",
   1973 				    __LINE__);
   1974 				putnext(q, cmp);
   1975 			}
   1976 
   1977 			return;
   1978 		}
   1979 		while (mp = getq(q)) {
   1980 			if (mir->mir_krpc_cell &&
   1981 			    (mir->mir_svc_no_more_msgs == 0)) {
   1982 				/*
   1983 				 * If we were idle, turn off idle timer since
   1984 				 * we aren't idle any more.
   1985 				 */
   1986 				if (mir->mir_ref_cnt++ == 0)
   1987 					stop_timer = B_TRUE;
   1988 				if (mir_check_len(q,
   1989 				    (int32_t)msgdsize(mp), mp))
   1990 					return;
   1991 				svc_queuereq(q, mp);
   1992 			} else {
   1993 				/*
   1994 				 * Count # of times this happens. Should be
   1995 				 * never, but experience shows otherwise.
   1996 				 */
   1997 				if (mir->mir_krpc_cell == NULL)
   1998 					mir_krpc_cell_null++;
   1999 				freemsg(mp);
   2000 			}
   2001 		}
   2002 		break;
   2003 	case RPC_CLIENT:
   2004 		break;
   2005 	default:
   2006 		RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);
   2007 
   2008 		if (q->q_first == NULL)
   2009 			MIR_CLEAR_INRSRV(mir);
   2010 
   2011 		mutex_exit(&mir->mir_mutex);
   2012 
   2013 		return;
   2014 	}
   2015 
   2016 	/*
   2017 	 * The timer is stopped after all the messages are processed.
   2018 	 * The reason is that stopping the timer releases the mir_mutex
   2019 	 * lock temporarily.  This means that the request can be serviced
   2020 	 * while we are still processing the message queue.  This is not
   2021 	 * good.  So we stop the timer here instead.
   2022 	 */
   2023 	if (stop_timer)  {
   2024 		RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
   2025 		    "cnt going to non zero\n", (void *)WR(q));
   2026 		mir_svc_idle_stop(WR(q), mir);
   2027 	}
   2028 
   2029 	if (q->q_first == NULL) {
   2030 
   2031 		MIR_CLEAR_INRSRV(mir);
   2032 
   2033 		ASSERT(cmp == NULL);
   2034 		if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
   2035 			cmp = mir->mir_svc_pend_mp;
   2036 			mir->mir_svc_pend_mp = NULL;
   2037 		}
   2038 
   2039 		mutex_exit(&mir->mir_mutex);
   2040 
   2041 		if (cmp != NULL) {
   2042 			RPCLOG(16, "mir_rsrv: line %d: sending a held "
   2043 			    "disconnect/ord rel indication upstream\n",
   2044 			    __LINE__);
   2045 			putnext(q, cmp);
   2046 		}
   2047 
   2048 		return;
   2049 	}
   2050 	mutex_exit(&mir->mir_mutex);
   2051 }
   2052 
   2053 static int mir_svc_policy_fails;
   2054 
   2055 /*
   2056  * Called to send an event code to nfsd/lockd so that it initiates
   2057  * connection close.
   2058  */
   2059 static int
   2060 mir_svc_policy_notify(queue_t *q, int event)
   2061 {
   2062 	mblk_t	*mp;
   2063 #ifdef DEBUG
   2064 	mir_t *mir = (mir_t *)q->q_ptr;
   2065 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   2066 #endif
   2067 	ASSERT(q->q_flag & QREADR);
   2068 
   2069 	/*
   2070 	 * Create an M_DATA message with the event code and pass it to the
   2071 	 * Stream head (nfsd or whoever created the stream will consume it).
   2072 	 */
   2073 	mp = allocb(sizeof (int), BPRI_HI);
   2074 
   2075 	if (!mp) {
   2076 
   2077 		mir_svc_policy_fails++;
   2078 		RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
   2079 		    "%d\n", event);
   2080 		return (ENOMEM);
   2081 	}
   2082 
   2083 	U32_TO_BE32(event, mp->b_rptr);
   2084 	mp->b_wptr = mp->b_rptr + sizeof (int);
   2085 	putnext(q, mp);
   2086 	return (0);
   2087 }
   2088 
   2089 /*
   2090  * Server side: start the close phase. We want to get this rpcmod slot in an
   2091  * idle state before mir_close() is called.
   2092  */
   2093 static void
   2094 mir_svc_start_close(queue_t *wq, mir_t *mir)
   2095 {
   2096 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2097 	ASSERT((wq->q_flag & QREADR) == 0);
   2098 	ASSERT(mir->mir_type == RPC_SERVER);
   2099 
   2100 
   2101 	/*
   2102 	 * Do not accept any more messages.
   2103 	 */
   2104 	mir->mir_svc_no_more_msgs = 1;
   2105 
   2106 	/*
   2107 	 * Next two statements will make the read service procedure invoke
   2108 	 * svc_queuereq() on everything stuck in the streams read queue.
   2109 	 * It's not necessary because enabling the write queue will
   2110 	 * have the same effect, but why not speed the process along?
   2111 	 */
   2112 	mir->mir_hold_inbound = 0;
   2113 	qenable(RD(wq));
   2114 
   2115 	/*
   2116 	 * Meanwhile force the write service procedure to send the
   2117 	 * responses downstream, regardless of flow control.
   2118 	 */
   2119 	qenable(wq);
   2120 }
   2121 
   2122 /*
   2123  * This routine is called directly by KRPC after a request is completed,
   2124  * whether a reply was sent or the request was dropped.
   2125  */
   2126 static void
   2127 mir_svc_release(queue_t *wq, mblk_t *mp)
   2128 {
   2129 	mir_t   *mir = (mir_t *)wq->q_ptr;
   2130 	mblk_t	*cmp = NULL;
   2131 
   2132 	ASSERT((wq->q_flag & QREADR) == 0);
   2133 	if (mp)
   2134 		freemsg(mp);
   2135 
   2136 	mutex_enter(&mir->mir_mutex);
   2137 
   2138 	/*
   2139 	 * Start idle processing if this is the last reference.
   2140 	 */
   2141 	if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
   2142 
   2143 		RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
   2144 		    "because ref cnt is zero\n", (void *) wq);
   2145 
   2146 		cmp = mir->mir_svc_pend_mp;
   2147 		mir->mir_svc_pend_mp = NULL;
   2148 		mir_svc_idle_start(wq, mir);
   2149 	}
   2150 
   2151 	mir->mir_ref_cnt--;
   2152 	ASSERT(mir->mir_ref_cnt >= 0);
   2153 
   2154 	/*
   2155 	 * Wake up the thread waiting to close.
   2156 	 */
   2157 
   2158 	if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
   2159 		cv_signal(&mir->mir_condvar);
   2160 
   2161 	mutex_exit(&mir->mir_mutex);
   2162 
   2163 	if (cmp) {
   2164 		RPCLOG(16, "mir_svc_release: sending a held "
   2165 		    "disconnect/ord rel indication upstream on queue 0x%p\n",
   2166 		    (void *)RD(wq));
   2167 
   2168 		putnext(RD(wq), cmp);
   2169 	}
   2170 }
   2171 
   2172 /*
   2173  * This routine is called by server-side KRPC when it is ready to
   2174  * handle inbound messages on the stream.
   2175  */
   2176 static void
   2177 mir_svc_start(queue_t *wq)
   2178 {
   2179 	mir_t   *mir = (mir_t *)wq->q_ptr;
   2180 
   2181 	/*
   2182 	 * no longer need to take the mir_mutex because the
   2183 	 * mir_setup_complete field has been moved out of
   2184 	 * the binary field protected by the mir_mutex.
   2185 	 */
   2186 
   2187 	mir->mir_setup_complete = 1;
   2188 	qenable(RD(wq));
   2189 }
   2190 
   2191 /*
   2192  * client side wrapper for stopping timer with normal idle timeout.
   2193  */
   2194 static void
   2195 mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
   2196 {
   2197 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2198 	ASSERT((wq->q_flag & QREADR) == 0);
   2199 	ASSERT(mir->mir_type == RPC_CLIENT);
   2200 
   2201 	mir_timer_stop(mir);
   2202 }
   2203 
   2204 /*
   2205  * client side wrapper for stopping timer with normal idle timeout.
   2206  */
   2207 static void
   2208 mir_clnt_idle_start(queue_t *wq, mir_t *mir)
   2209 {
   2210 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2211 	ASSERT((wq->q_flag & QREADR) == 0);
   2212 	ASSERT(mir->mir_type == RPC_CLIENT);
   2213 
   2214 	mir_timer_start(wq, mir, mir->mir_idle_timeout);
   2215 }
   2216 
   2217 /*
   2218  * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
   2219  * end-points that aren't connected.
   2220  */
   2221 static void
   2222 mir_clnt_idle_do_stop(queue_t *wq)
   2223 {
   2224 	mir_t   *mir = (mir_t *)wq->q_ptr;
   2225 
   2226 	RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
   2227 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   2228 	mutex_enter(&mir->mir_mutex);
   2229 	mir_clnt_idle_stop(wq, mir);
   2230 	mutex_exit(&mir->mir_mutex);
   2231 }
   2232 
   2233 /*
   2234  * Timer handler.  It handles idle timeout and memory shortage problem.
   2235  */
   2236 static void
   2237 mir_timer(void *arg)
   2238 {
   2239 	queue_t *wq = (queue_t *)arg;
   2240 	mir_t *mir = (mir_t *)wq->q_ptr;
   2241 	boolean_t notify;
   2242 
   2243 	mutex_enter(&mir->mir_mutex);
   2244 
   2245 	/*
   2246 	 * mir_timer_call is set only when either mir_timer_[start|stop]
   2247 	 * is progressing.  And mir_timer() can only be run while they
   2248 	 * are progressing if the timer is being stopped.  So just
   2249 	 * return.
   2250 	 */
   2251 	if (mir->mir_timer_call) {
   2252 		mutex_exit(&mir->mir_mutex);
   2253 		return;
   2254 	}
   2255 	mir->mir_timer_id = 0;
   2256 
   2257 	switch (mir->mir_type) {
   2258 	case RPC_CLIENT:
   2259 
   2260 		/*
   2261 		 * For clients, the timer fires at clnt_idle_timeout
   2262 		 * intervals.  If the activity marker (mir_clntreq) is
   2263 		 * zero, then the stream has been idle since the last
   2264 		 * timer event and we notify KRPC.  If mir_clntreq is
   2265 		 * non-zero, then the stream is active and we just
   2266 		 * restart the timer for another interval.  mir_clntreq
   2267 		 * is set to 1 in mir_wput for every request passed
   2268 		 * downstream.
   2269 		 *
   2270 		 * If this was a memory shortage timer reset the idle
   2271 		 * timeout regardless; the mir_clntreq will not be a
   2272 		 * valid indicator.
   2273 		 *
   2274 		 * The timer is initially started in mir_wput during
   2275 		 * RPC_CLIENT ioctl processing.
   2276 		 *
   2277 		 * The timer interval can be changed for individual
   2278 		 * streams with the ND variable "mir_idle_timeout".
   2279 		 */
   2280 		if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
   2281 		    MSEC_TO_TICK(mir->mir_idle_timeout) - lbolt >= 0) {
   2282 			clock_t tout;
   2283 
   2284 			tout = mir->mir_idle_timeout -
   2285 			    TICK_TO_MSEC(lbolt - mir->mir_use_timestamp);
   2286 			if (tout < 0)
   2287 				tout = 1000;
   2288 #if 0
   2289 			printf("mir_timer[%d < %d + %d]: reset client timer "
   2290 			    "to %d (ms)\n", TICK_TO_MSEC(lbolt),
   2291 			    TICK_TO_MSEC(mir->mir_use_timestamp),
   2292 			    mir->mir_idle_timeout, tout);
   2293 #endif
   2294 			mir->mir_clntreq = 0;
   2295 			mir_timer_start(wq, mir, tout);
   2296 			mutex_exit(&mir->mir_mutex);
   2297 			return;
   2298 		}
   2299 #if 0
   2300 printf("mir_timer[%d]: doing client timeout\n", lbolt / hz);
   2301 #endif
   2302 		/*
   2303 		 * We are disconnecting, but not necessarily
   2304 		 * closing. By not closing, we will fail to
   2305 		 * pick up a possibly changed global timeout value,
   2306 		 * unless we store it now.
   2307 		 */
   2308 		mir->mir_idle_timeout = clnt_idle_timeout;
   2309 		mir_clnt_idle_start(wq, mir);
   2310 
   2311 		mutex_exit(&mir->mir_mutex);
   2312 		/*
   2313 		 * We pass T_ORDREL_REQ as an integer value
   2314 		 * to KRPC as the indication that the stream
   2315 		 * is idle.  This is not a T_ORDREL_REQ message,
   2316 		 * it is just a convenient value since we call
   2317 		 * the same KRPC routine for T_ORDREL_INDs and
   2318 		 * T_DISCON_INDs.
   2319 		 */
   2320 		clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
   2321 		return;
   2322 
   2323 	case RPC_SERVER:
   2324 
   2325 		/*
   2326 		 * For servers, the timer is only running when the stream
   2327 		 * is really idle or memory is short.  The timer is started
   2328 		 * by mir_wput when mir_type is set to RPC_SERVER and
   2329 		 * by mir_svc_idle_start whenever the stream goes idle
   2330 		 * (mir_ref_cnt == 0).  The timer is cancelled in
   2331 		 * mir_rput whenever a new inbound request is passed to KRPC
   2332 		 * and the stream was previously idle.
   2333 		 *
   2334 		 * The timer interval can be changed for individual
   2335 		 * streams with the ND variable "mir_idle_timeout".
   2336 		 *
   2337 		 * If the stream is not idle do nothing.
   2338 		 */
   2339 		if (!MIR_SVC_QUIESCED(mir)) {
   2340 			mutex_exit(&mir->mir_mutex);
   2341 			return;
   2342 		}
   2343 
   2344 		notify = !mir->mir_inrservice;
   2345 		mutex_exit(&mir->mir_mutex);
   2346 
   2347 		/*
   2348 		 * If there is no packet queued up in read queue, the stream
   2349 		 * is really idle so notify nfsd to close it.
   2350 		 */
   2351 		if (notify) {
   2352 			RPCLOG(16, "mir_timer: telling stream head listener "
   2353 			    "to close stream (0x%p)\n", (void *) RD(wq));
   2354 			(void) mir_svc_policy_notify(RD(wq), 1);
   2355 		}
   2356 		return;
   2357 	default:
   2358 		RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
   2359 		    mir->mir_type);
   2360 		mutex_exit(&mir->mir_mutex);
   2361 		return;
   2362 	}
   2363 }
   2364 
   2365 /*
   2366  * Called by the RPC package to send either a call or a return, or a
   2367  * transport connection request.  Adds the record marking header.
   2368  */
   2369 static void
   2370 mir_wput(queue_t *q, mblk_t *mp)
   2371 {
   2372 	uint_t	frag_header;
   2373 	mir_t	*mir = (mir_t *)q->q_ptr;
   2374 	uchar_t	*rptr = mp->b_rptr;
   2375 
   2376 	if (!mir) {
   2377 		freemsg(mp);
   2378 		return;
   2379 	}
   2380 
   2381 	if (mp->b_datap->db_type != M_DATA) {
   2382 		mir_wput_other(q, mp);
   2383 		return;
   2384 	}
   2385 
   2386 	if (mir->mir_ordrel_pending == 1) {
   2387 		freemsg(mp);
   2388 		RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
   2389 		    (void *)q);
   2390 		return;
   2391 	}
   2392 
   2393 	frag_header = (uint_t)DLEN(mp);
   2394 	frag_header |= MIR_LASTFRAG;
   2395 
   2396 	/* Stick in the 4 byte record marking header. */
   2397 	if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
   2398 	    !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
   2399 		/*
   2400 		 * Since we know that M_DATA messages are created exclusively
   2401 		 * by KRPC, we expect that KRPC will leave room for our header
   2402 		 * and 4 byte align which is normal for XDR.
   2403 		 * If KRPC (or someone else) does not cooperate, then we
   2404 		 * just throw away the message.
   2405 		 */
   2406 		RPCLOG(1, "mir_wput: KRPC did not leave space for record "
   2407 		    "fragment header (%d bytes left)\n",
   2408 		    (int)(rptr - mp->b_datap->db_base));
   2409 		freemsg(mp);
   2410 		return;
   2411 	}
   2412 	rptr -= sizeof (uint32_t);
   2413 	*(uint32_t *)rptr = htonl(frag_header);
   2414 	mp->b_rptr = rptr;
   2415 
   2416 	mutex_enter(&mir->mir_mutex);
   2417 	if (mir->mir_type == RPC_CLIENT) {
   2418 		/*
   2419 		 * For the client, set mir_clntreq to indicate that the
   2420 		 * connection is active.
   2421 		 */
   2422 		mir->mir_clntreq = 1;
   2423 		mir->mir_use_timestamp = lbolt;
   2424 	}
   2425 
   2426 	/*
   2427 	 * If we haven't already queued some data and the downstream module
   2428 	 * can accept more data, send it on, otherwise we queue the message
   2429 	 * and take other actions depending on mir_type.
   2430 	 */
   2431 	if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
   2432 		mutex_exit(&mir->mir_mutex);
   2433 
   2434 		/*
   2435 		 * Now we pass the RPC message downstream.
   2436 		 */
   2437 		putnext(q, mp);
   2438 		return;
   2439 	}
   2440 
   2441 	switch (mir->mir_type) {
   2442 	case RPC_CLIENT:
   2443 		/*
   2444 		 * Check for a previous duplicate request on the
   2445 		 * queue.  If there is one, then we throw away
   2446 		 * the current message and let the previous one
   2447 		 * go through.  If we can't find a duplicate, then
   2448 		 * send this one.  This tap dance is an effort
   2449 		 * to reduce traffic and processing requirements
   2450 		 * under load conditions.
   2451 		 */
   2452 		if (mir_clnt_dup_request(q, mp)) {
   2453 			mutex_exit(&mir->mir_mutex);
   2454 			freemsg(mp);
   2455 			return;
   2456 		}
   2457 		break;
   2458 	case RPC_SERVER:
   2459 		/*
   2460 		 * Set mir_hold_inbound so that new inbound RPC
   2461 		 * messages will be held until the client catches
   2462 		 * up on the earlier replies.  This flag is cleared
   2463 		 * in mir_wsrv after flow control is relieved;
   2464 		 * the read-side queue is also enabled at that time.
   2465 		 */
   2466 		mir->mir_hold_inbound = 1;
   2467 		break;
   2468 	default:
   2469 		RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
   2470 		break;
   2471 	}
   2472 	mir->mir_inwservice = 1;
   2473 	(void) putq(q, mp);
   2474 	mutex_exit(&mir->mir_mutex);
   2475 }
   2476 
   2477 static void
   2478 mir_wput_other(queue_t *q, mblk_t *mp)
   2479 {
   2480 	mir_t	*mir = (mir_t *)q->q_ptr;
   2481 	struct iocblk	*iocp;
   2482 	uchar_t	*rptr = mp->b_rptr;
   2483 	bool_t	flush_in_svc = FALSE;
   2484 
   2485 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   2486 	switch (mp->b_datap->db_type) {
   2487 	case M_IOCTL:
   2488 		iocp = (struct iocblk *)rptr;
   2489 		switch (iocp->ioc_cmd) {
   2490 		case RPC_CLIENT:
   2491 			mutex_enter(&mir->mir_mutex);
   2492 			if (mir->mir_type != 0 &&
   2493 			    mir->mir_type != iocp->ioc_cmd) {
   2494 ioc_eperm:
   2495 				mutex_exit(&mir->mir_mutex);
   2496 				iocp->ioc_error = EPERM;
   2497 				iocp->ioc_count = 0;
   2498 				mp->b_datap->db_type = M_IOCACK;
   2499 				qreply(q, mp);
   2500 				return;
   2501 			}
   2502 
   2503 			mir->mir_type = iocp->ioc_cmd;
   2504 
   2505 			/*
   2506 			 * Clear mir_hold_inbound which was set to 1 by
   2507 			 * mir_open.  This flag is not used on client
   2508 			 * streams.
   2509 			 */
   2510 			mir->mir_hold_inbound = 0;
   2511 			mir->mir_max_msg_sizep = &clnt_max_msg_size;
   2512 
   2513 			/*
   2514 			 * Start the idle timer.  See mir_timer() for more
   2515 			 * information on how client timers work.
   2516 			 */
   2517 			mir->mir_idle_timeout = clnt_idle_timeout;
   2518 			mir_clnt_idle_start(q, mir);
   2519 			mutex_exit(&mir->mir_mutex);
   2520 
   2521 			mp->b_datap->db_type = M_IOCACK;
   2522 			qreply(q, mp);
   2523 			return;
   2524 		case RPC_SERVER:
   2525 			mutex_enter(&mir->mir_mutex);
   2526 			if (mir->mir_type != 0 &&
   2527 			    mir->mir_type != iocp->ioc_cmd)
   2528 				goto ioc_eperm;
   2529 
   2530 			/*
   2531 			 * We don't clear mir_hold_inbound here because
   2532 			 * mir_hold_inbound is used in the flow control
   2533 			 * model. If we cleared it here, then we'd commit
   2534 			 * a small violation to the model where the transport
   2535 			 * might immediately block downstream flow.
   2536 			 */
   2537 
   2538 			mir->mir_type = iocp->ioc_cmd;
   2539 			mir->mir_max_msg_sizep = &svc_max_msg_size;
   2540 
   2541 			/*
   2542 			 * Start the idle timer.  See mir_timer() for more
   2543 			 * information on how server timers work.
   2544 			 *
   2545 			 * Note that it is important to start the idle timer
   2546 			 * here so that connections time out even if we
   2547 			 * never receive any data on them.
   2548 			 */
   2549 			mir->mir_idle_timeout = svc_idle_timeout;
   2550 			RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
   2551 			    "because we got RPC_SERVER ioctl\n", (void *)q);
   2552 			mir_svc_idle_start(q, mir);
   2553 			mutex_exit(&mir->mir_mutex);
   2554 
   2555 			mp->b_datap->db_type = M_IOCACK;
   2556 			qreply(q, mp);
   2557 			return;
   2558 		default:
   2559 			break;
   2560 		}
   2561 		break;
   2562 
   2563 	case M_PROTO:
   2564 		if (mir->mir_type == RPC_CLIENT) {
   2565 			/*
   2566 			 * We are likely being called from the context of a
   2567 			 * service procedure. So we need to enqueue. However
   2568 			 * enqueing may put our message behind data messages.
   2569 			 * So flush the data first.
   2570 			 */
   2571 			flush_in_svc = TRUE;
   2572 		}
   2573 		if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
   2574 		    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
   2575 			break;
   2576 
   2577 		switch (((union T_primitives *)rptr)->type) {
   2578 		case T_DATA_REQ:
   2579 			/* Don't pass T_DATA_REQ messages downstream. */
   2580 			freemsg(mp);
   2581 			return;
   2582 		case T_ORDREL_REQ:
   2583 			RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
   2584 			    (void *)q);
   2585 			mutex_enter(&mir->mir_mutex);
   2586 			if (mir->mir_type != RPC_SERVER) {
   2587 				/*
   2588 				 * We are likely being called from
   2589 				 * clnt_dispatch_notifyall(). Sending
   2590 				 * a T_ORDREL_REQ will result in
   2591 				 * a some kind of _IND message being sent,
   2592 				 * will be another call to
   2593 				 * clnt_dispatch_notifyall(). To keep the stack
   2594 				 * lean, queue this message.
   2595 				 */
   2596 				mir->mir_inwservice = 1;
   2597 				(void) putq(q, mp);
   2598 				mutex_exit(&mir->mir_mutex);
   2599 				return;
   2600 			}
   2601 
   2602 			/*
   2603 			 * Mark the structure such that we don't accept any
   2604 			 * more requests from client. We could defer this
   2605 			 * until we actually send the orderly release
   2606 			 * request downstream, but all that does is delay
   2607 			 * the closing of this stream.
   2608 			 */
   2609 			RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
   2610 			    " so calling mir_svc_start_close\n", (void *)q);
   2611 
   2612 			mir_svc_start_close(q, mir);
   2613 
   2614 			/*
   2615 			 * If we have sent down a T_ORDREL_REQ, don't send
   2616 			 * any more.
   2617 			 */
   2618 			if (mir->mir_ordrel_pending) {
   2619 				freemsg(mp);
   2620 				mutex_exit(&mir->mir_mutex);
   2621 				return;
   2622 			}
   2623 
   2624 			/*
   2625 			 * If the stream is not idle, then we hold the
   2626 			 * orderly release until it becomes idle.  This
   2627 			 * ensures that KRPC will be able to reply to
   2628 			 * all requests that we have passed to it.
   2629 			 *
   2630 			 * We also queue the request if there is data already
   2631 			 * queued, because we cannot allow the T_ORDREL_REQ
   2632 			 * to go before data. When we had a separate reply
   2633 			 * count, this was not a problem, because the
   2634 			 * reply count was reconciled when mir_wsrv()
   2635 			 * completed.
   2636 			 */
   2637 			if (!MIR_SVC_QUIESCED(mir) ||
   2638 			    mir->mir_inwservice == 1) {
   2639 				mir->mir_inwservice = 1;
   2640 				(void) putq(q, mp);
   2641 
   2642 				RPCLOG(16, "mir_wput_other: queuing "
   2643 				    "T_ORDREL_REQ on 0x%p\n", (void *)q);
   2644 
   2645 				mutex_exit(&mir->mir_mutex);
   2646 				return;
   2647 			}
   2648 
   2649 			/*
   2650 			 * Mark the structure so that we know we sent
   2651 			 * an orderly release request, and reset the idle timer.
   2652 			 */
   2653 			mir->mir_ordrel_pending = 1;
   2654 
   2655 			RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
   2656 			    " on 0x%p because we got T_ORDREL_REQ\n",
   2657 			    (void *)q);
   2658 
   2659 			mir_svc_idle_start(q, mir);
   2660 			mutex_exit(&mir->mir_mutex);
   2661 
   2662 			/*
   2663 			 * When we break, we will putnext the T_ORDREL_REQ.
   2664 			 */
   2665 			break;
   2666 
   2667 		case T_CONN_REQ:
   2668 			mutex_enter(&mir->mir_mutex);
   2669 			if (mir->mir_head_mp != NULL) {
   2670 				freemsg(mir->mir_head_mp);
   2671 				mir->mir_head_mp = NULL;
   2672 				mir->mir_tail_mp = NULL;
   2673 			}
   2674 			mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   2675 			/*
   2676 			 * Restart timer in case mir_clnt_idle_do_stop() was
   2677 			 * called.
   2678 			 */
   2679 			mir->mir_idle_timeout = clnt_idle_timeout;
   2680 			mir_clnt_idle_stop(q, mir);
   2681 			mir_clnt_idle_start(q, mir);
   2682 			mutex_exit(&mir->mir_mutex);
   2683 			break;
   2684 
   2685 		default:
   2686 			/*
   2687 			 * T_DISCON_REQ is one of the interesting default
   2688 			 * cases here. Ideally, an M_FLUSH is done before
   2689 			 * T_DISCON_REQ is done. However, that is somewhat
   2690 			 * cumbersome for clnt_cots.c to do. So we queue
   2691 			 * T_DISCON_REQ, and let the service procedure
   2692 			 * flush all M_DATA.
   2693 			 */
   2694 			break;
   2695 		}
   2696 		/* fallthru */;
   2697 	default:
   2698 		if (mp->b_datap->db_type >= QPCTL) {
   2699 			if (mp->b_datap->db_type == M_FLUSH) {
   2700 				if (mir->mir_type == RPC_CLIENT &&
   2701 				    *mp->b_rptr & FLUSHW) {
   2702 					RPCLOG(32, "mir_wput_other: flushing "
   2703 					    "wq 0x%p\n", (void *)q);
   2704 					if (*mp->b_rptr & FLUSHBAND) {
   2705 						flushband(q, *(mp->b_rptr + 1),
   2706 						    FLUSHDATA);
   2707 					} else {
   2708 						flushq(q, FLUSHDATA);
   2709 					}
   2710 				} else {
   2711 					RPCLOG(32, "mir_wput_other: ignoring "
   2712 					    "M_FLUSH on wq 0x%p\n", (void *)q);
   2713 				}
   2714 			}
   2715 			break;
   2716 		}
   2717 
   2718 		mutex_enter(&mir->mir_mutex);
   2719 		if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
   2720 			mutex_exit(&mir->mir_mutex);
   2721 			break;
   2722 		}
   2723 		mir->mir_inwservice = 1;
   2724 		mir->mir_inwflushdata = flush_in_svc;
   2725 		(void) putq(q, mp);
   2726 		mutex_exit(&mir->mir_mutex);
   2727 		qenable(q);
   2728 
   2729 		return;
   2730 	}
   2731 	putnext(q, mp);
   2732 }
   2733 
   2734 static void
   2735 mir_wsrv(queue_t *q)
   2736 {
   2737 	mblk_t	*mp;
   2738 	mir_t	*mir;
   2739 	bool_t flushdata;
   2740 
   2741 	mir = (mir_t *)q->q_ptr;
   2742 	mutex_enter(&mir->mir_mutex);
   2743 
   2744 	flushdata = mir->mir_inwflushdata;
   2745 	mir->mir_inwflushdata = 0;
   2746 
   2747 	while (mp = getq(q)) {
   2748 		if (mp->b_datap->db_type == M_DATA) {
   2749 			/*
   2750 			 * Do not send any more data if we have sent
   2751 			 * a T_ORDREL_REQ.
   2752 			 */
   2753 			if (flushdata || mir->mir_ordrel_pending == 1) {
   2754 				freemsg(mp);
   2755 				continue;
   2756 			}
   2757 
   2758 			/*
   2759 			 * Make sure that the stream can really handle more
   2760 			 * data.
   2761 			 */
   2762 			if (!MIR_WCANPUTNEXT(mir, q)) {
   2763 				(void) putbq(q, mp);
   2764 				mutex_exit(&mir->mir_mutex);
   2765 				return;
   2766 			}
   2767 
   2768 			/*
   2769 			 * Now we pass the RPC message downstream.
   2770 			 */
   2771 			mutex_exit(&mir->mir_mutex);
   2772 			putnext(q, mp);
   2773 			mutex_enter(&mir->mir_mutex);
   2774 			continue;
   2775 		}
   2776 
   2777 		/*
   2778 		 * This is not an RPC message, pass it downstream
   2779 		 * (ignoring flow control) if the server side is not sending a
   2780 		 * T_ORDREL_REQ downstream.
   2781 		 */
   2782 		if (mir->mir_type != RPC_SERVER ||
   2783 		    ((union T_primitives *)mp->b_rptr)->type !=
   2784 		    T_ORDREL_REQ) {
   2785 			mutex_exit(&mir->mir_mutex);
   2786 			putnext(q, mp);
   2787 			mutex_enter(&mir->mir_mutex);
   2788 			continue;
   2789 		}
   2790 
   2791 		if (mir->mir_ordrel_pending == 1) {
   2792 			/*
   2793 			 * Don't send two T_ORDRELs
   2794 			 */
   2795 			freemsg(mp);
   2796 			continue;
   2797 		}
   2798 
   2799 		/*
   2800 		 * Mark the structure so that we know we sent an orderly
   2801 		 * release request.  We will check to see slot is idle at the
   2802 		 * end of this routine, and if so, reset the idle timer to
   2803 		 * handle orderly release timeouts.
   2804 		 */
   2805 		mir->mir_ordrel_pending = 1;
   2806 		RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
   2807 		    (void *)q);
   2808 		/*
   2809 		 * Send the orderly release downstream. If there are other
   2810 		 * pending replies we won't be able to send them.  However,
   2811 		 * the only reason we should send the orderly release is if
   2812 		 * we were idle, or if an unusual event occurred.
   2813 		 */
   2814 		mutex_exit(&mir->mir_mutex);
   2815 		putnext(q, mp);
   2816 		mutex_enter(&mir->mir_mutex);
   2817 	}
   2818 
   2819 	if (q->q_first == NULL)
   2820 		/*
   2821 		 * If we call mir_svc_idle_start() below, then
   2822 		 * clearing mir_inwservice here will also result in
   2823 		 * any thread waiting in mir_close() to be signaled.
   2824 		 */
   2825 		mir->mir_inwservice = 0;
   2826 
   2827 	if (mir->mir_type != RPC_SERVER) {
   2828 		mutex_exit(&mir->mir_mutex);
   2829 		return;
   2830 	}
   2831 
   2832 	/*
   2833 	 * If idle we call mir_svc_idle_start to start the timer (or wakeup
   2834 	 * a close). Also make sure not to start the idle timer on the
   2835 	 * listener stream. This can cause nfsd to send an orderly release
   2836 	 * command on the listener stream.
   2837 	 */
   2838 	if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
   2839 		RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
   2840 		    "because mir slot is idle\n", (void *)q);
   2841 		mir_svc_idle_start(q, mir);
   2842 	}
   2843 
   2844 	/*
   2845 	 * If outbound flow control has been relieved, then allow new
   2846 	 * inbound requests to be processed.
   2847 	 */
   2848 	if (mir->mir_hold_inbound) {
   2849 		mir->mir_hold_inbound = 0;
   2850 		qenable(RD(q));
   2851 	}
   2852 	mutex_exit(&mir->mir_mutex);
   2853 }
   2854 
   2855 static void
   2856 mir_disconnect(queue_t *q, mir_t *mir)
   2857 {
   2858 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2859 
   2860 	switch (mir->mir_type) {
   2861 	case RPC_CLIENT:
   2862 		/*
   2863 		 * We are disconnecting, but not necessarily
   2864 		 * closing. By not closing, we will fail to
   2865 		 * pick up a possibly changed global timeout value,
   2866 		 * unless we store it now.
   2867 		 */
   2868 		mir->mir_idle_timeout = clnt_idle_timeout;
   2869 		mir_clnt_idle_start(WR(q), mir);
   2870 		mutex_exit(&mir->mir_mutex);
   2871 
   2872 		/*
   2873 		 * T_DISCON_REQ is passed to KRPC as an integer value
   2874 		 * (this is not a TPI message).  It is used as a
   2875 		 * convenient value to indicate a sanity check
   2876 		 * failure -- the same KRPC routine is also called
   2877 		 * for T_DISCON_INDs and T_ORDREL_INDs.
   2878 		 */
   2879 		clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
   2880 		break;
   2881 
   2882 	case RPC_SERVER:
   2883 		mir->mir_svc_no_more_msgs = 1;
   2884 		mir_svc_idle_stop(WR(q), mir);
   2885 		mutex_exit(&mir->mir_mutex);
   2886 		RPCLOG(16, "mir_disconnect: telling "
   2887 		    "stream head listener to disconnect stream "
   2888 		    "(0x%p)\n", (void *) q);
   2889 		(void) mir_svc_policy_notify(q, 2);
   2890 		break;
   2891 
   2892 	default:
   2893 		mutex_exit(&mir->mir_mutex);
   2894 		break;
   2895 	}
   2896 }
   2897 
   2898 /*
   2899  * Sanity check the message length, and if it's too large, shutdown the
   2900  * connection.  Returns 1 if the connection is shutdown; 0 otherwise.
   2901  */
   2902 static int
   2903 mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
   2904 {
   2905 	mir_t *mir = q->q_ptr;
   2906 	uint_t maxsize = 0;
   2907 
   2908 	if (mir->mir_max_msg_sizep != NULL)
   2909 		maxsize = *mir->mir_max_msg_sizep;
   2910 
   2911 	if (maxsize == 0 || frag_len <= (int)maxsize)
   2912 		return (0);
   2913 
   2914 	freemsg(head_mp);
   2915 	mir->mir_head_mp = NULL;
   2916 	mir->mir_tail_mp = NULL;
   2917 	mir->mir_frag_header = 0;
   2918 	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   2919 	if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
   2920 		cmn_err(CE_NOTE,
   2921 		    "KRPC: record fragment from %s of size(%d) exceeds "
   2922 		    "maximum (%u). Disconnecting",
   2923 		    (mir->mir_type == RPC_CLIENT) ? "server" :
   2924 		    (mir->mir_type == RPC_SERVER) ? "client" :
   2925 		    "test tool", frag_len, maxsize);
   2926 	}
   2927 
   2928 	mir_disconnect(q, mir);
   2929 	return (1);
   2930 }
   2931