Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License, Version 1.0 only
      6  * (the "License").  You may not use this file except in compliance
      7  * with the License.
      8  *
      9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
     10  * or http://www.opensolaris.org/os/licensing.
     11  * See the License for the specific language governing permissions
     12  * and limitations under the License.
     13  *
     14  * When distributing Covered Code, include this CDDL HEADER in each
     15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     16  * If applicable, add the following below this CDDL HEADER, with the
     17  * fields enclosed by brackets "[]" replaced with your own identifying
     18  * information: Portions Copyright [yyyy] [name of copyright owner]
     19  *
     20  * CDDL HEADER END
     21  */
     22 /*
     23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
     27 /* All Rights Reserved */
     28 /*
     29  * Portions of this source code were derived from Berkeley
     30  * 4.3 BSD under license from the Regents of the University of
     31  * California.
     32  */
     33 
     34 #pragma ident	"@(#)clnt_rdma.c	1.10	05/07/26 SMI"
     35 
     36 #include <sys/param.h>
     37 #include <sys/types.h>
     38 #include <sys/user.h>
     39 #include <sys/systm.h>
     40 #include <sys/sysmacros.h>
     41 #include <sys/errno.h>
     42 #include <sys/kmem.h>
     43 #include <sys/debug.h>
     44 #include <sys/systm.h>
     45 #include <sys/kstat.h>
     46 #include <sys/t_lock.h>
     47 #include <sys/ddi.h>
     48 #include <sys/cmn_err.h>
     49 #include <sys/time.h>
     50 #include <sys/isa_defs.h>
     51 #include <sys/zone.h>
     52 
     53 #include <rpc/types.h>
     54 #include <rpc/xdr.h>
     55 #include <rpc/auth.h>
     56 #include <rpc/clnt.h>
     57 #include <rpc/rpc_msg.h>
     58 #include <rpc/rpc_rdma.h>
     59 
     60 
/*
 * Forward declarations of the CLIENT operations implemented in this file.
 * All of them are file-local; they are reachable from outside only through
 * the rdma_clnt_ops vector defined below.
 */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);

/*
 * Operations vector for RDMA based RPC.
 *
 * clnt_rdma_kcreate() and clnt_rdma_kinit() install this vector as the
 * cl_ops of every handle they set up.  The initializer is positional, so
 * the entries must stay in the member order of struct clnt_ops.
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};
     83 
/*
 * The size of the preserialized RPC header information.
 * clnt_rdma_kcreate() serializes the call header into cku_rpchdr once and
 * clnt_rdma_kcallit() copies this many bytes into every outgoing call.
 */
#define	CKU_HDRSIZE	20

/*
 * Per RPC RDMA endpoint details.
 *
 * The CLIENT handle is embedded as the first member; ptoh()/htop() convert
 * between the handle and this private structure.
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	/* encoded size of the most recent call, set by clnt_rdma_kcallit() */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	/*
	 * Preserialized header followed by 4 extra bytes; the RPCSEC_GSS
	 * path in clnt_rdma_kcallit() stores the procedure number there.
	 */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;

#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
/* delay actually applied by clnt_rdma_kcallit(); a variable so it can vary */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
    109 
/*
 * Client-side RPC/RDMA statistics.  Every member is a named 64-bit kstat
 * counter and the initializer below supplies the name of each one.
 * Counters are incremented through RCSTAT_INCR().  Of these, this file
 * increments rccalls, rcbadcalls, rctimeouts, rcbadverfs, rctimers,
 * rcnomem, rcintrs and rclongrpcs; the remaining counters are not touched
 * here.
 */
struct {
	kstat_named_t	rccalls;
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;
	kstat_named_t	rcintrs;
	kstat_named_t	rclongrpcs;
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

/*
 * The statistics viewed as a flat array of kstat_named_t together with its
 * element count.  Both have external linkage; presumably the kstat set-up
 * code elsewhere consumes them (not visible in this file).
 */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);

#ifdef DEBUG
/* non-zero enables the per-call trace output in clnt_rdma_kcallit() */
int rdma_clnt_debug = 0;
#endif
    142 
    143 #ifdef accurate_stats
    144 extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */
    145 
    146 #define	RCSTAT_INCR(x)			\
    147 	mutex_enter(&rdmarcstat_lock);	\
    148 	rdmarcstat.x.value.ui64++;	\
    149 	mutex_exit(&rdmarcstat_lock);
    150 #else
    151 #define	RCSTAT_INCR(x)			\
    152 	rdmarcstat.x.value.ui64++;
    153 #endif
    154 
    155 #define	ptoh(p)		(&((p)->cku_client))
    156 #define	htop(h)		((cku_private_t *)((h)->cl_private))
    157 
    158 int
    159 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
    160     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
    161 {
    162 	CLIENT *h;
    163 	struct cku_private *p;
    164 	struct rpc_msg call_msg;
    165 	rdma_registry_t *rp;
    166 
    167 	ASSERT(INGLOBALZONE(curproc));
    168 
    169 	if (cl == NULL)
    170 		return (EINVAL);
    171 	*cl = NULL;
    172 
    173 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
    174 
    175 	/*
    176 	 * Find underlying RDMATF plugin
    177 	 */
    178 	rw_enter(&rdma_lock, RW_READER);
    179 	rp = rdma_mod_head;
    180 	while (rp != NULL) {
    181 		if (strcmp(rp->r_mod->rdma_api, proto))
    182 			rp = rp->r_next;
    183 		else {
    184 			p->cku_rd_mod = rp->r_mod;
    185 			p->cku_rd_handle = handle;
    186 			break;
    187 		}
    188 	}
    189 	rw_exit(&rdma_lock);
    190 
    191 	if (p->cku_rd_mod == NULL) {
    192 		/*
    193 		 * Should not happen.
    194 		 * No matching RDMATF plugin.
    195 		 */
    196 		kmem_free(p, sizeof (struct cku_private));
    197 		return (EINVAL);
    198 	}
    199 
    200 	h = ptoh(p);
    201 	h->cl_ops = &rdma_clnt_ops;
    202 	h->cl_private = (caddr_t)p;
    203 	h->cl_auth = authkern_create();
    204 
    205 	/* call message, just used to pre-serialize below */
    206 	call_msg.rm_xid = 0;
    207 	call_msg.rm_direction = CALL;
    208 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
    209 	call_msg.rm_call.cb_prog = pgm;
    210 	call_msg.rm_call.cb_vers = vers;
    211 
    212 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
    213 	/* pre-serialize call message header */
    214 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
    215 		XDR_DESTROY(&p->cku_outxdr);
    216 		auth_destroy(h->cl_auth);
    217 		kmem_free(p, sizeof (struct cku_private));
    218 		return (EINVAL);
    219 	}
    220 
    221 	/*
    222 	 * Set up the rpc information
    223 	 */
    224 	p->cku_cred = cred;
    225 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
    226 	p->cku_addr.maxlen = raddr->maxlen;
    227 	p->cku_addr.len = raddr->len;
    228 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
    229 	p->cku_addrfmly = family;
    230 
    231 	*cl = h;
    232 	return (0);
    233 }
    234 
    235 static void
    236 clnt_rdma_kdestroy(CLIENT *h)
    237 {
    238 	struct cku_private *p = htop(h);
    239 
    240 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
    241 	kmem_free(p, sizeof (*p));
    242 }
    243 
    244 void
    245 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
    246     struct cred *cred)
    247 {
    248 	struct cku_private *p = htop(h);
    249 	rdma_registry_t *rp;
    250 
    251 	ASSERT(INGLOBALZONE(curproc));
    252 	/*
    253 	 * Find underlying RDMATF plugin
    254 	 */
    255 	p->cku_rd_mod = NULL;
    256 	rw_enter(&rdma_lock, RW_READER);
    257 	rp = rdma_mod_head;
    258 	while (rp != NULL) {
    259 		if (strcmp(rp->r_mod->rdma_api, proto))
    260 			rp = rp->r_next;
    261 		else {
    262 			p->cku_rd_mod = rp->r_mod;
    263 			p->cku_rd_handle = handle;
    264 			break;
    265 		}
    266 
    267 	}
    268 	rw_exit(&rdma_lock);
    269 
    270 	/*
    271 	 * Set up the rpc information
    272 	 */
    273 	p->cku_cred = cred;
    274 	p->cku_xid = 0;
    275 
    276 	if (p->cku_addr.maxlen < raddr->len) {
    277 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
    278 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
    279 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
    280 		p->cku_addr.maxlen = raddr->maxlen;
    281 	}
    282 
    283 	p->cku_addr.len = raddr->len;
    284 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
    285 	h->cl_ops = &rdma_clnt_ops;
    286 }
    287 
    288 /* ARGSUSED */
    289 static enum clnt_stat
    290 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
    291     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp, struct timeval wait)
    292 {
    293 	cku_private_t *p = htop(h);
    294 	int 	status;
    295 	XDR 	*xdrs;
    296 	XDR	*cxdrp = NULL, callxdr;	/* for xdrrdma encoding the RPC call */
    297 	XDR	*rxdrp = NULL, replxdr;	/* for xdrrdma decoding the RPC reply */
    298 	struct rpc_msg 	reply_msg;
    299 	struct clist *sendlist, *recvlist = NULL;
    300 	struct clist *cl = NULL, *cle = NULL;
    301 	uint_t vers, op;
    302 	uint_t off;
    303 	uint32_t xid;
    304 	CONN *conn = NULL;
    305 	rdma_buf_t clmsg, rpcmsg, longmsg, rpcreply;
    306 	int msglen;
    307 	clock_t	ticks;
    308 
    309 	RCSTAT_INCR(rccalls);
    310 	/*
    311 	 * Get unique xid
    312 	 */
    313 	if (p->cku_xid == 0)
    314 		p->cku_xid = alloc_xid();
    315 
    316 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_addr,
    317 	    p->cku_addrfmly, p->cku_rd_handle, &conn);
    318 
    319 	if (conn == NULL) {
    320 		/*
    321 		 * Connect failed to server. Could be because of one
    322 		 * of several things. In some cases we don't want
    323 		 * the caller to retry immediately - delay before
    324 		 * returning to caller.
    325 		 */
    326 		switch (status) {
    327 		case RDMA_TIMEDOUT:
    328 			/*
    329 			 * Already timed out. No need to delay
    330 			 * some more.
    331 			 */
    332 			p->cku_err.re_status = RPC_TIMEDOUT;
    333 			p->cku_err.re_errno = ETIMEDOUT;
    334 			break;
    335 		case RDMA_INTR:
    336 			/*
    337 			 * Failed because of an signal. Very likely
    338 			 * the caller will not retry.
    339 			 */
    340 			p->cku_err.re_status = RPC_INTR;
    341 			p->cku_err.re_errno = EINTR;
    342 			break;
    343 		default:
    344 			/*
    345 			 * All other failures - server down or service
    346 			 * down or temporary resource failure. Delay before
    347 			 * returning to caller.
    348 			 */
    349 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
    350 			p->cku_err.re_status = RPC_CANTCONNECT;
    351 			p->cku_err.re_errno = EIO;
    352 
    353 			if (h->cl_nosignal == TRUE) {
    354 				delay(ticks);
    355 			} else {
    356 				if (delay_sig(ticks) == EINTR) {
    357 					p->cku_err.re_status = RPC_INTR;
    358 					p->cku_err.re_errno = EINTR;
    359 				}
    360 			}
    361 			break;
    362 		}
    363 
    364 		return (p->cku_err.re_status);
    365 	}
    366 	/*
    367 	 * Get the size of the rpc call message. Need this
    368 	 * to determine if the rpc call message will fit in
    369 	 * the pre-allocated RDMA buffers. If the rpc call
    370 	 * message length is greater that the pre-allocated
    371 	 * buffers then, it is a Long RPC. A one time use
    372 	 * buffer is allocated and registered for the Long
    373 	 * RPC call.
    374 	 */
    375 	xdrs = &callxdr;
    376 	msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT;
    377 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
    378 		msglen += xdrrdma_authsize(h->cl_auth, p->cku_cred,
    379 				rdma_minchunk);
    380 		msglen += xdrrdma_sizeof(xdr_args, argsp, rdma_minchunk);
    381 
    382 		if (msglen > RPC_MSG_SZ) {
    383 
    384 			/*
    385 			 * Long RPC. Allocate one time use custom buffer.
    386 			 */
    387 			rpcmsg.type = CHUNK_BUFFER;
    388 			rpcmsg.addr = kmem_zalloc(msglen, KM_SLEEP);
    389 			cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
    390 			cle->c_xdroff = 0;
    391 			cle->c_len  = rpcmsg.len = msglen;
    392 			cle->c_saddr = (uint64)(uintptr_t)rpcmsg.addr;
    393 			cle->c_next = NULL;
    394 			xdrrdma_create(xdrs, rpcmsg.addr, msglen,
    395 			    rdma_minchunk, cle, XDR_ENCODE, NULL);
    396 			cxdrp = xdrs;
    397 			op = RDMA_NOMSG;
    398 		} else {
    399 			/*
    400 			 * Get a pre-allocated buffer for rpc call
    401 			 */
    402 			rpcmsg.type = SEND_BUFFER;
    403 			if (RDMA_BUF_ALLOC(conn, &rpcmsg)) {
    404 				p->cku_err.re_status = RPC_CANTSEND;
    405 				p->cku_err.re_errno = EIO;
    406 				RCSTAT_INCR(rcnomem);
    407 				cmn_err(CE_WARN,
    408 				    "clnt_rdma_kcallit: no buffers!");
    409 				goto done;
    410 			}
    411 			xdrrdma_create(xdrs, rpcmsg.addr, rpcmsg.len,
    412 			    rdma_minchunk, NULL, XDR_ENCODE, NULL);
    413 			cxdrp = xdrs;
    414 			op = RDMA_MSG;
    415 		}
    416 	} else {
    417 		/*
    418 		 * For RPCSEC_GSS since we cannot accurately presize the
    419 		 * buffer required for encoding, we assume that its going
    420 		 * to be a Long RPC to start with. We also create the
    421 		 * the XDR stream with min_chunk set to 0 which instructs
    422 		 * the XDR layer to not chunk the incoming byte stream.
    423 		 */
    424 
    425 		msglen += 2 * MAX_AUTH_BYTES + 2 * sizeof (struct opaque_auth);
    426 		msglen += xdr_sizeof(xdr_args, argsp);
    427 
    428 		/*
    429 		 * Long RPC. Allocate one time use custom buffer.
    430 		 */
    431 		longmsg.type = CHUNK_BUFFER;
    432 		longmsg.addr = kmem_zalloc(msglen, KM_SLEEP);
    433 		cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
    434 		cle->c_xdroff = 0;
    435 		cle->c_len  = longmsg.len = msglen;
    436 		cle->c_saddr = (uint64)(uintptr_t)longmsg.addr;
    437 		cle->c_next = NULL;
    438 		xdrrdma_create(xdrs, longmsg.addr, msglen, 0, cle,
    439 		    XDR_ENCODE, NULL);
    440 		cxdrp = xdrs;
    441 		op = RDMA_NOMSG;
    442 	}
    443 
    444 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
    445 		/*
    446 		 * Copy in the preserialized RPC header
    447 		 * information.
    448 		 */
    449 		bcopy(p->cku_rpchdr, rpcmsg.addr, CKU_HDRSIZE);
    450 
    451 		/*
    452 		 * transaction id is the 1st thing in the output
    453 		 * buffer.
    454 		 */
    455 		/* LINTED pointer alignment */
    456 		(*(uint32_t *)(rpcmsg.addr)) = p->cku_xid;
    457 
    458 		/* Skip the preserialized stuff. */
    459 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
    460 
    461 		/* Serialize dynamic stuff into the output buffer. */
    462 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
    463 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
    464 		    (!(*xdr_args)(xdrs, argsp))) {
    465 			rdma_buf_free(conn, &rpcmsg);
    466 			if (cle)
    467 				clist_free(cle);
    468 			p->cku_err.re_status = RPC_CANTENCODEARGS;
    469 			p->cku_err.re_errno = EIO;
    470 			cmn_err(CE_WARN,
    471 	"clnt_rdma_kcallit: XDR_PUTINT32/AUTH_MARSHAL/xdr_args failed");
    472 			goto done;
    473 		}
    474 		p->cku_outsz = XDR_GETPOS(xdrs);
    475 	} else {
    476 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
    477 		IXDR_PUT_U_INT32(uproc, procnum);
    478 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
    479 		XDR_SETPOS(xdrs, 0);
    480 
    481 		/* Serialize the procedure number and the arguments. */
    482 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
    483 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
    484 			if (longmsg.addr != xdrs->x_base) {
    485 				longmsg.addr = xdrs->x_base;
    486 				longmsg.len = xdr_getbufsize(xdrs);
    487 			}
    488 			rdma_buf_free(conn, &longmsg);
    489 			clist_free(cle);
    490 			p->cku_err.re_status = RPC_CANTENCODEARGS;
    491 			p->cku_err.re_errno = EIO;
    492 			cmn_err(CE_WARN,
    493 		"clnt_rdma_kcallit: AUTH_WRAP failed");
    494 			goto done;
    495 		}
    496 		/*
    497 		 * If we had to allocate a new buffer while encoding
    498 		 * then update the addr and len.
    499 		 */
    500 		if (longmsg.addr != xdrs->x_base) {
    501 			longmsg.addr = xdrs->x_base;
    502 			longmsg.len = xdr_getbufsize(xdrs);
    503 		}
    504 
    505 		/*
    506 		 * If it so happens that the encoded message is after all
    507 		 * not long enough to be a Long RPC then allocate a
    508 		 * SEND_BUFFER and copy the encoded message into it.
    509 		 */
    510 		p->cku_outsz = XDR_GETPOS(xdrs);
    511 		if (p->cku_outsz > RPC_MSG_SZ) {
    512 			rpcmsg.type = CHUNK_BUFFER;
    513 			rpcmsg.addr = longmsg.addr;
    514 			rpcmsg.len = longmsg.len;
    515 		} else {
    516 			clist_free(cle);
    517 			XDR_DESTROY(cxdrp);
    518 			cxdrp = NULL;
    519 			/*
    520 			 * Get a pre-allocated buffer for rpc call
    521 			 */
    522 			rpcmsg.type = SEND_BUFFER;
    523 			if (RDMA_BUF_ALLOC(conn, &rpcmsg)) {
    524 				p->cku_err.re_status = RPC_CANTSEND;
    525 				p->cku_err.re_errno = EIO;
    526 				RCSTAT_INCR(rcnomem);
    527 				cmn_err(CE_WARN,
    528 				    "clnt_rdma_kcallit: no buffers!");
    529 				rdma_buf_free(conn, &longmsg);
    530 				goto done;
    531 			}
    532 			bcopy(longmsg.addr, rpcmsg.addr, p->cku_outsz);
    533 			xdrrdma_create(xdrs, rpcmsg.addr, p->cku_outsz, 0,
    534 			    NULL, XDR_ENCODE, NULL);
    535 			cxdrp = xdrs;
    536 			rdma_buf_free(conn, &longmsg);
    537 			op = RDMA_MSG;
    538 		}
    539 	}
    540 
    541 	cl = xdrrdma_clist(xdrs);
    542 
    543 	/*
    544 	 * Update the chunk size information for the Long RPC msg.
    545 	 */
    546 	if (cl && op == RDMA_NOMSG)
    547 		cl->c_len = p->cku_outsz;
    548 
    549 	/*
    550 	 * Set up the RDMA chunk message
    551 	 */
    552 	vers = RPCRDMA_VERS;
    553 	clmsg.type = SEND_BUFFER;
    554 	if (RDMA_BUF_ALLOC(conn, &clmsg)) {
    555 		p->cku_err.re_status = RPC_CANTSEND;
    556 		p->cku_err.re_errno = EIO;
    557 		rdma_buf_free(conn, &rpcmsg);
    558 		RCSTAT_INCR(rcnomem);
    559 		cmn_err(CE_WARN, "clnt_rdma_kcallit: no free buffers!!");
    560 		goto done;
    561 	}
    562 	xdrs = &p->cku_outxdr;
    563 	xdrmem_create(xdrs, clmsg.addr, clmsg.len, XDR_ENCODE);
    564 	/*
    565 	 * Treat xid as opaque (xid is the first entity
    566 	 * in the rpc rdma message).
    567 	 */
    568 	(*(uint32_t *)clmsg.addr) = p->cku_xid;
    569 	/* Skip xid and set the xdr position accordingly. */
    570 	XDR_SETPOS(xdrs, sizeof (uint32_t));
    571 	(void) xdr_u_int(xdrs, &vers);
    572 	(void) xdr_u_int(xdrs, &op);
    573 
    574 	/*
    575 	 * Now XDR the chunk list
    576 	 */
    577 	if (cl != NULL) {
    578 
    579 		/*
    580 		 * Register the chunks in the list
    581 		 */
    582 		status = clist_register(conn, cl, 1);
    583 		if (status != RDMA_SUCCESS) {
    584 			cmn_err(CE_WARN,
    585 		"clnt_rdma_kcallit: clist register failed");
    586 			rdma_buf_free(conn, &clmsg);
    587 			rdma_buf_free(conn, &rpcmsg);
    588 			clist_free(cl);
    589 			p->cku_err.re_status = RPC_CANTSEND;
    590 			p->cku_err.re_errno = EIO;
    591 			goto done;
    592 		}
    593 
    594 	}
    595 	(void) xdr_do_clist(xdrs, &cl);
    596 
    597 	/*
    598 	 * Start with the RDMA header and clist (if any)
    599 	 */
    600 	sendlist = NULL;
    601 	clist_add(&sendlist, 0, XDR_GETPOS(xdrs), &clmsg.handle,
    602 		clmsg.addr, NULL, NULL);
    603 
    604 	/*
    605 	 * Put the RPC call message in the send list if small RPC
    606 	 */
    607 	if (op == RDMA_MSG) {
    608 		clist_add(&sendlist, 0, p->cku_outsz, &rpcmsg.handle,
    609 			rpcmsg.addr, NULL, NULL);
    610 	} else {
    611 		/* Long RPC already in chunk list */
    612 		RCSTAT_INCR(rclongrpcs);
    613 	}
    614 
    615 	/*
    616 	 * Set up a reply buffer ready for the reply
    617 	 */
    618 	status = rdma_clnt_postrecv(conn, p->cku_xid);
    619 	if (status != RDMA_SUCCESS) {
    620 		rdma_buf_free(conn, &clmsg);
    621 		rdma_buf_free(conn, &rpcmsg);
    622 		if (cl) {
    623 			(void) clist_deregister(conn, cl, 1);
    624 			clist_free(cl);
    625 		}
    626 		clist_free(sendlist);
    627 		p->cku_err.re_status = RPC_CANTSEND;
    628 		p->cku_err.re_errno = EIO;
    629 		goto done;
    630 	}
    631 	/*
    632 	 * sync the memory for dma
    633 	 */
    634 	if (cl != NULL) {
    635 		status = clist_syncmem(conn, cl, 1);
    636 		if (status != RDMA_SUCCESS) {
    637 			rdma_buf_free(conn, &clmsg);
    638 			rdma_buf_free(conn, &rpcmsg);
    639 			(void) clist_deregister(conn, cl, 1);
    640 			clist_free(cl);
    641 			clist_free(sendlist);
    642 			p->cku_err.re_status = RPC_CANTSEND;
    643 			p->cku_err.re_errno = EIO;
    644 			goto done;
    645 		}
    646 	}
    647 
    648 	/*
    649 	 * Send the call message to the server
    650 	 */
    651 	status = RDMA_SEND(conn, sendlist, p->cku_xid);
    652 	if (status != RDMA_SUCCESS) {
    653 		if (cl) {
    654 			(void) clist_deregister(conn, cl, 1);
    655 			clist_free(cl);
    656 			/*
    657 			 * If this was a long RPC message, need
    658 			 * to free that buffer.
    659 			 */
    660 			if (rpcmsg.type == CHUNK_BUFFER)
    661 				rdma_buf_free(conn, &rpcmsg);
    662 		}
    663 		clist_free(sendlist);
    664 		p->cku_err.re_status = RPC_CANTSEND;
    665 		p->cku_err.re_errno = EIO;
    666 		goto done;
    667 	} else {
    668 		/*
    669 		 * RDMA plugin now owns the send msg buffers.
    670 		 * Clear them out and don't free them here.
    671 		 */
    672 		clmsg.addr = NULL;
    673 		if (rpcmsg.type == SEND_BUFFER)
    674 			rpcmsg.addr = NULL;
    675 	}
    676 	clist_free(sendlist);
    677 #ifdef DEBUG
    678 if (rdma_clnt_debug) {
    679 		printf("clnt_rdma_kcallit: send request xid %u\n", p->cku_xid);
    680 	}
    681 #endif
    682 
    683 	/*
    684 	 * Recv rpc reply
    685 	 */
    686 	status = RDMA_RECV(conn, &recvlist, p->cku_xid);
    687 
    688 	/*
    689 	 * Deregister chunks sent. Do this only after the reply
    690 	 * is received as that is a sure indication that the
    691 	 * remote end has completed RDMA of the chunks.
    692 	 */
    693 	if (cl != NULL) {
    694 		/*
    695 		 * Deregister the chunks
    696 		 */
    697 		(void) clist_deregister(conn, cl, 1);
    698 		clist_free(cl);
    699 		/*
    700 		 * If long RPC free chunk
    701 		 */
    702 		rdma_buf_free(conn, &rpcmsg);
    703 	}
    704 
    705 	/*
    706 	 * Now check recv status
    707 	 */
    708 	if (status != 0) {
    709 #ifdef DEBUG
    710 		if (rdma_clnt_debug)
    711 			cmn_err(CE_NOTE,
    712 			    "clnt_rdma_kcallit: reply failed %u status %d",
    713 			    p->cku_xid, status);
    714 #endif
    715 		if (status == RDMA_INTR) {
    716 			p->cku_err.re_status = RPC_INTR;
    717 			p->cku_err.re_errno = EINTR;
    718 			RCSTAT_INCR(rcintrs);
    719 		} else if (status == RPC_TIMEDOUT) {
    720 			p->cku_err.re_status = RPC_TIMEDOUT;
    721 			p->cku_err.re_errno = ETIMEDOUT;
    722 			RCSTAT_INCR(rctimeouts);
    723 		} else {
    724 			p->cku_err.re_status = RPC_CANTRECV;
    725 			p->cku_err.re_errno = EIO;
    726 		}
    727 		goto done;
    728 	}
    729 #ifdef DEBUG
    730 	if (rdma_clnt_debug)
    731 		printf("clnt_rdma_kcallit: got response xid %u\n", p->cku_xid);
    732 #endif
    733 	/*
    734 	 * Process the reply message.
    735 	 *
    736 	 * First the chunk list (if any)
    737 	 */
    738 	xdrs = &(p->cku_inxdr);
    739 	xdrmem_create(xdrs, (caddr_t)(uintptr_t)recvlist->c_saddr,
    740 	    recvlist->c_len, XDR_DECODE);
    741 	/*
    742 	 * Treat xid as opaque (xid is the first entity
    743 	 * in the rpc rdma message).
    744 	 */
    745 	xid = *(uint32_t *)(uintptr_t)recvlist->c_saddr;
    746 	/* Skip xid and set the xdr position accordingly. */
    747 	XDR_SETPOS(xdrs, sizeof (uint32_t));
    748 	(void) xdr_u_int(xdrs, &vers);
    749 	(void) xdr_u_int(xdrs, &op);
    750 	(void) xdr_do_clist(xdrs, &cl);
    751 	off = xdr_getpos(xdrs);
    752 
    753 	/*
    754 	 * Now the RPC reply message itself. If the reply
    755 	 * came as a chunk item, then RDMA the reply over.
    756 	 */
    757 	xdrs = &replxdr;
    758 	if (cl && op == RDMA_NOMSG) {
    759 		struct clist		*cle = cl;
    760 
    761 		rpcreply.type = CHUNK_BUFFER;
    762 		rpcreply.addr = kmem_alloc(cle->c_len, KM_SLEEP);
    763 		rpcreply.len = cle->c_len;
    764 		cle->c_daddr = (uint64)(uintptr_t)rpcreply.addr;
    765 		cl = cl->c_next;
    766 		cle->c_next = NULL;
    767 
    768 		/*
    769 		 * Register the rpc reply chunk destination
    770 		 */
    771 		status = clist_register(conn, cle, 0);
    772 		if (status) {
    773 			rdma_buf_free(conn, &rpcreply);
    774 			clist_free(cle);
    775 			p->cku_err.re_status = RPC_CANTDECODERES;
    776 			p->cku_err.re_errno = EIO;
    777 			cmn_err(CE_WARN,
    778 			    "clnt_rdma_kcallit: clist_register failed");
    779 			goto rdma_done;
    780 		}
    781 
    782 		/*
    783 		 * Now read rpc reply in
    784 		 */
    785 #ifdef DEBUG
    786 	if (rdma_clnt_debug)
    787 		printf("clnt_rdma_kcallit: read chunk, len %d, xid %u, \
    788 			reply xid %u\n", cle->c_len, p->cku_xid, xid);
    789 #endif
    790 		status = RDMA_READ(conn, cle, WAIT);
    791 		if (status) {
    792 			(void) clist_deregister(conn, cle, 0);
    793 			rdma_buf_free(conn, &rpcreply);
    794 			clist_free(cle);
    795 			p->cku_err.re_status = RPC_CANTDECODERES;
    796 			p->cku_err.re_errno = EIO;
    797 			cmn_err(CE_WARN,
    798 				"clnt_rdma_kcallit: RDMA_READ failed");
    799 			goto rdma_done;
    800 		}
    801 
    802 		/*
    803 		 * sync the memory for dma
    804 		 */
    805 		status = clist_syncmem(conn, cle, 0);
    806 		if (status != RDMA_SUCCESS) {
    807 			(void) clist_deregister(conn, cle, 0);
    808 			rdma_buf_free(conn, &rpcreply);
    809 			clist_free(cle);
    810 			p->cku_err.re_status = RPC_CANTDECODERES;
    811 			p->cku_err.re_errno = EIO;
    812 			goto rdma_done;
    813 		}
    814 
    815 		/*
    816 		 * Deregister the Long RPC chunk
    817 		 */
    818 		(void) clist_deregister(conn, cle, 0);
    819 		clist_free(cle);
    820 		xdrrdma_create(xdrs, rpcreply.addr, rpcreply.len, 0, cl,
    821 			XDR_DECODE, conn);
    822 		rxdrp = xdrs;
    823 	} else {
    824 		rpcreply.addr = NULL;
    825 		xdrrdma_create(xdrs,
    826 		    (caddr_t)(uintptr_t)(recvlist->c_saddr + off),
    827 		    recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
    828 		rxdrp = xdrs;
    829 	}
    830 
    831 	reply_msg.rm_direction = REPLY;
    832 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
    833 	reply_msg.acpted_rply.ar_stat = SUCCESS;
    834 	reply_msg.acpted_rply.ar_verf = _null_auth;
    835 	/*
    836 	 *  xdr_results will be done in AUTH_UNWRAP.
    837 	 */
    838 	reply_msg.acpted_rply.ar_results.where = NULL;
    839 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
    840 
    841 	/*
    842 	 * Decode and validate the response.
    843 	 */
    844 	if (xdr_replymsg(xdrs, &reply_msg)) {
    845 		enum clnt_stat re_status;
    846 
    847 		_seterr_reply(&reply_msg, &(p->cku_err));
    848 
    849 		re_status = p->cku_err.re_status;
    850 		if (re_status == RPC_SUCCESS) {
    851 			/*
    852 			 * Reply is good, check auth.
    853 			 */
    854 			if (!AUTH_VALIDATE(h->cl_auth,
    855 			    &reply_msg.acpted_rply.ar_verf)) {
    856 				p->cku_err.re_status = RPC_AUTHERROR;
    857 				p->cku_err.re_why = AUTH_INVALIDRESP;
    858 				RCSTAT_INCR(rcbadverfs);
    859 				cmn_err(CE_WARN,
    860 			    "clnt_rdma_kcallit: AUTH_VALIDATE failed");
    861 			} else if (!AUTH_UNWRAP(h->cl_auth, xdrs,
    862 			    xdr_results, resultsp)) {
    863 				p->cku_err.re_status = RPC_CANTDECODERES;
    864 				p->cku_err.re_errno = EIO;
    865 				cmn_err(CE_WARN,
    866 				    "clnt_rdma_kcallit: AUTH_UNWRAP failed");
    867 			}
    868 		} else {
    869 			/* set errno in case we can't recover */
    870 			if (re_status != RPC_VERSMISMATCH &&
    871 			    re_status != RPC_AUTHERROR &&
    872 			    re_status != RPC_PROGVERSMISMATCH)
    873 				p->cku_err.re_errno = EIO;
    874 
    875 			if (re_status == RPC_AUTHERROR) {
    876 				/*
    877 				 * Map recoverable and unrecoverable
    878 				 * authentication errors to appropriate
    879 				 * errno
    880 				 */
    881 				switch (p->cku_err.re_why) {
    882 				case AUTH_BADCRED:
    883 				case AUTH_BADVERF:
    884 				case AUTH_INVALIDRESP:
    885 				case AUTH_TOOWEAK:
    886 				case AUTH_FAILED:
    887 				case RPCSEC_GSS_NOCRED:
    888 				case RPCSEC_GSS_FAILED:
    889 					p->cku_err.re_errno = EACCES;
    890 					break;
    891 				case AUTH_REJECTEDCRED:
    892 				case AUTH_REJECTEDVERF:
    893 				default:
    894 					p->cku_err.re_errno = EIO;
    895 					break;
    896 				}
    897 				RPCLOG(1, "clnt_rdma_kcallit : "
    898 				    "authentication failed with "
    899 				    "RPC_AUTHERROR of type %d\n",
    900 				    p->cku_err.re_why);
    901 			}
    902 			cmn_err(CE_WARN,
    903 				    "clnt_rdma_kcallit: RPC failed");
    904 
    905 		}
    906 	} else {
    907 		p->cku_err.re_status = RPC_CANTDECODERES;
    908 		p->cku_err.re_errno = EIO;
    909 		cmn_err(CE_WARN, "clnt_rdma_kcallit: xdr_replymsg failed");
    910 	}
    911 
    912 	/*
    913 	 * If rpc reply is in a chunk, free it now.
    914 	 */
    915 	if (rpcreply.addr != NULL)
    916 		rdma_buf_free(conn, &rpcreply);
    917 
    918 rdma_done:
    919 	if ((cl != NULL) || (op == RDMA_NOMSG)) {
    920 		rdma_buf_t	donemsg;
    921 
    922 		/*
    923 		 * Free the list holding the chunk info
    924 		 */
    925 		if (cl) {
    926 			clist_free(cl);
    927 			cl = NULL;
    928 		}
    929 
    930 		/*
    931 		 * Tell the server that the reads are done
    932 		 */
    933 		donemsg.type = SEND_BUFFER;
    934 		if (RDMA_BUF_ALLOC(conn, &donemsg)) {
    935 			p->cku_err.re_status = RPC_CANTSEND;
    936 			p->cku_err.re_errno = EIO;
    937 			RCSTAT_INCR(rcnomem);
    938 			cmn_err(CE_WARN, "clnt_rdma_kcallit: no free buffer");
    939 			goto done;
    940 		}
    941 		xdrs = &p->cku_outxdr;
    942 		xdrmem_create(xdrs, donemsg.addr, donemsg.len, XDR_ENCODE);
    943 		vers = RPCRDMA_VERS;
    944 		op = RDMA_DONE;
    945 
    946 		/*
    947 		 * Treat xid as opaque (xid is the first entity
    948 		 * in the rpc rdma message).
    949 		 */
    950 		(*(uint32_t *)donemsg.addr) = p->cku_xid;
    951 		/* Skip xid and set the xdr position accordingly. */
    952 		XDR_SETPOS(xdrs, sizeof (uint32_t));
    953 		if (!xdr_u_int(xdrs, &vers) ||
    954 		    !xdr_u_int(xdrs, &op)) {
    955 			cmn_err(CE_WARN,
    956 				"clnt_rdma_kcallit: xdr_u_int failed");
    957 			rdma_buf_free(conn, &donemsg);
    958 			goto done;
    959 		}
    960 
    961 		sendlist = NULL;
    962 		clist_add(&sendlist, 0, XDR_GETPOS(xdrs), &donemsg.handle,
    963 			donemsg.addr, NULL, NULL);
    964 
    965 		status = RDMA_SEND(conn, sendlist, p->cku_xid);
    966 		if (status != RDMA_SUCCESS) {
    967 			cmn_err(CE_WARN,
    968 				"clnt_rdma_kcallit: RDMA_SEND failed xid %u",
    969 					p->cku_xid);
    970 		}
    971 #ifdef DEBUG
    972 		else {
    973 		if (rdma_clnt_debug)
    974 			printf("clnt_rdma_kcallit: sent RDMA_DONE xid %u\n",
    975 				p->cku_xid);
    976 		}
    977 #endif
    978 		clist_free(sendlist);
    979 	}
    980 
    981 done:
    982 	if (cxdrp)
    983 		XDR_DESTROY(cxdrp);
    984 	if (rxdrp) {
    985 		(void) xdr_rpc_free_verifier(rxdrp, &reply_msg);
    986 		XDR_DESTROY(rxdrp);
    987 	}
    988 
    989 	if (recvlist) {
    990 		rdma_buf_t	recvmsg;
    991 
    992 		recvmsg.addr = (caddr_t)(uintptr_t)recvlist->c_saddr;
    993 		recvmsg.type = RECV_BUFFER;
    994 		RDMA_BUF_FREE(conn, &recvmsg);
    995 		clist_free(recvlist);
    996 	}
    997 	RDMA_REL_CONN(conn);
    998 	if (p->cku_err.re_status != RPC_SUCCESS) {
    999 		RCSTAT_INCR(rcbadcalls);
   1000 	}
   1001 	return (p->cku_err.re_status);
   1002 }
   1003 
/*
 * Abort operation of the ops vector.  This transport has nothing to abort,
 * so the function is intentionally empty.
 */
/* ARGSUSED */
static void
clnt_rdma_kabort(CLIENT *h)
{
}
   1009 
   1010 static void
   1011 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
   1012 {
   1013 	struct cku_private *p = htop(h);
   1014 
   1015 	*err = p->cku_err;
   1016 }
   1017 
   1018 static bool_t
   1019 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
   1020 {
   1021 	struct cku_private *p = htop(h);
   1022 	XDR *xdrs;
   1023 
   1024 	xdrs = &(p->cku_outxdr);
   1025 	xdrs->x_op = XDR_FREE;
   1026 	return ((*xdr_res)(xdrs, res_ptr));
   1027 }
   1028 
/*
 * Control operation of the ops vector.  No command is interpreted by this
 * transport: cmd and arg are ignored and TRUE is returned for every request.
 */
/* ARGSUSED */
static bool_t
clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
{
	return (TRUE);
}
   1035 
/*
 * Set-retry-timers operation of the ops vector.  None of the arguments is
 * used; the call is only counted in the "timers" statistic and 0 is
 * returned.
 */
/* ARGSUSED */
static int
clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
	uint32_t xid)
{
	RCSTAT_INCR(rctimers);
	return (0);
}