Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License, Version 1.0 only
      6  * (the "License").  You may not use this file except in compliance
      7  * with the License.
      8  *
      9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
     10  * or http://www.opensolaris.org/os/licensing.
     11  * See the License for the specific language governing permissions
     12  * and limitations under the License.
     13  *
     14  * When distributing Covered Code, include this CDDL HEADER in each
     15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     16  * If applicable, add the following below this CDDL HEADER, with the
     17  * fields enclosed by brackets "[]" replaced with your own identifying
     18  * information: Portions Copyright [yyyy] [name of copyright owner]
     19  *
     20  * CDDL HEADER END
     21  */
     22 /*
     23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
     27 /* All Rights Reserved */
     28 /*
     29  * Portions of this source code were derived from Berkeley
     30  * 4.3 BSD under license from the Regents of the University of
     31  * California.
     32  */
     33 
     34 #pragma ident	"@(#)svc_rdma.c	1.8	05/06/10 SMI"
     35 
     36 /*
     37  * Server side of RPC over RDMA in the kernel.
     38  */
     39 
     40 #include <sys/param.h>
     41 #include <sys/types.h>
     42 #include <sys/user.h>
     43 #include <sys/sysmacros.h>
     44 #include <sys/proc.h>
     45 #include <sys/file.h>
     46 #include <sys/errno.h>
     47 #include <sys/kmem.h>
     48 #include <sys/debug.h>
     49 #include <sys/systm.h>
     50 #include <sys/cmn_err.h>
     51 #include <sys/kstat.h>
     52 #include <sys/vtrace.h>
     53 #include <sys/debug.h>
     54 
     55 #include <rpc/types.h>
     56 #include <rpc/xdr.h>
     57 #include <rpc/auth.h>
     58 #include <rpc/clnt.h>
     59 #include <rpc/rpc_msg.h>
     60 #include <rpc/svc.h>
     61 #include <rpc/rpc_rdma.h>
     62 #include <sys/ddi.h>
     63 #include <sys/sunddi.h>
     64 
     65 #include <inet/common.h>
     66 #include <inet/ip.h>
     67 #include <inet/ip6.h>
     68 
/*
 * RDMA transport specific data associated with SVCMASTERXPRT.
 *
 * svc_rdma_kcreate() allocates one of these per master transport and
 * stores it in xp_p2; svc_rdma_kdestroy() frees it.
 */
struct rdma_data {
	SVCMASTERXPRT 	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};
     77 
/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 * (reached through xp_p2buf).
 *
 * svc_rdma_krecv() fills in both members for each request;
 * svc_rdma_ksend() uses conn and svc_rdma_kfreeargs() releases both.
 */
struct clone_rdma_data {
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
};
     85 
     86 #ifdef DEBUG
     87 int rdma_svc_debug = 0;
     88 #endif
     89 
     90 #define	MAXADDRLEN	128	/* max length for address mask */
     91 
     92 /*
     93  * Routines exported through ops vector.
     94  */
     95 static bool_t		svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
     96 static bool_t		svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
     97 static bool_t		svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
     98 static bool_t		svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
     99 void			svc_rdma_kdestroy(SVCMASTERXPRT *);
    100 static int		svc_rdma_kdup(struct svc_req *, caddr_t, int,
    101 				struct dupreq **, bool_t *);
    102 static void		svc_rdma_kdupdone(struct dupreq *, caddr_t,
    103 				void (*)(), int, int);
    104 static int32_t		*svc_rdma_kgetres(SVCXPRT *, int);
    105 static void		svc_rdma_kfreeres(SVCXPRT *);
    106 static void		svc_rdma_kclone_destroy(SVCXPRT *);
    107 static void		svc_rdma_kstart(SVCMASTERXPRT *);
    108 void			svc_rdma_kstop(SVCMASTERXPRT *);
    109 
/*
 * Server transport operations vector.
 *
 * svc_rdma_kcreate() installs this as xp_ops on every master transport
 * it creates.  The initializer is positional, so the order of the
 * entries must follow the member order of struct svc_ops.
 * Note that svc_rdma_kstop() is not reachable through this vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy,	/* Destroy a clone xprt */
	svc_rdma_kstart		/* Tell `ready-to-receive' to rpcmod */
};
    126 
/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 *
 * Every member is a named 64-bit kstat counter; the string in the
 * initializer is the name under which the counter is exported.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 }
};

/*
 * Flat view of the statistics: the structure above consists solely of
 * kstat_named_t members, so it can be walked as an array of
 * rdmarsstat_ndata entries starting at rdmarsstat_ptr.
 */
kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);

/*
 * Bump the named counter by one.
 * NOTE(review): this is a plain, unlocked ++ on a shared counter, so
 * concurrent updates may be lost - confirm that is acceptable.
 */
#define	RSSTAT_INCR(x)	rdmarsstat.x.value.ui64++
    155 
    156 /*
    157  * Create a transport record.
    158  * The transport record, output buffer, and private data structure
    159  * are allocated.  The output buffer is serialized into using xdrmem.
    160  * There is one transport record per user process which implements a
    161  * set of services.
    162  */
    163 /* ARGSUSED */
    164 int
    165 svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    166 	rdma_xprt_group_t *started_xprts)
    167 {
    168 	int error;
    169 	SVCMASTERXPRT *xprt;
    170 	struct rdma_data *rd;
    171 	rdma_registry_t *rmod;
    172 	rdma_xprt_record_t *xprt_rec;
    173 	queue_t	*q;
    174 
    175 	/*
    176 	 * modload the RDMA plugins is not already done.
    177 	 */
    178 	if (!rdma_modloaded) {
    179 		mutex_enter(&rdma_modload_lock);
    180 		if (!rdma_modloaded) {
    181 			error = rdma_modload();
    182 		}
    183 		mutex_exit(&rdma_modload_lock);
    184 
    185 		if (error)
    186 			return (error);
    187 	}
    188 
    189 	/*
    190 	 * master_xprt_count is the count of master transport handles
    191 	 * that were successfully created and are ready to recieve for
    192 	 * RDMA based access.
    193 	 */
    194 	error = 0;
    195 	xprt_rec = NULL;
    196 	rw_enter(&rdma_lock, RW_READER);
    197 	if (rdma_mod_head == NULL) {
    198 		started_xprts->rtg_count = 0;
    199 		rw_exit(&rdma_lock);
    200 		if (rdma_dev_available)
    201 			return (EPROTONOSUPPORT);
    202 		else
    203 			return (ENODEV);
    204 	}
    205 
    206 	/*
    207 	 * If we have reached here, then atleast one RDMA plugin has loaded.
    208 	 * Create a master_xprt, make it start listenining on the device,
    209 	 * if an error is generated, record it, we might need to shut
    210 	 * the master_xprt.
    211 	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
    212 	 * routines.
    213 	 */
    214 	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
    215 
    216 		/*
    217 		 * One SVCMASTERXPRT per RDMA plugin.
    218 		 */
    219 		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
    220 		xprt->xp_ops = &rdma_svc_ops;
    221 		xprt->xp_sct = sct;
    222 		xprt->xp_type = T_RDMA;
    223 		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
    224 		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
    225 		xprt->xp_req_head = (mblk_t *)0;
    226 		xprt->xp_req_tail = (mblk_t *)0;
    227 		xprt->xp_threads = 0;
    228 		xprt->xp_detached_threads = 0;
    229 
    230 		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
    231 		xprt->xp_p2 = (caddr_t)rd;
    232 		rd->rd_xprt = xprt;
    233 		rd->r_mod = rmod->r_mod;
    234 
    235 		q = &rd->rd_data.q;
    236 		xprt->xp_wq = q;
    237 		q->q_ptr = &rd->rd_xprt;
    238 		xprt->xp_netid = NULL;
    239 
    240 		if (netid != NULL) {
    241 			xprt->xp_netid = kmem_alloc(strlen(netid) + 1,
    242 						KM_SLEEP);
    243 			(void) strcpy(xprt->xp_netid, netid);
    244 		}
    245 
    246 		xprt->xp_addrmask.maxlen =
    247 		    xprt->xp_addrmask.len = sizeof (struct sockaddr_in);
    248 		xprt->xp_addrmask.buf =
    249 		    kmem_zalloc(xprt->xp_addrmask.len, KM_SLEEP);
    250 		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_addr.s_addr =
    251 		    (uint32_t)~0;
    252 		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_family =
    253 		    (ushort_t)~0;
    254 
    255 		/*
    256 		 * Each of the plugins will have their own Service ID
    257 		 * to listener specific mapping, like port number for VI
    258 		 * and service name for IB.
    259 		 */
    260 		rd->rd_data.svcid = id;
    261 		error = svc_xprt_register(xprt, id);
    262 		if (error) {
    263 			cmn_err(CE_WARN, "svc_rdma_kcreate: svc_xprt_register"
    264 				"failed");
    265 			goto cleanup;
    266 		}
    267 
    268 		SVC_START(xprt);
    269 		if (!rd->rd_data.active) {
    270 			svc_xprt_unregister(xprt);
    271 			error = rd->rd_data.err_code;
    272 			goto cleanup;
    273 		}
    274 
    275 		/*
    276 		 * This is set only when there is atleast one or more
    277 		 * transports successfully created. We insert the pointer
    278 		 * to the created RDMA master xprt into a separately maintained
    279 		 * list. This way we can easily reference it later to cleanup,
    280 		 * when NFS kRPC service pool is going away/unregistered.
    281 		 */
    282 		started_xprts->rtg_count ++;
    283 		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
    284 		xprt_rec->rtr_xprt_ptr = xprt;
    285 		xprt_rec->rtr_next = started_xprts->rtg_listhead;
    286 		started_xprts->rtg_listhead = xprt_rec;
    287 		continue;
    288 cleanup:
    289 		SVC_DESTROY(xprt);
    290 		if (error == RDMA_FAILED)
    291 			error = EPROTONOSUPPORT;
    292 	}
    293 
    294 	rw_exit(&rdma_lock);
    295 
    296 	/*
    297 	 * Don't return any error even if a single plugin was started
    298 	 * successfully.
    299 	 */
    300 	if (started_xprts->rtg_count == 0)
    301 		return (error);
    302 	return (0);
    303 }
    304 
    305 /*
    306  * Cleanup routine for freeing up memory allocated by
    307  * svc_rdma_kcreate()
    308  */
    309 void
    310 svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
    311 {
    312 	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
    313 
    314 
    315 	mutex_destroy(&xprt->xp_req_lock);
    316 	mutex_destroy(&xprt->xp_thread_lock);
    317 	kmem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
    318 	kmem_free(rd, sizeof (*rd));
    319 	kmem_free(xprt->xp_addrmask.buf, xprt->xp_addrmask.maxlen);
    320 	kmem_free(xprt, sizeof (*xprt));
    321 }
    322 
    323 
    324 static void
    325 svc_rdma_kstart(SVCMASTERXPRT *xprt)
    326 {
    327 	struct rdma_svc_data *svcdata;
    328 	rdma_mod_t *rmod;
    329 
    330 	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
    331 	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
    332 
    333 	/*
    334 	 * Create a listener for  module at this port
    335 	 */
    336 
    337 	(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
    338 }
    339 
    340 void
    341 svc_rdma_kstop(SVCMASTERXPRT *xprt)
    342 {
    343 	struct rdma_svc_data *svcdata;
    344 	rdma_mod_t *rmod;
    345 
    346 	svcdata	= &((struct rdma_data *)xprt->xp_p2)->rd_data;
    347 	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
    348 
    349 	/*
    350 	 * Call the stop listener routine for each plugin.
    351 	 */
    352 	(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
    353 	if (svcdata->active)
    354 		cmn_err(CE_WARN, "rdma_stop: Failed to shutdown RDMA based kRPC"
    355 			"  listener");
    356 }
    357 
/*
 * Clone destroy entry of the ops vector.  Intentionally empty: this
 * routine releases nothing (the per-request buffer and connection are
 * released in svc_rdma_kfreeargs()).
 */
/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{
}
    363 
    364 static bool_t
    365 svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
    366 {
    367 	XDR *xdrs;
    368 	rdma_stat status;
    369 	struct recv_data *rdp = (struct recv_data *)mp->b_rptr;
    370 	CONN *conn;
    371 	struct clone_rdma_data *vd;
    372 	struct clist *cl;
    373 	uint_t vers, op, pos;
    374 	uint32_t xid;
    375 
    376 	vd = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
    377 	RSSTAT_INCR(rscalls);
    378 	conn = rdp->conn;
    379 
    380 	/*
    381 	 * Post a receive descriptor on this
    382 	 * endpoint to ensure all packets are received.
    383 	 */
    384 	status = rdma_svc_postrecv(conn);
    385 	if (status != RDMA_SUCCESS) {
    386 		cmn_err(CE_NOTE,
    387 		    "svc_rdma_krecv: rdma_svc_postrecv failed %d", status);
    388 	}
    389 
    390 	if (rdp->status != 0) {
    391 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
    392 		RDMA_REL_CONN(conn);
    393 		RSSTAT_INCR(rsbadcalls);
    394 		freeb(mp);
    395 		return (FALSE);
    396 	}
    397 
    398 	/*
    399 	 * Decode rpc message
    400 	 */
    401 	xdrs = &clone_xprt->xp_xdrin;
    402 	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
    403 
    404 	/*
    405 	 * Get the XID
    406 	 */
    407 	/*
    408 	 * Treat xid as opaque (xid is the first entity
    409 	 * in the rpc rdma message).
    410 	 */
    411 	xid = *(uint32_t *)rdp->rpcmsg.addr;
    412 	/* Skip xid and set the xdr position accordingly. */
    413 	XDR_SETPOS(xdrs, sizeof (uint32_t));
    414 	if (! xdr_u_int(xdrs, &vers) ||
    415 	    ! xdr_u_int(xdrs, &op)) {
    416 		cmn_err(CE_WARN, "svc_rdma_krecv: xdr_u_int failed");
    417 		XDR_DESTROY(xdrs);
    418 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
    419 		RDMA_REL_CONN(conn);
    420 		freeb(mp);
    421 		RSSTAT_INCR(rsbadcalls);
    422 		return (FALSE);
    423 	}
    424 	if (op == RDMA_DONE) {
    425 		/*
    426 		 * Should not get RDMA_DONE
    427 		 */
    428 		freeb(mp);
    429 		XDR_DESTROY(xdrs);
    430 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
    431 		RDMA_REL_CONN(conn);
    432 		RSSTAT_INCR(rsbadcalls);
    433 		return (FALSE); /* no response */
    434 	}
    435 
    436 #ifdef DEBUG
    437 	if (rdma_svc_debug)
    438 		printf("svc_rdma_krecv: recv'd call xid %u\n", xid);
    439 #endif
    440 	/*
    441 	 * Now decode the chunk list
    442 	 */
    443 	cl = NULL;
    444 	if (! xdr_do_clist(xdrs, &cl)) {
    445 		cmn_err(CE_WARN, "svc_rdma_krecv: xdr_do_clist failed");
    446 	}
    447 
    448 	/*
    449 	 * A chunk at 0 offset indicates that the RPC call message
    450 	 * is in a chunk. Get the RPC call message chunk.
    451 	 */
    452 	if (cl != NULL && op == RDMA_NOMSG) {
    453 		struct clist *cllong;	/* Long RPC chunk */
    454 
    455 		/* Remove RPC call message chunk from chunklist */
    456 		cllong = cl;
    457 		cl = cl->c_next;
    458 		cllong->c_next = NULL;
    459 
    460 		/* Allocate and register memory for the RPC call msg chunk */
    461 		cllong->c_daddr = (uint64)(uintptr_t)
    462 		    kmem_alloc(cllong->c_len, KM_SLEEP);
    463 		if (cllong->c_daddr == NULL) {
    464 			cmn_err(CE_WARN,
    465 				"svc_rdma_krecv: no memory for rpc call");
    466 			XDR_DESTROY(xdrs);
    467 			RDMA_BUF_FREE(conn, &rdp->rpcmsg);
    468 			RDMA_REL_CONN(conn);
    469 			freeb(mp);
    470 			RSSTAT_INCR(rsbadcalls);
    471 			clist_free(cl);
    472 			clist_free(cllong);
    473 			return (FALSE);
    474 		}
    475 		status = clist_register(conn, cllong, 0);
    476 		if (status) {
    477 			cmn_err(CE_WARN,
    478 				"svc_rdma_krecv: clist_register failed");
    479 			kmem_free((void *)(uintptr_t)cllong->c_daddr,
    480 			    cllong->c_len);
    481 			XDR_DESTROY(xdrs);
    482 			RDMA_BUF_FREE(conn, &rdp->rpcmsg);
    483 			RDMA_REL_CONN(conn);
    484 			freeb(mp);
    485 			RSSTAT_INCR(rsbadcalls);
    486 			clist_free(cl);
    487 			clist_free(cllong);
    488 			return (FALSE);
    489 		}
    490 
    491 		/*
    492 		 * Now read the RPC call message in
    493 		 */
    494 		status = RDMA_READ(conn, cllong, WAIT);
    495 		if (status) {
    496 			cmn_err(CE_WARN,
    497 			    "svc_rdma_krecv: rdma_read failed %d", status);
    498 			(void) clist_deregister(conn, cllong, 0);
    499 			kmem_free((void *)(uintptr_t)cllong->c_daddr,
    500 			    cllong->c_len);
    501 			XDR_DESTROY(xdrs);
    502 			RDMA_BUF_FREE(conn, &rdp->rpcmsg);
    503 			RDMA_REL_CONN(conn);
    504 			freeb(mp);
    505 			RSSTAT_INCR(rsbadcalls);
    506 			clist_free(cl);
    507 			clist_free(cllong);
    508 			return (FALSE);
    509 		}
    510 		/*
    511 		 * Sync memory for CPU after DMA
    512 		 */
    513 		status = clist_syncmem(conn, cllong, 0);
    514 
    515 		/*
    516 		 * Deregister the chunk
    517 		 */
    518 		(void) clist_deregister(conn, cllong, 0);
    519 
    520 		/*
    521 		 * Setup the XDR for the RPC call message
    522 		 */
    523 		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->c_daddr,
    524 		    cllong->c_len, 0, cl, XDR_DECODE, conn);
    525 		vd->rpcbuf.type = CHUNK_BUFFER;
    526 		vd->rpcbuf.addr = (caddr_t)(uintptr_t)cllong->c_daddr;
    527 		vd->rpcbuf.len = cllong->c_len;
    528 		vd->rpcbuf.handle.mrc_rmr = 0;
    529 
    530 		/*
    531 		 * Free the chunk element with the Long RPC details and
    532 		 * the message received.
    533 		 */
    534 		clist_free(cllong);
    535 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
    536 	} else {
    537 		pos = XDR_GETPOS(xdrs);
    538 
    539 		/*
    540 		 * Now the RPC call message header
    541 		 */
    542 		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
    543 			rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
    544 		vd->rpcbuf = rdp->rpcmsg;
    545 	}
    546 	if (! xdr_callmsg(xdrs, msg)) {
    547 		cmn_err(CE_WARN, "svc_rdma_krecv: xdr_callmsg failed");
    548 		if (cl != NULL)
    549 			clist_free(cl);
    550 		XDR_DESTROY(xdrs);
    551 		rdma_buf_free(conn, &vd->rpcbuf);
    552 		RDMA_REL_CONN(conn);
    553 		freeb(mp);
    554 		RSSTAT_INCR(rsxdrcall);
    555 		RSSTAT_INCR(rsbadcalls);
    556 		return (FALSE);
    557 	}
    558 
    559 	/*
    560 	 * Point the remote transport address in the service_transport
    561 	 * handle at the address in the request.
    562 	 */
    563 	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
    564 	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
    565 	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;
    566 
    567 #ifdef DEBUG
    568 	if (rdma_svc_debug) {
    569 		struct sockaddr_in *sin4;
    570 		char print_addr[INET_ADDRSTRLEN];
    571 
    572 		sin4 = (struct sockaddr_in *)clone_xprt->xp_rtaddr.buf;
    573 		bzero(print_addr, INET_ADDRSTRLEN);
    574 		(void) inet_ntop(AF_INET,
    575 		    &sin4->sin_addr, print_addr, INET_ADDRSTRLEN);
    576 		cmn_err(CE_NOTE,
    577 		    "svc_rdma_krecv: remote clnt_addr: %s", print_addr);
    578 	}
    579 #endif
    580 
    581 	clone_xprt->xp_xid = xid;
    582 	vd->conn = conn;
    583 	freeb(mp);
    584 	return (TRUE);
    585 }
    586 
    587 /*
    588  * Send rpc reply.
    589  */
    590 static bool_t
    591 svc_rdma_ksend(SVCXPRT *clone_xprt, struct rpc_msg *msg)
    592 {
    593 	struct clone_rdma_data *vd;
    594 	XDR *xdrs = &(clone_xprt->xp_xdrout), rxdrs;
    595 	int retval = FALSE;
    596 	xdrproc_t xdr_results;
    597 	caddr_t xdr_location;
    598 	bool_t has_args, reg = FALSE;
    599 	uint_t len, op;
    600 	uint_t vers;
    601 	struct clist *cl = NULL, *cle = NULL;
    602 	struct clist *sendlist = NULL;
    603 	int status;
    604 	int msglen;
    605 	rdma_buf_t clmsg, longreply, rpcreply;
    606 
    607 	vd = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
    608 
    609 	/*
    610 	 * If there is a result procedure specified in the reply message,
    611 	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
    612 	 * We need to make sure it won't be processed twice, so we null
    613 	 * it for xdr_replymsg here.
    614 	 */
    615 	has_args = FALSE;
    616 	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
    617 	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
    618 		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
    619 			has_args = TRUE;
    620 			xdr_location = msg->acpted_rply.ar_results.where;
    621 			msg->acpted_rply.ar_results.proc = xdr_void;
    622 			msg->acpted_rply.ar_results.where = NULL;
    623 		}
    624 	}
    625 
    626 	/*
    627 	 * Get the size of the rpc reply message. Need this
    628 	 * to determine if the rpc reply message will fit in
    629 	 * the pre-allocated RDMA buffers. If the rpc reply
    630 	 * message length is greater that the pre-allocated
    631 	 * buffers then, a one time use buffer is allocated
    632 	 * and registered for this rpc reply.
    633 	 */
    634 	msglen = xdr_sizeof(xdr_replymsg, msg);
    635 	if (has_args && msg->rm_reply.rp_acpt.ar_verf.oa_flavor != RPCSEC_GSS) {
    636 		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
    637 				rdma_minchunk);
    638 		if (msglen > RPC_MSG_SZ) {
    639 
    640 			/*
    641 			 * Allocate chunk buffer for rpc reply
    642 			 */
    643 			rpcreply.type = CHUNK_BUFFER;
    644 			rpcreply.addr = kmem_zalloc(msglen, KM_SLEEP);
    645 			cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
    646 			cle->c_xdroff = 0;
    647 			cle->c_len  = rpcreply.len = msglen;
    648 			cle->c_saddr = (uint64)(uintptr_t)rpcreply.addr;
    649 			cle->c_next = NULL;
    650 			xdrrdma_create(xdrs, rpcreply.addr, msglen,
    651 			    rdma_minchunk, cle, XDR_ENCODE, NULL);
    652 			op = RDMA_NOMSG;
    653 		} else {
    654 			/*
    655 			 * Get a pre-allocated buffer for rpc reply
    656 			 */
    657 			rpcreply.type = SEND_BUFFER;
    658 			if (RDMA_BUF_ALLOC(vd->conn, &rpcreply)) {
    659 				cmn_err(CE_WARN,
    660 				    "svc_rdma_ksend: no free buffers!");
    661 				return (retval);
    662 			}
    663 			xdrrdma_create(xdrs, rpcreply.addr, rpcreply.len,
    664 			    rdma_minchunk, NULL, XDR_ENCODE, NULL);
    665 			op = RDMA_MSG;
    666 		}
    667 
    668 		/*
    669 		 * Initialize the XDR encode stream.
    670 		 */
    671 		msg->rm_xid = clone_xprt->xp_xid;
    672 
    673 		if (!(xdr_replymsg(xdrs, msg) &&
    674 		    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, xdrs,
    675 		    xdr_results, xdr_location)))) {
    676 			rdma_buf_free(vd->conn, &rpcreply);
    677 			if (cle)
    678 				clist_free(cle);
    679 			cmn_err(CE_WARN,
    680 			    "svc_rdma_ksend: xdr_replymsg/SVCAUTH_WRAP "
    681 			    "failed");
    682 			goto out;
    683 		}
    684 		len = XDR_GETPOS(xdrs);
    685 	}
    686 	if (has_args && msg->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS) {
    687 
    688 		/*
    689 		 * For RPCSEC_GSS since we cannot accurately presize the
    690 		 * buffer required for encoding, we assume that its going
    691 		 * to be a Long RPC to start with. We also create the
    692 		 * the XDR stream with min_chunk set to 0 which instructs
    693 		 * the XDR layer to not chunk the incoming byte stream.
    694 		 */
    695 		msglen += 2 * MAX_AUTH_BYTES + 2 * sizeof (struct opaque_auth);
    696 		msglen += xdr_sizeof(xdr_results, xdr_location);
    697 
    698 		/*
    699 		 * Long RPC. Allocate one time use custom buffer.
    700 		 */
    701 		longreply.type = CHUNK_BUFFER;
    702 		longreply.addr = kmem_zalloc(msglen, KM_SLEEP);
    703 		cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
    704 		cle->c_xdroff = 0;
    705 		cle->c_len  = longreply.len = msglen;
    706 		cle->c_saddr = (uint64)(uintptr_t)longreply.addr;
    707 		cle->c_next = NULL;
    708 		xdrrdma_create(xdrs, longreply.addr, msglen, 0, cle,
    709 		    XDR_ENCODE, NULL);
    710 		op = RDMA_NOMSG;
    711 		/*
    712 		 * Initialize the XDR encode stream.
    713 		 */
    714 		msg->rm_xid = clone_xprt->xp_xid;
    715 
    716 		if (!(xdr_replymsg(xdrs, msg) &&
    717 		    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, xdrs,
    718 		    xdr_results, xdr_location)))) {
    719 			if (longreply.addr != xdrs->x_base) {
    720 				longreply.addr = xdrs->x_base;
    721 				longreply.len = xdr_getbufsize(xdrs);
    722 			}
    723 			rdma_buf_free(vd->conn, &longreply);
    724 			if (cle)
    725 				clist_free(cle);
    726 			cmn_err(CE_WARN,
    727 			    "svc_rdma_ksend: xdr_replymsg/SVCAUTH_WRAP "
    728 			    "failed");
    729 			goto out;
    730 		}
    731 
    732 		/*
    733 		 * If we had to allocate a new buffer while encoding
    734 		 * then update the addr and len.
    735 		 */
    736 		if (longreply.addr != xdrs->x_base) {
    737 			longreply.addr = xdrs->x_base;
    738 			longreply.len = xdr_getbufsize(xdrs);
    739 		}
    740 
    741 		len = XDR_GETPOS(xdrs);
    742 
    743 		/*
    744 		 * If it so happens that the encoded message is after all
    745 		 * not long enough to be a Long RPC then allocate a
    746 		 * SEND_BUFFER and copy the encoded message into it.
    747 		 */
    748 		if (len > RPC_MSG_SZ) {
    749 			rpcreply.type = CHUNK_BUFFER;
    750 			rpcreply.addr = longreply.addr;
    751 			rpcreply.len = longreply.len;
    752 		} else {
    753 			clist_free(cle);
    754 			XDR_DESTROY(xdrs);
    755 			/*
    756 			 * Get a pre-allocated buffer for rpc reply
    757 			 */
    758 			rpcreply.type = SEND_BUFFER;
    759 			if (RDMA_BUF_ALLOC(vd->conn, &rpcreply)) {
    760 				cmn_err(CE_WARN,
    761 				    "svc_rdma_ksend: no free buffers!");
    762 				rdma_buf_free(vd->conn, &longreply);
    763 				return (retval);
    764 			}
    765 			bcopy(longreply.addr, rpcreply.addr, len);
    766 			xdrrdma_create(xdrs, rpcreply.addr, len, 0, NULL,
    767 			    XDR_ENCODE, NULL);
    768 			rdma_buf_free(vd->conn, &longreply);
    769 			op = RDMA_MSG;
    770 		}
    771 	}
    772 
    773 	if (has_args == FALSE) {
    774 
    775 		if (msglen > RPC_MSG_SZ) {
    776 
    777 			/*
    778 			 * Allocate chunk buffer for rpc reply
    779 			 */
    780 			rpcreply.type = CHUNK_BUFFER;
    781 			rpcreply.addr = kmem_zalloc(msglen, KM_SLEEP);
    782 			cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
    783 			cle->c_xdroff = 0;
    784 			cle->c_len  = rpcreply.len = msglen;
    785 			cle->c_saddr = (uint64)(uintptr_t)rpcreply.addr;
    786 			cle->c_next = NULL;
    787 			xdrrdma_create(xdrs, rpcreply.addr, msglen,
    788 			    rdma_minchunk, cle, XDR_ENCODE, NULL);
    789 			op = RDMA_NOMSG;
    790 		} else {
    791 			/*
    792 			 * Get a pre-allocated buffer for rpc reply
    793 			 */
    794 			rpcreply.type = SEND_BUFFER;
    795 			if (RDMA_BUF_ALLOC(vd->conn, &rpcreply)) {
    796 				cmn_err(CE_WARN,
    797 				    "svc_rdma_ksend: no free buffers!");
    798 				return (retval);
    799 			}
    800 			xdrrdma_create(xdrs, rpcreply.addr, rpcreply.len,
    801 			    rdma_minchunk, NULL, XDR_ENCODE, NULL);
    802 			op = RDMA_MSG;
    803 		}
    804 
    805 		/*
    806 		 * Initialize the XDR encode stream.
    807 		 */
    808 		msg->rm_xid = clone_xprt->xp_xid;
    809 
    810 		if (!xdr_replymsg(xdrs, msg)) {
    811 			rdma_buf_free(vd->conn, &rpcreply);
    812 			if (cle)
    813 				clist_free(cle);
    814 			cmn_err(CE_WARN,
    815 			    "svc_rdma_ksend: xdr_replymsg/SVCAUTH_WRAP "
    816 			    "failed");
    817 			goto out;
    818 		}
    819 		len = XDR_GETPOS(xdrs);
    820 	}
    821 
    822 	/*
    823 	 * Get clist and a buffer for sending it across
    824 	 */
    825 	cl = xdrrdma_clist(xdrs);
    826 	clmsg.type = SEND_BUFFER;
    827 	if (RDMA_BUF_ALLOC(vd->conn, &clmsg)) {
    828 		rdma_buf_free(vd->conn, &rpcreply);
    829 		cmn_err(CE_WARN, "svc_rdma_ksend: no free buffers!!");
    830 		goto out;
    831 	}
    832 
    833 	/*
    834 	 * Now register the chunks in the list
    835 	 */
    836 	if (cl != NULL) {
    837 		status = clist_register(vd->conn, cl, 1);
    838 		if (status != RDMA_SUCCESS) {
    839 			rdma_buf_free(vd->conn, &clmsg);
    840 			cmn_err(CE_WARN,
    841 				"svc_rdma_ksend: clist register failed");
    842 			goto out;
    843 		}
    844 		reg = TRUE;
    845 	}
    846 
    847 	/*
    848 	 * XDR the XID, vers, and op
    849 	 */
    850 	/*
    851 	 * Treat xid as opaque (xid is the first entity
    852 	 * in the rpc rdma message).
    853 	 */
    854 	vers = RPCRDMA_VERS;
    855 	xdrs = &rxdrs;
    856 	xdrmem_create(xdrs, clmsg.addr, clmsg.len, XDR_ENCODE);
    857 	(*(uint32_t *)clmsg.addr) = msg->rm_xid;
    858 	/* Skip xid and set the xdr position accordingly. */
    859 	XDR_SETPOS(xdrs, sizeof (uint32_t));
    860 	if (! xdr_u_int(xdrs, &vers) ||
    861 	    ! xdr_u_int(xdrs, &op)) {
    862 		rdma_buf_free(vd->conn, &rpcreply);
    863 		rdma_buf_free(vd->conn, &clmsg);
    864 		cmn_err(CE_WARN, "svc_rdma_ksend: xdr_u_int failed");
    865 		goto out;
    866 	}
    867 
    868 	/*
    869 	 * Now XDR the chunk list
    870 	 */
    871 	(void) xdr_do_clist(xdrs, &cl);
    872 
    873 	clist_add(&sendlist, 0, XDR_GETPOS(xdrs), &clmsg.handle, clmsg.addr,
    874 		NULL, NULL);
    875 
    876 	if (op == RDMA_MSG) {
    877 		clist_add(&sendlist, 0, len, &rpcreply.handle, rpcreply.addr,
    878 			NULL, NULL);
    879 	} else {
    880 		cl->c_len = len;
    881 		RSSTAT_INCR(rslongrpcs);
    882 	}
    883 
    884 	/*
    885 	 * Send the reply message to the client
    886 	 */
    887 	if (cl != NULL) {
    888 		status = clist_syncmem(vd->conn, cl, 1);
    889 		if (status != RDMA_SUCCESS) {
    890 			rdma_buf_free(vd->conn, &rpcreply);
    891 			rdma_buf_free(vd->conn, &clmsg);
    892 			goto out;
    893 		}
    894 #ifdef DEBUG
    895 	if (rdma_svc_debug)
    896 		printf("svc_rdma_ksend: chunk response len %d xid %u\n",
    897 			cl->c_len, msg->rm_xid);
    898 #endif
    899 		/*
    900 		 * Post a receive buffer because we expect a RDMA_DONE
    901 		 * message.
    902 		 */
    903 		status = rdma_svc_postrecv(vd->conn);
    904 
    905 		/*
    906 		 * Send the RPC reply message and wait for RDMA_DONE
    907 		 */
    908 		status = RDMA_SEND_RESP(vd->conn, sendlist, msg->rm_xid);
    909 		if (status != RDMA_SUCCESS) {
    910 #ifdef DEBUG
    911 			if (rdma_svc_debug)
    912 				cmn_err(CE_NOTE, "svc_rdma_ksend: "
    913 					"rdma_send_resp failed %d", status);
    914 #endif
    915 			goto out;
    916 		}
    917 #ifdef DEBUG
    918 	if (rdma_svc_debug)
    919 		printf("svc_rdma_ksend: got RDMA_DONE xid %u\n", msg->rm_xid);
    920 #endif
    921 	} else {
    922 #ifdef DEBUG
    923 	if (rdma_svc_debug)
    924 		printf("svc_rdma_ksend: msg response xid %u\n", msg->rm_xid);
    925 #endif
    926 		status = RDMA_SEND(vd->conn, sendlist, msg->rm_xid);
    927 		if (status != RDMA_SUCCESS) {
    928 #ifdef DEBUG
    929 			if (rdma_svc_debug)
    930 				cmn_err(CE_NOTE, "svc_rdma_ksend: "
    931 					"rdma_send failed %d", status);
    932 #endif
    933 			goto out;
    934 		}
    935 	}
    936 
    937 	retval = TRUE;
    938 out:
    939 	/*
    940 	 * Deregister the chunks
    941 	 */
    942 	if (cl != NULL) {
    943 		if (reg)
    944 			(void) clist_deregister(vd->conn, cl, 1);
    945 		if (op == RDMA_NOMSG) {
    946 			/*
    947 			 * Long RPC reply in chunk. Free it up.
    948 			 */
    949 			rdma_buf_free(vd->conn, &rpcreply);
    950 		}
    951 		clist_free(cl);
    952 	}
    953 
    954 	/*
    955 	 * Free up sendlist chunks
    956 	 */
    957 	if (sendlist != NULL)
    958 		clist_free(sendlist);
    959 
    960 	/*
    961 	 * Destroy private data for xdr rdma
    962 	 */
    963 	XDR_DESTROY(&(clone_xprt->xp_xdrout));
    964 
    965 	/*
    966 	 * This is completely disgusting.  If public is set it is
    967 	 * a pointer to a structure whose first field is the address
    968 	 * of the function to free that structure and any related
    969 	 * stuff.  (see rrokfree in nfs_xdr.c).
    970 	 */
    971 	if (xdrs->x_public) {
    972 		/* LINTED pointer alignment */
    973 		(**((int (**)())xdrs->x_public))(xdrs->x_public);
    974 	}
    975 
    976 	return (retval);
    977 }
    978 
    979 /*
    980  * Deserialize arguments.
    981  */
    982 static bool_t
    983 svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
    984 {
    985 	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
    986 	    xdr_args, args_ptr)) != TRUE)
    987 		return (FALSE);
    988 	return (TRUE);
    989 }
    990 
    991 static bool_t
    992 svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    993     caddr_t args_ptr)
    994 {
    995 	struct clone_rdma_data *vd;
    996 	bool_t retval;
    997 
    998 	vd = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
    999 	if (args_ptr) {
   1000 		XDR	*xdrs = &clone_xprt->xp_xdrin;
   1001 		struct clist *cl;
   1002 
   1003 		cl = xdrrdma_clist(xdrs);
   1004 		if (cl != NULL)
   1005 			clist_free(cl);
   1006 
   1007 		xdrs->x_op = XDR_FREE;
   1008 		retval = (*xdr_args)(xdrs, args_ptr);
   1009 	}
   1010 	XDR_DESTROY(&(clone_xprt->xp_xdrin));
   1011 	rdma_buf_free(vd->conn, &vd->rpcbuf);
   1012 	RDMA_REL_CONN(vd->conn);
   1013 	return (retval);
   1014 }
   1015 
/*
 * Response buffer entry of the ops vector.  This routine always
 * returns NULL regardless of the requested size.
 */
/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}
   1022 
/*
 * Counterpart of svc_rdma_kgetres() in the ops vector.  Intentionally
 * empty: svc_rdma_kgetres() never hands out a buffer, so there is
 * nothing to release.
 */
/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}
   1028 
   1029 /*
   1030  * the dup cacheing routines below provide a cache of non-failure
   1031  * transaction id's.  rpc service routines can use this to detect
   1032  * retransmissions and re-send a non-failure response.
   1033  */
   1034 
   1035 /*
   1036  * MAXDUPREQS is the number of cached items.  It should be adjusted
   1037  * to the service load so that there is likely to be a response entry
   1038  * when the first retransmission comes in.
   1039  */
   1040 #define	MAXDUPREQS	1024
   1041 
   1042 /*
   1043  * This should be appropriately scaled to MAXDUPREQS.
   1044  */
   1045 #define	DRHASHSZ	257
   1046 
   1047 #if ((DRHASHSZ & (DRHASHSZ - <