Home | History | Annotate | Download | only in os
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
     27 /*	  All Rights Reserved  	*/
     28 
     29 
     30 #pragma ident	"@(#)msg.c	1.71	07/12/26 SMI"
     31 
     32 /*
     33  * Inter-Process Communication Message Facility.
     34  *
     35  * See os/ipc.c for a description of common IPC functionality.
     36  *
     37  * Resource controls
     38  * -----------------
     39  *
     40  * Control:      zone.max-msg-ids (rc_zone_msgmni)
     41  * Description:  Maximum number of message queue ids allowed a zone.
     42  *
     43  *   When msgget() is used to allocate a message queue, one id is
     44  *   allocated.  If the id allocation doesn't succeed, msgget() fails
     45  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
     46  *   the id is deallocated.
     47  *
     48  * Control:      project.max-msg-ids (rc_project_msgmni)
     49  * Description:  Maximum number of message queue ids allowed a project.
     50  *
     51  *   When msgget() is used to allocate a message queue, one id is
     52  *   allocated.  If the id allocation doesn't succeed, msgget() fails
     53  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
     54  *   the id is deallocated.
     55  *
     56  * Control:      process.max-msg-qbytes (rc_process_msgmnb)
     57  * Description:  Maximum number of bytes of messages on a message queue.
     58  *
     59  *   When msgget() successfully allocates a message queue, the minimum
     60  *   enforced value of this limit is used to initialize msg_qbytes.
     61  *
     62  * Control:      process.max-msg-messages (rc_process_msgtql)
     63  * Description:  Maximum number of messages on a message queue.
     64  *
     65  *   When msgget() successfully allocates a message queue, the minimum
     66  *   enforced value of this limit is used to initialize a per-queue
     67  *   limit on the number of messages.
     68  */
     69 
     70 #include <sys/types.h>
     71 #include <sys/t_lock.h>
     72 #include <sys/param.h>
     73 #include <sys/cred.h>
     74 #include <sys/user.h>
     75 #include <sys/proc.h>
     76 #include <sys/time.h>
     77 #include <sys/ipc.h>
     78 #include <sys/ipc_impl.h>
     79 #include <sys/msg.h>
     80 #include <sys/msg_impl.h>
     81 #include <sys/list.h>
     82 #include <sys/systm.h>
     83 #include <sys/sysmacros.h>
     84 #include <sys/cpuvar.h>
     85 #include <sys/kmem.h>
     86 #include <sys/ddi.h>
     87 #include <sys/errno.h>
     88 #include <sys/cmn_err.h>
     89 #include <sys/debug.h>
     90 #include <sys/project.h>
     91 #include <sys/modctl.h>
     92 #include <sys/syscall.h>
     93 #include <sys/policy.h>
     94 #include <sys/zone.h>
     95 
     96 #include <c2/audit.h>
     97 
/*
 * The following tunables are obsolete.  Though for compatibility we
 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
 * mechanism for administrating the IPC Message facility is through the
 * resource controls described at the top of this file.
 *
 * Only the three variables named above are stated to still be consulted.
 * NOTE(review): the remaining ones appear to be kept solely so that
 * existing settings of them continue to resolve -- confirm before removing.
 */
size_t	msginfo_msgmax = 2048;	/* (obsolete) */
size_t	msginfo_msgmnb = 4096;	/* (obsolete) */
int	msginfo_msgmni = 50;	/* (obsolete) */
int	msginfo_msgtql = 40;	/* (obsolete) */
int	msginfo_msgssz = 8;	/* (obsolete) */
int	msginfo_msgmap = 0;	/* (obsolete) */
ushort_t msginfo_msgseg = 1024;	/* (obsolete) */
    112 
/*
 * Handles for the resource controls described at the top of this file.
 * They are defined elsewhere; this file only references them.
 */
extern rctl_hndl_t rc_zone_msgmni;
extern rctl_hndl_t rc_project_msgmni;
extern rctl_hndl_t rc_process_msgmnb;
extern rctl_hndl_t rc_process_msgtql;
/* IPC service backing all message queue ids; created in _init(). */
static ipc_service_t *msq_svc;
/* Zone key created in _init() with msg_remove_zone() as its callback. */
static zone_key_t msg_zone_key;

/* Callbacks handed to ipcs_create() and zone_key_create() by _init(). */
static void msg_dtor(kipc_perm_t *);
static void msg_rmid(kipc_perm_t *);
static void msg_remove_zone(zoneid_t, void *);
    123 
/*
 * Module linkage information for the kernel.
 *
 * msgsys() is the single system call entry point for the message facility;
 * it takes an opcode plus five generic arguments (six in total, matching
 * the leading 6 in the sysent initializers below).
 */
static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
	uintptr_t a4, uintptr_t a5);

/*
 * Native system call table entry.  The return-value flag depends on the
 * kernel data model: SE_64RVAL under _LP64, SE_32RVAL1 otherwise.
 * SE_NOUNLOAD is set, consistent with _fini() always returning EBUSY.
 */
static struct sysent ipcmsg_sysent = {
	6,
#ifdef	_LP64
	SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
#else
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
#endif
	(int (*)())msgsys
};

#ifdef	_SYSCALL32_IMPL
/* Entry point and table entry used for 32-bit callers. */
static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
	uint32_t a4, uint32_t a5);

static struct sysent ipcmsg_sysent32 = {
	6,
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
	(int (*)())msgsys32
};
#endif	/* _SYSCALL32_IMPL */

/* Loadable-module descriptor wrapping the native sysent entry. */
static struct modlsys modlsys = {
	&mod_syscallops, "System V message facility", &ipcmsg_sysent
};

#ifdef _SYSCALL32_IMPL
/* Loadable-module descriptor wrapping the 32-bit sysent entry. */
static struct modlsys modlsys32 = {
	&mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
};
#endif
    160 
    161 /*
    162  *      Big Theory statement for message queue correctness
    163  *
 * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 * receivers who are waiting for an event.  Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers is large
 * (the thundering herd problem).  Instead, the receivers waiting to receive a
 * message are now linked in a queue-like fashion and awakened one at a time in
 * a controlled manner.
    170  *
    171  * Receivers can block on two different classes of waiting list:
    172  *    1) "sendwait" list, which is the more complex list of the two.  The
    173  *	  receiver will be awakened by a sender posting a new message.  There
    174  *	  are two types of "sendwait" list used:
    175  *		a) msg_wait_snd: handles all receivers who are looking for
    176  *		   a message type >= 0, but was unable to locate a match.
    177  *
    178  *		   slot 0: reserved for receivers that have designated they
    179  *			   will take any message type.
    180  *		   rest:   consist of receivers requesting a specific type
    181  *			   but the type was not present.  The entries are
    182  *			   hashed into a bucket in an attempt to keep
    183  *			   any list search relatively short.
    184  * 		b) msg_wait_snd_ngt: handles all receivers that have designated
    185  *		   a negative message type. Unlike msg_wait_snd, the hash bucket
    186  *		   serves a range of negative message types (-1 to -5, -6 to -10
    187  *		   and so forth), where the last bucket is reserved for all the
    188  *		   negative message types that hash outside of MSG_MAX_QNUM - 1.
    189  *		   This is done this way to simplify the operation of locating a
    190  *		   negative message type.
    191  *
    192  *    2) "copyout" list, where the receiver is awakened by another
    193  *	 receiver after a message is copied out.  This is a linked list
    194  *	 of waiters that are awakened one at a time.  Although the solution is
    195  *	 not optimal, the complexity that would be added in for waking
    196  *	 up the right entry far exceeds any potential pay back (too many
    197  *	 correctness and corner case issues).
    198  *
    199  * The lists are doubly linked.  In the case of the "sendwait"
    200  * list, this allows the thread to remove itself from the list without having
    201  * to traverse the list.  In the case of the "copyout" list it simply allows
    202  * us to use common functions with the "sendwait" list.
    203  *
    204  * To make sure receivers are not hung out to dry, we must guarantee:
    205  *    1. If any queued message matches any receiver, then at least one
    206  *       matching receiver must be processing the request.
 *    2. Blocking on the copyout queue is only temporary while messages
 *	 are being copied out.  The process is guaranteed to wake up
 *	 when it gets to the front of the queue (copyout is a FIFO).
    210  *
    211  * Rules for blocking and waking up:
    212  *   1. A receiver entering msgrcv must examine all messages for a match
    213  *      before blocking on a sendwait queue.
 *   2. If the receiver blocks because the message it chose is already
 *	being copied out, then when it wakes up it needs to start
 *	checking the messages from the beginning.
    217  *   3) When ever a process returns from msgrcv for any reason, if it
    218  *	had attempted to copy a message or blocked waiting for a copy
    219  *	to complete it needs to wakeup the next receiver blocked on
    220  *	a copy out.
 *   4) When a message is sent, the sender selects a process waiting
 *	for that type of message.  This selection process rotates between
 *	receiver types of 0, negative and positive to prevent starvation of
 *	any one particular receiver type.
    225  *   5) The following are the scenarios for processes that are awakened
    226  *	by a msgsnd:
    227  *		a) The process finds the message and is able to copy
    228  *		   it out.  Once complete, the process returns.
    229  *		b) The message that was sent that triggered the wakeup is no
    230  *		   longer available (another process found the message first).
    231  *		   We issue a wakeup on copy queue and then go back to
    232  *		   sleep waiting for another matching message to be sent.
    233  *		c) The message that was supposed to be processed was
    234  *		   already serviced by another process.  However a different
    235  *		   message is present which we can service.  The message
    236  *		   is copied and the process returns.
    237  *		d) The message is found, but some sort of error occurs that
    238  *		   prevents the message from being copied.  The receiver
    239  *		   wakes up the next sender that can service this message
    240  *		   type and returns an error to the caller.
    241  *		e) The message is found, but it is marked as being copied
    242  *		   out.  The receiver then goes to sleep on the copyout
    243  *		   queue where it will be awakened again sometime in the future.
    244  *
    245  *
    246  *   6) Whenever a message is found that matches the message type designated,
    247  * 	but is being copied out we have to block on the copyout queue.
 *	After the process doing the copy finishes the copy out, it must wake
 *	up (either directly or indirectly) all receivers who blocked on its
 *	copyout,
    250  *	so they are guaranteed a chance to examine the remaining messages.
    251  *	This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
    252  *	and so on.  The chain cannot be broken.  This leads to the following
    253  *	cases:
 *		a) When a receiver is finished copying the message (or has
 *		   encountered an error), the first entry on the copyout
 *		   queue is woken up.
    257  *		b) When the receiver is woken up, it attempts to locate
    258  *		   a message type match.
    259  *		c) If a message type is found and
    260  *			-- MSG_RCVCOPY flag is not set, the message is
    261  *			   marked for copying out.  Regardless of the copyout
    262  *			   success the next entry on the copyout queue is
    263  *			   awakened and the operation is completed.
    264  *			-- MSG_RCVCOPY is set, we simply go back to sleep again
    265  *			   on the copyout queue.
    266  *		d) If the message type is not found then we wakeup the next
    267  *		   process on the copyout queue.
    268  */
    269 
/* Forward declarations for the receive-path helpers defined in this file. */
static ulong_t msg_type_hash(long);
static int msgq_check_err(kmsqid_t *qp, int cvres);
static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
    kmsqid_t *);
static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
    struct msg *, struct ipcmsgbuf *, int);
static void msg_rcvq_wakeup_all(list_t *);
static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
static struct msg *msgrcv_lookup(kmsqid_t *, long);

/*
 * Selection table for receivers blocked on the "sendwait" lists.  Each
 * entry pairs a lookup function with a pointer to the next entry, and the
 * last entry points back to the first, so the table forms a ring:
 * any -> specific -> negative -> any.  Each queue starts at entry 0
 * (see msgget()).  This is the rotation described in rule 4 of the Big
 * Theory statement.
 */
msg_select_t msg_fnd_sndr[] = {
	{ msg_fnd_any_snd, &msg_fnd_sndr[1] },
	{ msg_fnd_spc_snd, &msg_fnd_sndr[2] },
	{ msg_fnd_neg_snd, &msg_fnd_sndr[0] }
};

/*
 * Selection table for the "copyout" list.  It has a single entry that
 * points to itself, so the same lookup function is always used.
 */
msg_select_t msg_fnd_rdr[1] = {
	{ msg_fnd_any_rdr, &msg_fnd_rdr[0] },
};
    293 
/*
 * Linkage structure passed to mod_install() and mod_info().  It lists the
 * native system call module, the 32-bit one when _SYSCALL32_IMPL is
 * defined, and a terminating NULL.
 */
static struct modlinkage modlinkage = {
	MODREV_1,
	&modlsys,
#ifdef _SYSCALL32_IMPL
	&modlsys32,
#endif
	NULL
};
    302 
    303 
    304 int
    305 _init(void)
    306 {
    307 	int result;
    308 
    309 	msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
    310 	    sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
    311 	    offsetof(ipc_rqty_t, ipcq_msgmni));
    312 	zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
    313 
    314 	if ((result = mod_install(&modlinkage)) == 0)
    315 		return (0);
    316 
    317 	(void) zone_key_delete(msg_zone_key);
    318 	ipcs_destroy(msq_svc);
    319 
    320 	return (result);
    321 }
    322 
/*
 * Module unload entry point.  Unloading is always refused with EBUSY,
 * which is consistent with the SE_NOUNLOAD flag set on the system call
 * table entries of this module.
 */
int
_fini(void)
{
	return (EBUSY);
}
    328 
    329 int
    330 _info(struct modinfo *modinfop)
    331 {
    332 	return (mod_info(&modlinkage, modinfop));
    333 }
    334 
    335 static void
    336 msg_dtor(kipc_perm_t *perm)
    337 {
    338 	kmsqid_t *qp = (kmsqid_t *)perm;
    339 	int		ii;
    340 
    341 	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
    342 		ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
    343 		ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
    344 		list_destroy(&qp->msg_wait_snd[ii]);
    345 		list_destroy(&qp->msg_wait_snd_ngt[ii]);
    346 	}
    347 	ASSERT(list_is_empty(&qp->msg_cpy_block));
    348 	list_destroy(&qp->msg_cpy_block);
    349 	ASSERT(qp->msg_snd_cnt == 0);
    350 	ASSERT(qp->msg_cbytes == 0);
    351 	list_destroy(&qp->msg_list);
    352 }
    353 
    354 
    355 #define	msg_hold(mp)	(mp)->msg_copycnt++
    356 
    357 /*
    358  * msg_rele - decrement the reference count on the message.  When count
    359  * reaches zero, free message header and contents.
    360  */
    361 static void
    362 msg_rele(struct msg *mp)
    363 {
    364 	ASSERT(mp->msg_copycnt > 0);
    365 	if (mp->msg_copycnt-- == 1) {
    366 		if (mp->msg_addr)
    367 			kmem_free(mp->msg_addr, mp->msg_size);
    368 		kmem_free(mp, sizeof (struct msg));
    369 	}
    370 }
    371 
    372 /*
    373  * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
    374  * waiting for free bytes on queue.
    375  *
    376  * Called with queue locked.
    377  */
    378 static void
    379 msgunlink(kmsqid_t *qp, struct msg *mp)
    380 {
    381 	list_remove(&qp->msg_list, mp);
    382 	qp->msg_qnum--;
    383 	qp->msg_cbytes -= mp->msg_size;
    384 	msg_rele(mp);
    385 
    386 	/* Wake up waiting writers */
    387 	if (qp->msg_snd_cnt)
    388 		cv_broadcast(&qp->msg_snd_cv);
    389 }
    390 
    391 static void
    392 msg_rmid(kipc_perm_t *perm)
    393 {
    394 	kmsqid_t *qp = (kmsqid_t *)perm;
    395 	struct msg *mp;
    396 	int		ii;
    397 
    398 
    399 	while ((mp = list_head(&qp->msg_list)) != NULL)
    400 		msgunlink(qp, mp);
    401 	ASSERT(qp->msg_cbytes == 0);
    402 
    403 	/*
    404 	 * Wake up everyone who is in a wait state of some sort
    405 	 * for this message queue.
    406 	 */
    407 	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
    408 		msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
    409 		msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
    410 	}
    411 	msg_rcvq_wakeup_all(&qp->msg_cpy_block);
    412 	if (qp->msg_snd_cnt)
    413 		cv_broadcast(&qp->msg_snd_cv);
    414 }
    415 
    416 /*
    417  * msgctl system call.
    418  *
    419  * gets q lock (via ipc_lookup), releases before return.
    420  * may call users of msg_lock
    421  */
    422 static int
    423 msgctl(int msgid, int cmd, void *arg)
    424 {
    425 	STRUCT_DECL(msqid_ds, ds);		/* SVR4 queue work area */
    426 	kmsqid_t		*qp;		/* ptr to associated q */
    427 	int			error;
    428 	struct	cred		*cr;
    429 	model_t	mdl = get_udatamodel();
    430 	struct msqid_ds64	ds64;
    431 	kmutex_t		*lock;
    432 	proc_t			*pp = curproc;
    433 
    434 	STRUCT_INIT(ds, mdl);
    435 	cr = CRED();
    436 
    437 	/*
    438 	 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
    439 	 */
    440 	switch (cmd) {
    441 	case IPC_SET:
    442 		if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
    443 			return (set_errno(EFAULT));
    444 		break;
    445 
    446 	case IPC_SET64:
    447 		if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
    448 			return (set_errno(EFAULT));
    449 		break;
    450 
    451 	case IPC_RMID:
    452 		if (error = ipc_rmid(msq_svc, msgid, cr))
    453 			return (set_errno(error));
    454 		return (0);
    455 	}
    456 
    457 	/*
    458 	 * get msqid_ds for this msgid
    459 	 */
    460 	if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
    461 		return (set_errno(EINVAL));
    462 
    463 	switch (cmd) {
    464 	case IPC_SET:
    465 		if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
    466 		    secpolicy_ipc_config(cr) != 0) {
    467 			mutex_exit(lock);
    468 			return (set_errno(EPERM));
    469 		}
    470 		if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
    471 		    &STRUCT_BUF(ds)->msg_perm, mdl)) {
    472 			mutex_exit(lock);
    473 			return (set_errno(error));
    474 		}
    475 		qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
    476 		qp->msg_ctime = gethrestime_sec();
    477 		break;
    478 
    479 	case IPC_STAT:
    480 		if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
    481 			mutex_exit(lock);
    482 			return (set_errno(error));
    483 		}
    484 
    485 		if (qp->msg_rcv_cnt)
    486 			qp->msg_perm.ipc_mode |= MSG_RWAIT;
    487 		if (qp->msg_snd_cnt)
    488 			qp->msg_perm.ipc_mode |= MSG_WWAIT;
    489 		ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
    490 		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
    491 		STRUCT_FSETP(ds, msg_first, NULL); 	/* kernel addr */
    492 		STRUCT_FSETP(ds, msg_last, NULL);
    493 		STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
    494 		STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
    495 		STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
    496 		STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
    497 		STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
    498 		STRUCT_FSET(ds, msg_stime, qp->msg_stime);
    499 		STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
    500 		STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
    501 		break;
    502 
    503 	case IPC_SET64:
    504 		mutex_enter(&pp->p_lock);
    505 		if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
    506 		    secpolicy_ipc_config(cr) != 0 &&
    507 		    rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
    508 		    ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
    509 			mutex_exit(&pp->p_lock);
    510 			mutex_exit(lock);
    511 			return (set_errno(EPERM));
    512 		}
    513 		mutex_exit(&pp->p_lock);
    514 		if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
    515 		    &ds64.msgx_perm)) {
    516 			mutex_exit(lock);
    517 			return (set_errno(error));
    518 		}
    519 		qp->msg_qbytes = ds64.msgx_qbytes;
    520 		qp->msg_ctime = gethrestime_sec();
    521 		break;
    522 
    523 	case IPC_STAT64:
    524 		if (qp->msg_rcv_cnt)
    525 			qp->msg_perm.ipc_mode |= MSG_RWAIT;
    526 		if (qp->msg_snd_cnt)
    527 			qp->msg_perm.ipc_mode |= MSG_WWAIT;
    528 		ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
    529 		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
    530 		ds64.msgx_cbytes = qp->msg_cbytes;
    531 		ds64.msgx_qnum = qp->msg_qnum;
    532 		ds64.msgx_qbytes = qp->msg_qbytes;
    533 		ds64.msgx_lspid = qp->msg_lspid;
    534 		ds64.msgx_lrpid = qp->msg_lrpid;
    535 		ds64.msgx_stime = qp->msg_stime;
    536 		ds64.msgx_rtime = qp->msg_rtime;
    537 		ds64.msgx_ctime = qp->msg_ctime;
    538 		break;
    539 
    540 	default:
    541 		mutex_exit(lock);
    542 		return (set_errno(EINVAL));
    543 	}
    544 
    545 	mutex_exit(lock);
    546 
    547 	/*
    548 	 * Do copyout last (after releasing mutex).
    549 	 */
    550 	switch (cmd) {
    551 	case IPC_STAT:
    552 		if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
    553 			return (set_errno(EFAULT));
    554 		break;
    555 
    556 	case IPC_STAT64:
    557 		if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
    558 			return (set_errno(EFAULT));
    559 		break;
    560 	}
    561 
    562 	return (0);
    563 }
    564 
/*
 * Remove all message queues associated with a given zone.  Called by
 * zone_shutdown when the zone is halted.
 *
 * This is the callback passed to zone_key_create() in _init().  The
 * second argument is not used (hence the ARGSUSED annotation); all the
 * work is delegated to ipc_remove_zone() on the message queue service.
 */
/*ARGSUSED1*/
static void
msg_remove_zone(zoneid_t zoneid, void *arg)
{
	ipc_remove_zone(msq_svc, zoneid);
}
    575 
    576 /*
    577  * msgget system call.
    578  */
    579 static int
    580 msgget(key_t key, int msgflg)
    581 {
    582 	kmsqid_t	*qp;
    583 	kmutex_t	*lock;
    584 	int		id, error;
    585 	int		ii;
    586 	proc_t		*pp = curproc;
    587 
    588 top:
    589 	if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
    590 		return (set_errno(error));
    591 
    592 	if (IPC_FREE(&qp->msg_perm)) {
    593 		mutex_exit(lock);
    594 		mutex_exit(&pp->p_lock);
    595 
    596 		list_create(&qp->msg_list, sizeof (struct msg),
    597 		    offsetof(struct msg, msg_node));
    598 		qp->msg_qnum = 0;
    599 		qp->msg_lspid = qp->msg_lrpid = 0;
    600 		qp->msg_stime = qp->msg_rtime = 0;
    601 		qp->msg_ctime = gethrestime_sec();
    602 		qp->msg_ngt_cnt = 0;
    603 		qp->msg_neg_copy = 0;
    604 		for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
    605 			list_create(&qp->msg_wait_snd[ii],
    606 			    sizeof (msgq_wakeup_t),
    607 			    offsetof(msgq_wakeup_t, msgw_list));
    608 			list_create(&qp->msg_wait_snd_ngt[ii],
    609 			    sizeof (msgq_wakeup_t),
    610 			    offsetof(msgq_wakeup_t, msgw_list));
    611 		}
    612 		/*
    613 		 * The proper initialization of msg_lowest_type is to the
    614 		 * highest possible value.  By doing this we guarantee that
    615 		 * when the first send happens, the lowest type will be set
    616 		 * properly.
    617 		 */
    618 		qp->msg_lowest_type = -1;
    619 		list_create(&qp->msg_cpy_block,
    620 		    sizeof (msgq_wakeup_t),
    621 		    offsetof(msgq_wakeup_t, msgw_list));
    622 		qp->msg_fnd_sndr = &msg_fnd_sndr[0];
    623 		qp->msg_fnd_rdr = &msg_fnd_rdr[0];
    624 		qp->msg_rcv_cnt = 0;
    625 		qp->msg_snd_cnt = 0;
    626 
    627 		if (error = ipc_commit_begin(msq_svc, key, msgflg,
    628 		    (kipc_perm_t *)qp)) {
    629 			if (error == EAGAIN)
    630 				goto top;
    631 			return (set_errno(error));
    632 		}
    633 		qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
    634 		    pp->p_rctls, pp);
    635 		qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
    636 		    pp->p_rctls, pp);
    637 		lock = ipc_commit_end(msq_svc, &qp->msg_perm);
    638 	}
    639 	if (audit_active)
    640 		audit_ipcget(AT_IPC_MSG, (void *)qp);
    641 	id = qp->msg_perm.ipc_id;
    642 	mutex_exit(lock);
    643 	return (id);
    644 }
    645 
    646 static ssize_t
    647 msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
    648 {
    649 	struct msg	*smp;	/* ptr to best msg on q */
    650 	kmsqid_t	*qp;	/* ptr to associated q */
    651 	kmutex_t	*lock;
    652 	size_t		xtsz;	/* transfer byte count */
    653 	int		error = 0;
    654 	int		cvres;
    655 	ulong_t		msg_hash;
    656 	msgq_wakeup_t	msg_entry;
    657 
    658 	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
    659 
    660 	msg_hash = msg_type_hash(msgtyp);
    661 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
    662 		return ((ssize_t)set_errno(EINVAL));
    663 	}
    664 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
    665 
    666 	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
    667 		goto msgrcv_out;
    668 	}
    669 
    670 	/*
    671 	 * Various information (including the condvar_t) required for the
    672 	 * process to sleep is provided by it's stack.
    673 	 */
    674 	msg_entry.msgw_thrd = curthread;
    675 	msg_entry.msgw_snd_wake = 0;
    676 	msg_entry.msgw_type = msgtyp;
    677 findmsg:
    678 	smp = msgrcv_lookup(qp, msgtyp);
    679 
    680 	if (smp) {
    681 		/*
    682 		 * We found a possible message to copy out.
    683 		 */
    684 		if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
    685 			/*
    686 			 * It is available, attempt to copy it.
    687 			 */
    688 			error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
    689 			    smp, msgp, msgflg);
    690 			/*
    691 			 * Don't forget to wakeup a sleeper that blocked because
    692 			 * we were copying things out.
    693 			 */
    694 			msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
    695 			goto msgrcv_out;
    696 		}
    697 		/*
    698 		 * The selected message is being copied out, so block.  We do
    699 		 * not need to wake the next person up on the msg_cpy_block list
    700 		 * due to the fact some one is copying out and they will get
    701 		 * things moving again once the copy is completed.
    702 		 */
    703 		cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
    704 		    &msg_entry, &lock, qp);
    705 		error = msgq_check_err(qp, cvres);
    706 		if (error) {
    707 			goto msgrcv_out;
    708 		}
    709 		goto findmsg;
    710 	}
    711 	/*
    712 	 * There isn't a message to copy out that matches the designated
    713 	 * criteria.
    714 	 */
    715 	if (msgflg & IPC_NOWAIT) {
    716 		error = ENOMSG;
    717 		goto msgrcv_out;
    718 	}
    719 	msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
    720 
    721 	/*
    722 	 * Wait for new message.  We keep the negative and positive types
    723 	 * separate for performance reasons.
    724 	 */
    725 	msg_entry.msgw_snd_wake = 0;
    726 	if (msgtyp >= 0) {
    727 		cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
    728 		    &msg_entry, &lock, qp);
    729 	} else {
    730 		qp->msg_ngt_cnt++;
    731 		cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
    732 		    &msg_entry, &lock, qp);
    733 		qp->msg_ngt_cnt--;
    734 	}
    735 
    736 	if (!(error = msgq_check_err(qp, cvres))) {
    737 		goto findmsg;
    738 	}
    739 
    740 msgrcv_out:
    741 	if (error) {
    742 		msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
    743 		if (msg_entry.msgw_snd_wake) {
    744 			msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
    745 			    msg_entry.msgw_snd_wake);
    746 		}
    747 		ipc_rele(msq_svc, (kipc_perm_t *)qp);
    748 		return ((ssize_t)set_errno(error));
    749 	}
    750 	ipc_rele(msq_svc, (kipc_perm_t *)qp);
    751 	return ((ssize_t)xtsz);
    752 }
    753 
    754 static int
    755 msgq_check_err(kmsqid_t *qp, int cvres)
    756 {
    757 	if (IPC_FREE(&qp->msg_perm)) {
    758 		return (EIDRM);
    759 	}
    760 
    761 	if (cvres == 0) {
    762 		return (EINTR);
    763 	}
    764 
    765 	return (0);
    766 }
    767 
    768 static int
    769 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
    770     size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
    771 {
    772 	size_t		xtsz;
    773 	STRUCT_HANDLE(ipcmsgbuf, umsgp);
    774 	model_t		mdl = get_udatamodel();
    775 	int		copyerror = 0;
    776 
    777 	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
    778 	if (msgsz < smp->msg_size) {
    779 		if ((msgflg & MSG_NOERROR) == 0) {
    780 			return (E2BIG);
    781 		} else {
    782 			xtsz = msgsz;
    783 		}
    784 	} else {
    785 		xtsz = smp->msg_size;
    786 	}
    787 	*xtsz_ret = xtsz;
    788 
    789 	/*
    790 	 * To prevent a DOS attack we mark the message as being
    791 	 * copied out and release mutex.  When the copy is completed
    792 	 * we need to acquire the mutex and make the appropriate updates.
    793 	 */
    794 	ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
    795 	smp->msg_flags |= MSG_RCVCOPY;
    796 	msg_hold(smp);
    797 	if (msgtyp < 0) {
    798 		ASSERT(qp->msg_neg_copy == 0);
    799 		qp->msg_neg_copy = 1;
    800 	}
    801 	mutex_exit(*lock);
    802 
    803 	if (mdl == DATAMODEL_NATIVE) {
    804 		copyerror = copyout(&smp->msg_type, msgp,
    805 		    sizeof (smp->msg_type));
    806 	} else {
    807 		/*
    808 		 * 32-bit callers need an imploded msg type.
    809 		 */
    810 		int32_t	msg_type32 = smp->msg_type;
    811 
    812 		copyerror = copyout(&msg_type32, msgp,
    813 		    sizeof (msg_type32));
    814 	}
    815 
    816 	if (copyerror == 0 && xtsz) {
    817 		copyerror = copyout(smp->msg_addr,
    818 		    STRUCT_FADDR(umsgp, mtext), xtsz);
    819 	}
    820 
    821 	/*
    822 	 * Reclaim the mutex and make sure the message queue still exists.
    823 	 */
    824 
    825 	*lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
    826 	if (msgtyp < 0) {
    827 		qp->msg_neg_copy = 0;
    828 	}
    829 	ASSERT(smp->msg_flags & MSG_RCVCOPY);
    830 	smp->msg_flags &= ~MSG_RCVCOPY;
    831 	msg_rele(smp);
    832 	if (IPC_FREE(&qp->msg_perm)) {
    833 		return (EIDRM);
    834 	}
    835 	if (copyerror) {
    836 		return (EFAULT);
    837 	}
    838 	qp->msg_lrpid = ttoproc(curthread)->p_pid;
    839 	qp->msg_rtime = gethrestime_sec();
    840 	msgunlink(qp, smp);
    841 	return (0);
    842 }
    843 
    844 static struct msg *
    845 msgrcv_lookup(kmsqid_t *qp, long msgtyp)
    846 {
    847 	struct msg 		*smp = NULL;
    848 	int			qp_low;
    849 	struct msg		*mp;	/* ptr to msg on q */
    850 	int			low_msgtype;
    851 	static struct msg	neg_copy_smp;
    852 
    853 	mp = list_head(&qp->msg_list);
    854 	if (msgtyp == 0) {
    855 		smp = mp;
    856 	} else {
    857 		qp_low = qp->msg_lowest_type;
    858 		if (msgtyp > 0) {
    859 			/*
    860 			 * If our lowest possible message type is larger than
    861 			 * the message type desired, then we know there is
    862 			 * no entry present.
    863 			 */
    864 			if (qp_low > msgtyp) {
    865 				return (NULL);
    866 			}
    867 
    868 			for (; mp; mp = list_next(&qp->msg_list, mp)) {
    869 				if (msgtyp == mp->msg_type) {
    870 					smp = mp;
    871 					break;
    872 				}
    873 			}
    874 		} else {
    875 			/*
    876 			 * We have kept track of the lowest possible message
    877 			 * type on the send queue.  This allows us to terminate
    878 			 * the search early if we find a message type of that
    879 			 * type.  Note, the lowest type may not be the actual
    880 			 * lowest value in the system, it is only guaranteed
    881 			 * that there isn't a value lower than that.
    882 			 */
    883 			low_msgtype = -msgtyp;
    884 			if (low_msgtype++ < qp_low) {
    885 				return (NULL);
    886 			}
    887 			if (qp->msg_neg_copy) {
    888 				neg_copy_smp.msg_flags = MSG_RCVCOPY;
    889 				return (&neg_copy_smp);
    890 			}
    891 			for (; mp; mp = list_next(&qp->msg_list, mp)) {
    892 				if (mp->msg_type < low_msgtype) {
    893 					smp = mp;
    894 					low_msgtype = mp->msg_type;
    895 					if (low_msgtype == qp_low) {
    896 						break;
    897 					}
    898 				}
    899 			}
    900 			if (smp) {
    901 				/*
    902 				 * Update the lowest message type.
    903 				 */
    904 				qp->msg_lowest_type = smp->msg_type;
    905 			}
    906 		}
    907 	}
    908 	return (smp);
    909 }
    910 
    911 /*
    912  * msgids system call.
    913  */
    914 static int
    915 msgids(int *buf, uint_t nids, uint_t *pnids)
    916 {
    917 	int error;
    918 
    919 	if (error = ipc_ids(msq_svc, buf, nids, pnids))
    920 		return (set_errno(error));
    921 
    922 	return (0);
    923 }
    924 
    925 #define	RND(x)		roundup((x), sizeof (size_t))
    926 #define	RND32(x)	roundup((x), sizeof (size32_t))
    927 
    928 /*
    929  * msgsnap system call.
    930  */
    931 static int
    932 msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
    933 {
    934 	struct msg	*mp;	/* ptr to msg on q */
    935 	kmsqid_t	*qp;	/* ptr to associated q */
    936 	kmutex_t	*lock;
    937 	size_t		size;
    938 	size_t		nmsg;
    939 	struct msg	**snaplist;
    940 	int		error, i;
    941 	model_t		mdl = get_udatamodel();
    942 	STRUCT_DECL(msgsnap_head, head);
    943 	STRUCT_DECL(msgsnap_mhead, mhead);
    944 
    945 	STRUCT_INIT(head, mdl);
    946 	STRUCT_INIT(mhead, mdl);
    947 
    948 	if (bufsz < STRUCT_SIZE(head))
    949 		return (set_errno(EINVAL));
    950 
    951 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
    952 		return (set_errno(EINVAL));
    953 
    954 	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
    955 		mutex_exit(lock);
    956 		return (set_errno(error));
    957 	}
    958 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
    959 
    960 	/*
    961 	 * First compute the required buffer size and
    962 	 * the number of messages on the queue.
    963 	 */
    964 	size = nmsg = 0;
    965 	for (mp = list_head(&qp->msg_list); mp;
    966 	    mp = list_next(&qp->msg_list, mp)) {
    967 		if (msgtyp == 0 ||
    968 		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
    969 		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
    970 			nmsg++;
    971 			if (mdl == DATAMODEL_NATIVE)
    972 				size += RND(mp->msg_size);
    973 			else
    974 				size += RND32(mp->msg_size);
    975 		}
    976 	}
    977 
    978 	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
    979 	if (size > bufsz)
    980 		nmsg = 0;
    981 
    982 	if (nmsg > 0) {
    983 		/*
    984 		 * Mark the messages as being copied.
    985 		 */
    986 		snaplist = (struct msg **)kmem_alloc(nmsg *
    987 		    sizeof (struct msg *), KM_SLEEP);
    988 		i = 0;
    989 		for (mp = list_head(&qp->msg_list); mp;
    990 		    mp = list_next(&qp->msg_list, mp)) {
    991 			if (msgtyp == 0 ||
    992 			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
    993 			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
    994 				msg_hold(mp);
    995 				snaplist[i] = mp;
    996 				i++;
    997 			}
    998 		}
    999 	}
   1000 	mutex_exit(lock);
   1001 
   1002 	/*
   1003 	 * Copy out the buffer header.
   1004 	 */
   1005 	STRUCT_FSET(head, msgsnap_size, size);
   1006 	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
   1007 	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
   1008 		error = EFAULT;
   1009 
   1010 	buf += STRUCT_SIZE(head);
   1011 
   1012 	/*
   1013 	 * Now copy out the messages one by one.
   1014 	 */
   1015 	for (i = 0; i < nmsg; i++) {
   1016 		mp = snaplist[i];
   1017 		if (error == 0) {
   1018 			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
   1019 			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
   1020 			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
   1021 				error = EFAULT;
   1022 			buf += STRUCT_SIZE(mhead);
   1023 
   1024 			if (error == 0 &&
   1025 			    mp->msg_size != 0 &&
   1026 			    copyout(mp->msg_addr, buf, mp->msg_size))
   1027 				error = EFAULT;
   1028 			if (mdl == DATAMODEL_NATIVE)
   1029 				buf += RND(mp->msg_size);
   1030 			else
   1031 				buf += RND32(mp->msg_size);
   1032 		}
   1033 		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
   1034 		msg_rele(mp);
   1035 		/* Check for msg q deleted or reallocated */
   1036 		if (IPC_FREE(&qp->msg_perm))
   1037 			error = EIDRM;
   1038 		mutex_exit(lock);
   1039 	}
   1040 
   1041 	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
   1042 	ipc_rele(msq_svc, (kipc_perm_t *)qp);
   1043 
   1044 	if (nmsg > 0)
   1045 		kmem_free(snaplist, nmsg * sizeof (struct msg *));
   1046 
   1047 	if (error)
   1048 		return (set_errno(error));
   1049 	return (0);
   1050 }
   1051 
/*
 * NOTE(review): the code that uses this limit lies outside the part of the
 * file reviewed here; presumably msgsnd() below treats it as a message
 * size threshold, in bytes, for preallocating the message buffer -- confirm.
 */
   1052 #define	MSG_PREALLOC_LIMIT 8192
   1053 
   1054 /*
   1055  * msgsnd system call.
   1056  */
   1057 static int
   1058 msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
   1059 {
   1060 	kmsqid_t	*qp;
   1061 	kmutex_t	*lock = NULL;
   1062 	struct msg	*mp = NULL;