Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 /* Copyright (c) 1990 Mentat Inc. */
     26 
     27 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
     28 /*	  All Rights Reserved  	*/
     29 
     30 #pragma ident	"@(#)rpcmod.c	1.115	07/12/05 SMI"
     31 
     32 /*
     33  * Kernel RPC filtering module
     34  */
     35 
     36 #include <sys/param.h>
     37 #include <sys/types.h>
     38 #include <sys/stream.h>
     39 #include <sys/stropts.h>
     40 #include <sys/tihdr.h>
     41 #include <sys/timod.h>
     42 #include <sys/tiuser.h>
     43 #include <sys/debug.h>
     44 #include <sys/signal.h>
     45 #include <sys/pcb.h>
     46 #include <sys/user.h>
     47 #include <sys/errno.h>
     48 #include <sys/cred.h>
     49 #include <sys/policy.h>
     50 #include <sys/inline.h>
     51 #include <sys/cmn_err.h>
     52 #include <sys/kmem.h>
     53 #include <sys/file.h>
     54 #include <sys/sysmacros.h>
     55 #include <sys/systm.h>
     56 #include <sys/t_lock.h>
     57 #include <sys/ddi.h>
     58 #include <sys/vtrace.h>
     59 #include <sys/callb.h>
     60 #include <sys/strsun.h>
     61 
     62 #include <sys/strlog.h>
     63 #include <rpc/rpc_com.h>
     64 #include <inet/common.h>
     65 #include <rpc/types.h>
     66 #include <sys/time.h>
     67 #include <rpc/xdr.h>
     68 #include <rpc/auth.h>
     69 #include <rpc/clnt.h>
     70 #include <rpc/rpc_msg.h>
     71 #include <rpc/clnt.h>
     72 #include <rpc/svc.h>
     73 #include <rpc/rpcsys.h>
     74 #include <rpc/rpc_rdma.h>
     75 
     76 /*
     77  * This is the loadable module wrapper.
     78  */
     79 #include <sys/conf.h>
     80 #include <sys/modctl.h>
     81 #include <sys/syscall.h>
     82 
     83 extern struct streamtab rpcinfo;
     84 
     85 static struct fmodsw fsw = {
     86 	"rpcmod",
     87 	&rpcinfo,
     88 	D_NEW|D_MP,
     89 };
     90 
     91 /*
     92  * Module linkage information for the kernel.
     93  */
     94 
     95 static struct modlstrmod modlstrmod = {
     96 	&mod_strmodops, "rpc interface str mod", &fsw
     97 };
     98 
     99 /*
    100  * For the RPC system call.
    101  */
    102 static struct sysent rpcsysent = {
    103 	2,
    104 	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
    105 	rpcsys
    106 };
    107 
    108 static struct modlsys modlsys = {
    109 	&mod_syscallops,
    110 	"RPC syscall",
    111 	&rpcsysent
    112 };
    113 
    114 #ifdef _SYSCALL32_IMPL
    115 static struct modlsys modlsys32 = {
    116 	&mod_syscallops32,
    117 	"32-bit RPC syscall",
    118 	&rpcsysent
    119 };
    120 #endif /* _SYSCALL32_IMPL */
    121 
    122 static struct modlinkage modlinkage = {
    123 	MODREV_1,
    124 	{
    125 		&modlsys,
    126 #ifdef _SYSCALL32_IMPL
    127 		&modlsys32,
    128 #endif
    129 		&modlstrmod,
    130 		NULL
    131 	}
    132 };
    133 
    134 int
    135 _init(void)
    136 {
    137 	int error = 0;
    138 	callb_id_t cid;
    139 	int status;
    140 
    141 	svc_init();
    142 	clnt_init();
    143 	cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");
    144 
    145 	if (error = mod_install(&modlinkage)) {
    146 		/*
    147 		 * Could not install module, cleanup previous
    148 		 * initialization work.
    149 		 */
    150 		clnt_fini();
    151 		if (cid != NULL)
    152 			(void) callb_delete(cid);
    153 
    154 		return (error);
    155 	}
    156 
    157 	/*
    158 	 * Load up the RDMA plugins and initialize the stats. Even if the
    159 	 * plugins loadup fails, but rpcmod was successfully installed the
    160 	 * counters still get initialized.
    161 	 */
    162 	rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
    163 	mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);
    164 	mt_kstat_init();
    165 
    166 	/*
    167 	 * Get our identification into ldi.  This is used for loading
    168 	 * other modules, e.g. rpcib.
    169 	 */
    170 	status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
    171 	if (status != 0) {
    172 		cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
    173 		rpcmod_li = NULL;
    174 	}
    175 
    176 	return (error);
    177 }
    178 
    179 /*
    180  * The unload entry point fails, because we advertise entry points into
    181  * rpcmod from the rest of kRPC: rpcmod_release().
    182  */
    183 int
    184 _fini(void)
    185 {
    186 	return (EBUSY);
    187 }
    188 
    189 int
    190 _info(struct modinfo *modinfop)
    191 {
    192 	return (mod_info(&modlinkage, modinfop));
    193 }
    194 
    195 extern int nulldev();
    196 
    197 #define	RPCMOD_ID	2049
    198 
    199 int rmm_open(), rmm_close();
    200 
    201 /*
    202  * To save instructions, since STREAMS ignores the return value
    203  * from these functions, they are defined as void here. Kind of icky, but...
    204  */
    205 void rmm_rput(queue_t *, mblk_t *);
    206 void rmm_wput(queue_t *, mblk_t *);
    207 void rmm_rsrv(queue_t *);
    208 void rmm_wsrv(queue_t *);
    209 
    210 int rpcmodopen(), rpcmodclose();
    211 void rpcmodrput(), rpcmodwput();
    212 void rpcmodrsrv(), rpcmodwsrv();
    213 
    214 static	void	rpcmodwput_other(queue_t *, mblk_t *);
    215 static	int	mir_close(queue_t *q);
    216 static	int	mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
    217 		    cred_t *credp);
    218 static	void	mir_rput(queue_t *q, mblk_t *mp);
    219 static	void	mir_rsrv(queue_t *q);
    220 static	void	mir_wput(queue_t *q, mblk_t *mp);
    221 static	void	mir_wsrv(queue_t *q);
    222 
    223 static struct module_info rpcmod_info =
    224 	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
    225 
    226 /*
    227  * Read side has no service procedure.
    228  */
    229 static struct qinit rpcmodrinit = {
    230 	(int (*)())rmm_rput,
    231 	(int (*)())rmm_rsrv,
    232 	rmm_open,
    233 	rmm_close,
    234 	nulldev,
    235 	&rpcmod_info,
    236 	NULL
    237 };
    238 
    239 /*
    240  * The write put procedure is simply putnext to conserve stack space.
    241  * The write service procedure is not used to queue data, but instead to
    242  * synchronize with flow control.
    243  */
    244 static struct qinit rpcmodwinit = {
    245 	(int (*)())rmm_wput,
    246 	(int (*)())rmm_wsrv,
    247 	rmm_open,
    248 	rmm_close,
    249 	nulldev,
    250 	&rpcmod_info,
    251 	NULL
    252 };
    253 struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
    254 
    255 struct xprt_style_ops {
    256 	int (*xo_open)();
    257 	int (*xo_close)();
    258 	void (*xo_wput)();
    259 	void (*xo_wsrv)();
    260 	void (*xo_rput)();
    261 	void (*xo_rsrv)();
    262 };
    263 
    264 static struct xprt_style_ops xprt_clts_ops = {
    265 	rpcmodopen,
    266 	rpcmodclose,
    267 	rpcmodwput,
    268 	rpcmodwsrv,
    269 	rpcmodrput,
    270 	NULL
    271 };
    272 
    273 static struct xprt_style_ops xprt_cots_ops = {
    274 	mir_open,
    275 	mir_close,
    276 	mir_wput,
    277 	mir_wsrv,
    278 	mir_rput,
    279 	mir_rsrv
    280 };
    281 
    282 /*
    283  * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
    284  */
    285 struct rpcm {
    286 	void		*rm_krpc_cell;	/* Reserved for use by KRPC */
    287 	struct		xprt_style_ops	*rm_ops;
    288 	int		rm_type;	/* Client or server side stream */
    289 #define	RM_CLOSING	0x1		/* somebody is trying to close slot */
    290 	uint_t		rm_state;	/* state of the slot. see above */
    291 	uint_t		rm_ref;		/* cnt of external references to slot */
    292 	kmutex_t	rm_lock;	/* mutex protecting above fields */
    293 	kcondvar_t	rm_cwait;	/* condition for closing */
    294 	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
    295 };
    296 
    297 struct temp_slot {
    298 	void *cell;
    299 	struct xprt_style_ops *ops;
    300 	int type;
    301 	mblk_t *info_ack;
    302 	kmutex_t lock;
    303 	kcondvar_t wait;
    304 };
    305 
    306 typedef struct mir_s {
    307 	void	*mir_krpc_cell;	/* Reserved for KRPC use. This field */
    308 					/* must be first in the structure. */
    309 	struct xprt_style_ops	*rm_ops;
    310 	int	mir_type;		/* Client or server side stream */
    311 
    312 	mblk_t	*mir_head_mp;		/* RPC msg in progress */
    313 		/*
    314 		 * mir_head_mp points the first mblk being collected in
    315 		 * the current RPC message.  Record headers are removed
    316 		 * before data is linked into mir_head_mp.
    317 		 */
    318 	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
    319 		/*
    320 		 * mir_tail_mp points to the last mblk in the message
    321 		 * chain starting at mir_head_mp.  It is only valid
    322 		 * if mir_head_mp is non-NULL and is used to add new
    323 		 * data blocks to the end of chain quickly.
    324 		 */
    325 
    326 	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
    327 		/*
    328 		 * mir_frag_len starts at -4 for beginning of each fragment.
    329 		 * When this length is negative, it indicates the number of
    330 		 * bytes that rpcmod needs to complete the record marker
    331 		 * header.  When it is positive or zero, it holds the number
    332 		 * of bytes that have arrived for the current fragment and
    333 		 * are held in mir_header_mp.
    334 		 */
    335 
    336 	int32_t	mir_frag_header;
    337 		/*
    338 		 * Fragment header as collected for the current fragment.
    339 		 * It holds the last-fragment indicator and the number
    340 		 * of bytes in the fragment.
    341 		 */
    342 
    343 	unsigned int
    344 		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
    345 		mir_hold_inbound : 1,	/* Hold inbound messages on server */
    346 					/* side until outbound flow control */
    347 					/* is relieved. */
    348 		mir_closing : 1,	/* The stream is being closed */
    349 		mir_inrservice : 1,	/* data queued or rd srv proc running */
    350 		mir_inwservice : 1,	/* data queued or wr srv proc running */
    351 		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
    352 		/*
    353 		 * On client streams, mir_clntreq is 0 or 1; it is set
    354 		 * to 1 whenever a new request is sent out (mir_wput)
    355 		 * and cleared when the timer fires (mir_timer).  If
    356 		 * the timer fires with this value equal to 0, then the
    357 		 * stream is considered idle and KRPC is notified.
    358 		 */
    359 		mir_clntreq : 1,
    360 		/*
    361 		 * On server streams, stop accepting messages
    362 		 */
    363 		mir_svc_no_more_msgs : 1,
    364 		mir_listen_stream : 1,	/* listen end point */
    365 		mir_unused : 1,	/* no longer used */
    366 		mir_timer_call : 1,
    367 		mir_junk_fill_thru_bit_31 : 21;
    368 
    369 	int	mir_setup_complete;	/* server has initialized everything */
    370 	timeout_id_t mir_timer_id;	/* Timer for idle checks */
    371 	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
    372 		/*
    373 		 * This value is copied from clnt_idle_timeout or
    374 		 * svc_idle_timeout during the appropriate ioctl.
    375 		 * Kept in milliseconds
    376 		 */
    377 	clock_t	mir_use_timestamp;	/* updated on client with each use */
    378 		/*
    379 		 * This value is set to lbolt
    380 		 * every time a client stream sends or receives data.
    381 		 * Even if the timer message arrives, we don't shutdown
    382 		 * client unless:
    383 		 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
    384 		 * This value is kept in HZ.
    385 		 */
    386 
    387 	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
    388 		/*
    389 		 * This pointer is set to &clnt_max_msg_size or
    390 		 * &svc_max_msg_size during the appropriate ioctl.
    391 		 */
    392 	zoneid_t mir_zoneid;	/* zone which pushed rpcmod */
    393 	/* Server-side fields. */
    394 	int	mir_ref_cnt;		/* Reference count: server side only */
    395 					/* counts the number of references */
    396 					/* that a kernel RPC server thread */
    397 					/* (see svc_run()) has on this rpcmod */
    398 					/* slot. Effectively, it is the */
    399 					/* number * of unprocessed messages */
    400 					/* that have been passed up to the */
    401 					/* KRPC layer */
    402 
    403 	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
    404 					/* T_DISCON_IND */
    405 
    406 	/*
    407 	 * these fields are for both client and server, but for debugging,
    408 	 * it is easier to have these last in the structure.
    409 	 */
    410 	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
    411 	kcondvar_t	mir_condvar;	/* synchronization. */
    412 	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
    413 } mir_t;
    414 
    415 void tmp_rput(queue_t *q, mblk_t *mp);
    416 
    417 struct xprt_style_ops tmpops = {
    418 	NULL,
    419 	NULL,
    420 	putnext,
    421 	NULL,
    422 	tmp_rput,
    423 	NULL
    424 };
    425 
    426 void
    427 tmp_rput(queue_t *q, mblk_t *mp)
    428 {
    429 	struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
    430 	struct T_info_ack *pptr;
    431 
    432 	switch (mp->b_datap->db_type) {
    433 	case M_PCPROTO:
    434 		pptr = (struct T_info_ack *)mp->b_rptr;
    435 		switch (pptr->PRIM_type) {
    436 		case T_INFO_ACK:
    437 			mutex_enter(&t->lock);
    438 			t->info_ack = mp;
    439 			cv_signal(&t->wait);
    440 			mutex_exit(&t->lock);
    441 			return;
    442 		default:
    443 			break;
    444 		}
    445 	default:
    446 		break;
    447 	}
    448 
    449 	/*
    450 	 * Not an info-ack, so free it. This is ok because we should
    451 	 * not be receiving data until the open finishes: rpcmod
    452 	 * is pushed well before the end-point is bound to an address.
    453 	 */
    454 	freemsg(mp);
    455 }
    456 
    457 int
    458 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
    459 {
    460 	mblk_t *bp;
    461 	struct temp_slot ts, *t;
    462 	struct T_info_ack *pptr;
    463 	int error = 0;
    464 
    465 	ASSERT(q != NULL);
    466 	/*
    467 	 * Check for re-opens.
    468 	 */
    469 	if (q->q_ptr) {
    470 		TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
    471 		    "rpcmodopen_end:(%s)", "q->qptr");
    472 		return (0);
    473 	}
    474 
    475 	t = &ts;
    476 	bzero(t, sizeof (*t));
    477 	q->q_ptr = (void *)t;
    478 	WR(q)->q_ptr = (void *)t;
    479 
    480 	/*
    481 	 * Allocate the required messages upfront.
    482 	 */
    483 	if ((bp = allocb(sizeof (struct T_info_req) +
    484 	    sizeof (struct T_info_ack), BPRI_LO)) == (mblk_t *)NULL) {
    485 		return (ENOBUFS);
    486 	}
    487 
    488 	mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
    489 	cv_init(&t->wait, NULL, CV_DEFAULT, NULL);
    490 
    491 	t->ops = &tmpops;
    492 
    493 	qprocson(q);
    494 	bp->b_datap->db_type = M_PCPROTO;
    495 	*(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
    496 	bp->b_wptr += sizeof (struct T_info_req);
    497 	putnext(WR(q), bp);
    498 
    499 	mutex_enter(&t->lock);
    500 	while (t->info_ack == NULL) {
    501 		if (cv_wait_sig(&t->wait, &t->lock) == 0) {
    502 			error = EINTR;
    503 			break;
    504 		}
    505 	}
    506 	mutex_exit(&t->lock);
    507 
    508 	if (error)
    509 		goto out;
    510 
    511 	pptr = (struct T_info_ack *)t->info_ack->b_rptr;
    512 
    513 	if (pptr->SERV_type == T_CLTS) {
    514 		if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
    515 			((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
    516 	} else {
    517 		if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
    518 			((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
    519 	}
    520 
    521 out:
    522 	if (error)
    523 		qprocsoff(q);
    524 
    525 	freemsg(t->info_ack);
    526 	mutex_destroy(&t->lock);
    527 	cv_destroy(&t->wait);
    528 
    529 	return (error);
    530 }
    531 
    532 void
    533 rmm_rput(queue_t *q, mblk_t  *mp)
    534 {
    535 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
    536 }
    537 
    538 void
    539 rmm_rsrv(queue_t *q)
    540 {
    541 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
    542 }
    543 
    544 void
    545 rmm_wput(queue_t *q, mblk_t *mp)
    546 {
    547 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
    548 }
    549 
    550 void
    551 rmm_wsrv(queue_t *q)
    552 {
    553 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
    554 }
    555 
    556 int
    557 rmm_close(queue_t *q, int flag, cred_t *crp)
    558 {
    559 	return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
    560 }
    561 
    562 /*
    563  * rpcmodopen -	open routine gets called when the module gets pushed
    564  *		onto the stream.
    565  */
    566 /*ARGSUSED*/
    567 int
    568 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
    569 {
    570 	struct rpcm *rmp;
    571 
    572 	extern void (*rpc_rele)(queue_t *, mblk_t *);
    573 	static void rpcmod_release(queue_t *, mblk_t *);
    574 
    575 	TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
    576 
    577 	/*
    578 	 * Initialize entry points to release a rpcmod slot (and an input
    579 	 * message if supplied) and to send an output message to the module
    580 	 * below rpcmod.
    581 	 */
    582 	if (rpc_rele == NULL)
    583 		rpc_rele = rpcmod_release;
    584 
    585 	/*
    586 	 * Only sufficiently privileged users can use this module, and it
    587 	 * is assumed that they will use this module properly, and NOT send
    588 	 * bulk data from downstream.
    589 	 */
    590 	if (secpolicy_rpcmod_open(crp) != 0)
    591 		return (EPERM);
    592 
    593 	/*
    594 	 * Allocate slot data structure.
    595 	 */
    596 	rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);
    597 
    598 	mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
    599 	cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
    600 	rmp->rm_zoneid = rpc_zoneid();
    601 	/*
    602 	 * slot type will be set by kRPC client and server ioctl's
    603 	 */
    604 	rmp->rm_type = 0;
    605 
    606 	q->q_ptr = (void *)rmp;
    607 	WR(q)->q_ptr = (void *)rmp;
    608 
    609 	TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
    610 	return (0);
    611 }
    612 
    613 /*
    614  * rpcmodclose - This routine gets called when the module gets popped
    615  * off of the stream.
    616  */
    617 /*ARGSUSED*/
    618 int
    619 rpcmodclose(queue_t *q, int flag, cred_t *crp)
    620 {
    621 	struct rpcm *rmp;
    622 
    623 	ASSERT(q != NULL);
    624 	rmp = (struct rpcm *)q->q_ptr;
    625 
    626 	/*
    627 	 * Mark our state as closing.
    628 	 */
    629 	mutex_enter(&rmp->rm_lock);
    630 	rmp->rm_state |= RM_CLOSING;
    631 
    632 	/*
    633 	 * Check and see if there are any messages on the queue.  If so, send
    634 	 * the messages, regardless whether the downstream module is ready to
    635 	 * accept data.
    636 	 */
    637 	if (rmp->rm_type == RPC_SERVER) {
    638 		flushq(q, FLUSHDATA);
    639 
    640 		qenable(WR(q));
    641 
    642 		if (rmp->rm_ref) {
    643 			mutex_exit(&rmp->rm_lock);
    644 			/*
    645 			 * call into SVC to clean the queue
    646 			 */
    647 			svc_queueclean(q);
    648 			mutex_enter(&rmp->rm_lock);
    649 
    650 			/*
    651 			 * Block while there are kRPC threads with a reference
    652 			 * to this message.
    653 			 */
    654 			while (rmp->rm_ref)
    655 				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
    656 		}
    657 
    658 		mutex_exit(&rmp->rm_lock);
    659 
    660 		/*
    661 		 * It is now safe to remove this queue from the stream. No kRPC
    662 		 * threads have a reference to the stream, and none ever will,
    663 		 * because RM_CLOSING is set.
    664 		 */
    665 		qprocsoff(q);
    666 
    667 		/* Notify kRPC that this stream is going away. */
    668 		svc_queueclose(q);
    669 	} else {
    670 		mutex_exit(&rmp->rm_lock);
    671 		qprocsoff(q);
    672 	}
    673 
    674 	q->q_ptr = NULL;
    675 	WR(q)->q_ptr = NULL;
    676 	mutex_destroy(&rmp->rm_lock);
    677 	cv_destroy(&rmp->rm_cwait);
    678 	kmem_free(rmp, sizeof (*rmp));
    679 	return (0);
    680 }
    681 
    682 #ifdef	DEBUG
    683 int	rpcmod_send_msg_up = 0;
    684 int	rpcmod_send_uderr = 0;
    685 int	rpcmod_send_dup = 0;
    686 int	rpcmod_send_dup_cnt = 0;
    687 #endif
    688 
    689 /*
    690  * rpcmodrput -	Module read put procedure.  This is called from
    691  *		the module, driver, or stream head downstream.
    692  */
    693 void
    694 rpcmodrput(queue_t *q, mblk_t *mp)
    695 {
    696 	struct rpcm *rmp;
    697 	union T_primitives *pptr;
    698 	int hdrsz;
    699 
    700 	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");
    701 
    702 	ASSERT(q != NULL);
    703 	rmp = (struct rpcm *)q->q_ptr;
    704 
    705 	if (rmp->rm_type == 0) {
    706 		freemsg(mp);
    707 		return;
    708 	}
    709 
    710 #ifdef DEBUG
    711 	if (rpcmod_send_msg_up > 0) {
    712 		mblk_t *nmp = copymsg(mp);
    713 		if (nmp) {
    714 			putnext(q, nmp);
    715 			rpcmod_send_msg_up--;
    716 		}
    717 	}
    718 	if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
    719 		mblk_t *nmp;
    720 		struct T_unitdata_ind *data;
    721 		struct T_uderror_ind *ud;
    722 		int d;
    723 		data = (struct T_unitdata_ind *)mp->b_rptr;
    724 		if (data->PRIM_type == T_UNITDATA_IND) {
    725 			d = sizeof (*ud) - sizeof (*data);
    726 			nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
    727 			if (nmp) {
    728 				ud = (struct T_uderror_ind *)nmp->b_rptr;
    729 				ud->PRIM_type = T_UDERROR_IND;
    730 				ud->DEST_length = data->SRC_length;
    731 				ud->DEST_offset = data->SRC_offset + d;
    732 				ud->OPT_length = data->OPT_length;
    733 				ud->OPT_offset = data->OPT_offset + d;
    734 				ud->ERROR_type = ENETDOWN;
    735 				if (data->SRC_length) {
    736 					bcopy(mp->b_rptr +
    737 					    data->SRC_offset,
    738 					    nmp->b_rptr +
    739 					    ud->DEST_offset,
    740 					    data->SRC_length);
    741 				}
    742 				if (data->OPT_length) {
    743 					bcopy(mp->b_rptr +
    744 					    data->OPT_offset,
    745 					    nmp->b_rptr +
    746 					    ud->OPT_offset,
    747 					    data->OPT_length);
    748 				}
    749 				nmp->b_wptr += d;
    750 				nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
    751 				nmp->b_datap->db_type = M_PROTO;
    752 				putnext(q, nmp);
    753 				rpcmod_send_uderr--;
    754 			}
    755 		}
    756 	}
    757 #endif
    758 	switch (mp->b_datap->db_type) {
    759 	default:
    760 		putnext(q, mp);
    761 		break;
    762 
    763 	case M_PROTO:
    764 	case M_PCPROTO:
    765 		ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
    766 		pptr = (union T_primitives *)mp->b_rptr;
    767 
    768 		/*
    769 		 * Forward this message to krpc if it is data.
    770 		 */
    771 		if (pptr->type == T_UNITDATA_IND) {
    772 			mblk_t *nmp;
    773 
    774 		/*
    775 		 * Check if the module is being popped.
    776 		 */
    777 			mutex_enter(&rmp->rm_lock);
    778 			if (rmp->rm_state & RM_CLOSING) {
    779 				mutex_exit(&rmp->rm_lock);
    780 				putnext(q, mp);
    781 				break;
    782 			}
    783 
    784 			switch (rmp->rm_type) {
    785 			case RPC_CLIENT:
    786 				mutex_exit(&rmp->rm_lock);
    787 				hdrsz = mp->b_wptr - mp->b_rptr;
    788 
    789 				/*
    790 				 * Make sure the header is sane.
    791 				 */
    792 				if (hdrsz < TUNITDATAINDSZ ||
    793 				    hdrsz < (pptr->unitdata_ind.OPT_length +
    794 				    pptr->unitdata_ind.OPT_offset) ||
    795 				    hdrsz < (pptr->unitdata_ind.SRC_length +
    796 				    pptr->unitdata_ind.SRC_offset)) {
    797 					freemsg(mp);
    798 					return;
    799 				}
    800 
    801 				/*
    802 				 * Call clnt_clts_dispatch_notify, so that it
    803 				 * can pass the message to the proper caller.
    804 				 * Don't discard the header just yet since the
    805 				 * client may need the sender's address.
    806 				 */
    807 				clnt_clts_dispatch_notify(mp, hdrsz,
    808 				    rmp->rm_zoneid);
    809 				return;
    810 			case RPC_SERVER:
    811 				/*
    812 				 * rm_krpc_cell is exclusively used by the kRPC
    813 				 * CLTS server
    814 				 */
    815 				if (rmp->rm_krpc_cell) {
    816 #ifdef DEBUG
    817 					/*
    818 					 * Test duplicate request cache and
    819 					 * rm_ref count handling by sending a
    820 					 * duplicate every so often, if
    821 					 * desired.
    822 					 */
    823 					if (rpcmod_send_dup &&
    824 					    rpcmod_send_dup_cnt++ %
    825 					    rpcmod_send_dup)
    826 						nmp = copymsg(mp);
    827 					else
    828 						nmp = NULL;
    829 #endif
    830 					/*
    831 					 * Raise the reference count on this
    832 					 * module to prevent it from being
    833 					 * popped before krpc generates the
    834 					 * reply.
    835 					 */
    836 					rmp->rm_ref++;
    837 					mutex_exit(&rmp->rm_lock);
    838 
    839 					/*
    840 					 * Submit the message to krpc.
    841 					 */
    842 					svc_queuereq(q, mp);
    843 #ifdef DEBUG
    844 					/*
    845 					 * Send duplicate if we created one.
    846 					 */
    847 					if (nmp) {
    848 						mutex_enter(&rmp->rm_lock);
    849 						rmp->rm_ref++;
    850 						mutex_exit(&rmp->rm_lock);
    851 						svc_queuereq(q, nmp);
    852 					}
    853 #endif
    854 				} else {
    855 					mutex_exit(&rmp->rm_lock);
    856 					freemsg(mp);
    857 				}
    858 				return;
    859 			default:
    860 				mutex_exit(&rmp->rm_lock);
    861 				freemsg(mp);
    862 				return;
    863 			} /* end switch(rmp->rm_type) */
    864 		} else if (pptr->type == T_UDERROR_IND) {
    865 			mutex_enter(&rmp->rm_lock);
    866 			hdrsz = mp->b_wptr - mp->b_rptr;
    867 
    868 			/*
    869 			 * Make sure the header is sane
    870 			 */
    871 			if (hdrsz < TUDERRORINDSZ ||
    872 			    hdrsz < (pptr->uderror_ind.OPT_length +
    873 			    pptr->uderror_ind.OPT_offset) ||
    874 			    hdrsz < (pptr->uderror_ind.DEST_length +
    875 			    pptr->uderror_ind.DEST_offset)) {
    876 				mutex_exit(&rmp->rm_lock);
    877 				freemsg(mp);
    878 				return;
    879 			}
    880 
    881 			/*
    882 			 * In the case where a unit data error has been
    883 			 * received, all we need to do is clear the message from
    884 			 * the queue.
    885 			 */
    886 			mutex_exit(&rmp->rm_lock);
    887 			freemsg(mp);
    888 			RPCLOG(32, "rpcmodrput: unitdata error received at "
    889 			    "%ld\n", gethrestime_sec());
    890 			return;
    891 		} /* end else if (pptr->type == T_UDERROR_IND) */
    892 
    893 		putnext(q, mp);
    894 		break;
    895 	} /* end switch (mp->b_datap->db_type) */
    896 
    897 	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
    898 	    "rpcmodrput_end:");
    899 	/*
    900 	 * Return codes are not looked at by the STREAMS framework.
    901 	 */
    902 }
    903 
    904 /*
    905  * write put procedure
    906  */
    907 void
    908 rpcmodwput(queue_t *q, mblk_t *mp)
    909 {
    910 	struct rpcm	*rmp;
    911 
    912 	ASSERT(q != NULL);
    913 
    914 	switch (mp->b_datap->db_type) {
    915 		case M_PROTO:
    916 		case M_PCPROTO:
    917 			break;
    918 		default:
    919 			rpcmodwput_other(q, mp);
    920 			return;
    921 	}
    922 
    923 	/*
    924 	 * Check to see if we can send the message downstream.
    925 	 */
    926 	if (canputnext(q)) {
    927 		putnext(q, mp);
    928 		return;
    929 	}
    930 
    931 	rmp = (struct rpcm *)q->q_ptr;
    932 	ASSERT(rmp != NULL);
    933 
    934 	/*
    935 	 * The first canputnext failed.  Try again except this time with the
    936 	 * lock held, so that we can check the state of the stream to see if
    937 	 * it is closing.  If either of these conditions evaluate to true
    938 	 * then send the meesage.
    939 	 */
    940 	mutex_enter(&rmp->rm_lock);
    941 	if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
    942 		mutex_exit(&rmp->rm_lock);
    943 		putnext(q, mp);
    944 	} else {
    945 		/*
    946 		 * canputnext failed again and the stream is not closing.
    947 		 * Place the message on the queue and let the service
    948 		 * procedure handle the message.
    949 		 */
    950 		mutex_exit(&rmp->rm_lock);
    951 		(void) putq(q, mp);
    952 	}
    953 }
    954 
    955 static void
    956 rpcmodwput_other(queue_t *q, mblk_t *mp)
    957 {
    958 	struct rpcm	*rmp;
    959 	struct iocblk	*iocp;
    960 
    961 	rmp = (struct rpcm *)q->q_ptr;
    962 	ASSERT(rmp != NULL);
    963 
    964 	switch (mp->b_datap->db_type) {
    965 		case M_IOCTL:
    966 			iocp = (struct iocblk *)mp->b_rptr;
    967 			ASSERT(iocp != NULL);
    968 			switch (iocp->ioc_cmd) {
    969 				case RPC_CLIENT:
    970 				case RPC_SERVER:
    971 					mutex_enter(&rmp->rm_lock);
    972 					rmp->rm_type = iocp->ioc_cmd;
    973 					mutex_exit(&rmp->rm_lock);
    974 					mp->b_datap->db_type = M_IOCACK;
    975 					qreply(q, mp);
    976 					return;
    977 				default:
    978 				/*
    979 				 * pass the ioctl downstream and hope someone
    980 				 * down there knows how to handle it.
    981 				 */
    982 					putnext(q, mp);
    983 					return;
    984 			}
    985 		default:
    986 			break;
    987 	}
    988 	/*
    989 	 * This is something we definitely do not know how to handle, just
    990 	 * pass the message downstream
    991 	 */
    992 	putnext(q, mp);
    993 }
    994 
    995 /*
    996  * Module write service procedure. This is called by downstream modules
    997  * for back enabling during flow control.
    998  */
    999 void
   1000 rpcmodwsrv(queue_t *q)
   1001 {
   1002 	struct rpcm	*rmp;
   1003 	mblk_t		*mp = NULL;
   1004 
   1005 	rmp = (struct rpcm *)q->q_ptr;
   1006 	ASSERT(rmp != NULL);
   1007 
   1008 	/*
   1009 	 * Get messages that may be queued and send them down stream
   1010 	 */
   1011 	while ((mp = getq(q)) != NULL) {
   1012 		/*
   1013 		 * Optimize the service procedure for the server-side, by
   1014 		 * avoiding a call to canputnext().
   1015 		 */
   1016 		if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
   1017 			putnext(q, mp);
   1018 			continue;
   1019 		}
   1020 		(void) putbq(q, mp);
   1021 		return;
   1022 	}
   1023 }
   1024 
   1025 static void
   1026 rpcmod_release(queue_t *q, mblk_t *bp)
   1027 {
   1028 	struct rpcm *rmp;
   1029 
   1030 	/*
   1031 	 * For now, just free the message.
   1032 	 */
   1033 	if (bp)
   1034 		freemsg(bp);
   1035 	rmp = (struct rpcm *)q->q_ptr;
   1036 
   1037 	mutex_enter(&rmp->rm_lock);
   1038 	rmp->rm_ref--;
   1039 
   1040 	if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
   1041 		cv_broadcast(&rmp->rm_cwait);
   1042 	}
   1043 
   1044 	mutex_exit(&rmp->rm_lock);
   1045 }
   1046 
   1047 /*
   1048  * This part of rpcmod is pushed on a connection-oriented transport for use
   1049  * by RPC.  It serves to bypass the Stream head, implements
   1050  * the record marking protocol, and dispatches incoming RPC messages.
   1051  */
   1052 
   1053 /* Default idle timer values */
   1054 #define	MIR_CLNT_IDLE_TIMEOUT	(5 * (60 * 1000L))	/* 5 minutes */
   1055 #define	MIR_SVC_IDLE_TIMEOUT	(6 * (60 * 1000L))	/* 6 minutes */
   1056 #define	MIR_SVC_ORDREL_TIMEOUT	(10 * (60 * 1000L))	/* 10 minutes */
   1057 #define	MIR_LASTFRAG	0x80000000	/* Record marker */
   1058 
   1059 #define	DLEN(mp) (mp->b_cont ? msgdsize(mp) : (mp->b_wptr - mp->b_rptr))
   1060 
   1061 #define	MIR_SVC_QUIESCED(mir)	\
   1062 	(mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)
   1063 
   1064 #define	MIR_CLEAR_INRSRV(mir_ptr)	{	\
   1065 	(mir_ptr)->mir_inrservice = 0;	\
   1066 	if ((mir_ptr)->mir_type == RPC_SERVER &&	\
   1067 		(mir_ptr)->mir_closing)	\
   1068 		cv_signal(&(mir_ptr)->mir_condvar);	\
   1069 }
   1070 
   1071 /*
   1072  * Don't block service procedure (and mir_close) if
   1073  * we are in the process of closing.
   1074  */
   1075 #define	MIR_WCANPUTNEXT(mir_ptr, write_q)	\
   1076 	(canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
   1077 
   1078 static int	mir_clnt_dup_request(queue_t *q, mblk_t *mp);
   1079 static void	mir_rput_proto(queue_t *q, mblk_t *mp);
   1080 static int	mir_svc_policy_notify(queue_t *q, int event);
   1081 static void	mir_svc_release(queue_t *wq, mblk_t *mp);
   1082 static void	mir_svc_start(queue_t *wq);
   1083 static void	mir_svc_idle_start(queue_t *, mir_t *);
   1084 static void	mir_svc_idle_stop(queue_t *, mir_t *);
   1085 static void	mir_svc_start_close(queue_t *, mir_t *);
   1086 static void	mir_clnt_idle_do_stop(queue_t *);
   1087 static void	mir_clnt_idle_stop(queue_t *, mir_t *);
   1088 static void	mir_clnt_idle_start(queue_t *, mir_t *);
   1089 static void	mir_wput(queue_t *q, mblk_t *mp);
   1090 static void	mir_wput_other(queue_t *q, mblk_t *mp);
   1091 static void	mir_wsrv(queue_t *q);
   1092 static	void	mir_disconnect(queue_t *, mir_t *ir);
   1093 static	int	mir_check_len(queue_t *, int32_t, mblk_t *);
   1094 static	void	mir_timer(void *);
   1095 
   1096 extern void	(*mir_rele)(queue_t *, mblk_t *);
   1097 extern void	(*mir_start)(queue_t *);
   1098 extern void	(*clnt_stop_idle)(queue_t *);
   1099 
   1100 clock_t	clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
   1101 clock_t	svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
   1102 
   1103 /*
   1104  * Timeout for subsequent notifications of idle connection.  This is
   1105  * typically used to clean up after a wedged orderly release.
   1106  */
   1107 clock_t	svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */
   1108 
   1109 extern	uint_t	*clnt_max_msg_sizep;
   1110 extern	uint_t	*svc_max_msg_sizep;
   1111 uint_t	clnt_max_msg_size = RPC_MAXDATASIZE;
   1112 uint_t	svc_max_msg_size = RPC_MAXDATASIZE;
   1113 uint_t	mir_krpc_cell_null;
   1114 
   1115 static void
   1116 mir_timer_stop(mir_t *mir)
   1117 {
   1118 	timeout_id_t tid;
   1119 
   1120 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1121 
   1122 	/*
   1123 	 * Since the mir_mutex lock needs to be released to call
   1124 	 * untimeout(), we need to make sure that no other thread
   1125 	 * can start/stop the timer (changing mir_timer_id) during
   1126 	 * that time.  The mir_timer_call bit and the mir_timer_cv
   1127 	 * condition variable are used to synchronize this.  Setting
   1128 	 * mir_timer_call also tells mir_timer() (refer to the comments
   1129 	 * in mir_timer()) that it does not need to do anything.
   1130 	 */
   1131 	while (mir->mir_timer_call)
   1132 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
   1133 	mir->mir_timer_call = B_TRUE;
   1134 
   1135 	if ((tid = mir->mir_timer_id) != 0) {
   1136 		mir->mir_timer_id = 0;
   1137 		mutex_exit(&mir->mir_mutex);
   1138 		(void) untimeout(tid);
   1139 		mutex_enter(&mir->mir_mutex);
   1140 	}
   1141 	mir->mir_timer_call = B_FALSE;
   1142 	cv_broadcast(&mir->mir_timer_cv);
   1143 }
   1144 
   1145 static void
   1146 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
   1147 {
   1148 	timeout_id_t tid;
   1149 
   1150 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1151 
   1152 	while (mir->mir_timer_call)
   1153 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
   1154 	mir->mir_timer_call = B_TRUE;
   1155 
   1156 	if ((tid = mir->mir_timer_id) != 0) {
   1157 		mutex_exit(&mir->mir_mutex);
   1158 		(void) untimeout(tid);
   1159 		mutex_enter(&mir->mir_mutex);
   1160 	}
   1161 	/* Only start the timer when it is not closing. */
   1162 	if (!mir->mir_closing) {
   1163 		mir->mir_timer_id = timeout(mir_timer, q,
   1164 		    MSEC_TO_TICK(intrvl));
   1165 	}
   1166 	mir->mir_timer_call = B_FALSE;
   1167 	cv_broadcast(&mir->mir_timer_cv);
   1168 }
   1169 
   1170 static int
   1171 mir_clnt_dup_request(queue_t *q, mblk_t *mp)
   1172 {
   1173 	mblk_t  *mp1;
   1174 	uint32_t  new_xid;
   1175 	uint32_t  old_xid;
   1176 
   1177 	ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
   1178 	new_xid = BE32_TO_U32(&mp->b_rptr[4]);
   1179 	/*
   1180 	 * This loop is a bit tacky -- it walks the STREAMS list of
   1181 	 * flow-controlled messages.
   1182 	 */
   1183 	if ((mp1 = q->q_first) != NULL) {
   1184 		do {
   1185 			old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
   1186 			if (