Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
     26 /* All Rights Reserved */
     27 /*
     28  * Portions of this source code were derived from Berkeley
     29  * 4.3 BSD under license from the Regents of the University of
     30  * California.
     31  */
     32 
     33 #include <sys/param.h>
     34 #include <sys/types.h>
     35 #include <sys/user.h>
     36 #include <sys/systm.h>
     37 #include <sys/sysmacros.h>
     38 #include <sys/errno.h>
     39 #include <sys/kmem.h>
     40 #include <sys/debug.h>
     41 #include <sys/systm.h>
     42 #include <sys/kstat.h>
     43 #include <sys/t_lock.h>
     44 #include <sys/ddi.h>
     45 #include <sys/cmn_err.h>
     46 #include <sys/time.h>
     47 #include <sys/isa_defs.h>
     48 #include <sys/zone.h>
     49 #include <sys/sdt.h>
     50 
     51 #include <rpc/types.h>
     52 #include <rpc/xdr.h>
     53 #include <rpc/auth.h>
     54 #include <rpc/clnt.h>
     55 #include <rpc/rpc_msg.h>
     56 #include <rpc/rpc_rdma.h>
     57 #include <nfs/nfs.h>
     58 #include <nfs/nfs4_kprot.h>
     59 
/*
 * Value that clnt_compose_rdma_header() encodes into the RPC/RDMA header
 * right after the version word (its local copy is named rdma_credit).
 * clnt_rdma_kcallit() also uses it to seed its own rdma_credit local.
 */
static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;

/*
 * File-local helpers.  The first group builds the pieces of an outbound
 * request (RPC message, RDMA header, read/write chunk lists, long reply
 * buffer); the remaining ones deal with credits and reply processing.
 * All of them return CLNT_RDMA_SUCCESS/CLNT_RDMA_FAIL when they return int.
 */
static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
			    XDR *, xdrproc_t, caddr_t);
static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
		    XDR **, uint_t *);
static int clnt_setup_rlist(CONN *, XDR *, XDR *);
static int clnt_setup_wlist(CONN *, XDR *, XDR *);
static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
static void clnt_check_credit(CONN *);
static void clnt_return_credit(CONN *);
static void clnt_decode_long_reply(CONN *, struct clist *,
		struct clist *, XDR *, XDR **, struct clist *,
		struct clist *, uint_t, uint_t);

static void clnt_update_credit(CONN *, uint32_t);
static void check_dereg_wlist(CONN *, struct clist *);

/*
 * Client handle operations; these are the entries of rdma_clnt_ops.
 */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
     87 
/*
 * Operations vector for RDMA based RPC
 *
 * The initializers are positional, so their order has to follow the
 * member order of struct clnt_ops.  clnt_rdma_kcreate() and
 * clnt_rdma_kinit() store the address of this vector in the handle's
 * cl_ops field.
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};
    100 
    101 /*
    102  * The size of the preserialized RPC header information.
    103  */
    104 #define	CKU_HDRSIZE	20
    105 #define	CLNT_RDMA_SUCCESS 0
    106 #define	CLNT_RDMA_FAIL (-1)
    107 
    108 #define	AUTH_REFRESH_COUNT 2
    109 
    110 #define	IS_RPCSEC_GSS(authh)			\
    111 	(authh->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
    112 
/*
 * Per RPC RDMA endpoint details
 *
 * The CLIENT handle is embedded as the first member; ptoh() yields its
 * address and htop() gets back here through the handle's cl_private,
 * which clnt_rdma_kcreate() points at this structure.
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	/* plugin whose rdma_api matched the requested proto, NULL if none */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	/* private copy; buf holds maxlen bytes allocated with kmem_zalloc() */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	/* XDR position after clnt_compose_rpcmsg() encoded the call */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	/* +4: room for the procedure number used by the RPCSEC_GSS path */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	/* 0 means "none yet"; clnt_rdma_kcallit() then calls alloc_xid() */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;
    130 
#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
/*
 * Number of seconds clnt_rdma_kcallit() waits before reporting a
 * connection failure that was neither a timeout nor an interrupt.
 */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;

/*
 * Client side statistics.  Every member is a 64-bit named kstat and is
 * incremented through RCSTAT_INCR(<member>).  The structure is global
 * (not static).
 */
struct {
	kstat_named_t	rccalls;
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;
	kstat_named_t	rcintrs;
	kstat_named_t	rclongrpcs;
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

/*
 * Flat view of rdmarcstat: a pointer to its first kstat_named_t and the
 * number of entries.  NOTE(review): presumably consumed by kstat setup
 * code outside this file - confirm.
 */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
    162 
    163 #ifdef DEBUG
    164 int rdma_clnt_debug = 0;
    165 #endif
    166 
    167 #ifdef accurate_stats
    168 extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */
    169 
    170 #define	RCSTAT_INCR(x)			\
    171 	mutex_enter(&rdmarcstat_lock);	\
    172 	rdmarcstat.x.value.ui64++;	\
    173 	mutex_exit(&rdmarcstat_lock);
    174 #else
    175 #define	RCSTAT_INCR(x)			\
    176 	rdmarcstat.x.value.ui64++;
    177 #endif
    178 
    179 #define	ptoh(p)		(&((p)->cku_client))
    180 #define	htop(h)		((cku_private_t *)((h)->cl_private))
    181 
    182 uint_t
    183 calc_length(uint_t len)
    184 {
    185 	len = RNDUP(len);
    186 
    187 	if (len <= 64 * 1024) {
    188 		if (len > 32 * 1024) {
    189 			len = 64 * 1024;
    190 		} else {
    191 			if (len > 16 * 1024) {
    192 				len = 32 * 1024;
    193 			} else {
    194 				if (len > 8 * 1024) {
    195 					len = 16 * 1024;
    196 				} else {
    197 					len = 8 * 1024;
    198 				}
    199 			}
    200 		}
    201 	}
    202 	return (len);
    203 }
    204 int
    205 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
    206     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
    207 {
    208 	CLIENT *h;
    209 	struct cku_private *p;
    210 	struct rpc_msg call_msg;
    211 	rdma_registry_t *rp;
    212 
    213 	ASSERT(INGLOBALZONE(curproc));
    214 
    215 	if (cl == NULL)
    216 		return (EINVAL);
    217 	*cl = NULL;
    218 
    219 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
    220 
    221 	/*
    222 	 * Find underlying RDMATF plugin
    223 	 */
    224 	rw_enter(&rdma_lock, RW_READER);
    225 	rp = rdma_mod_head;
    226 	while (rp != NULL) {
    227 		if (strcmp(rp->r_mod->rdma_api, proto))
    228 			rp = rp->r_next;
    229 		else {
    230 			p->cku_rd_mod = rp->r_mod;
    231 			p->cku_rd_handle = handle;
    232 			break;
    233 		}
    234 	}
    235 	rw_exit(&rdma_lock);
    236 
    237 	if (p->cku_rd_mod == NULL) {
    238 		/*
    239 		 * Should not happen.
    240 		 * No matching RDMATF plugin.
    241 		 */
    242 		kmem_free(p, sizeof (struct cku_private));
    243 		return (EINVAL);
    244 	}
    245 
    246 	h = ptoh(p);
    247 	h->cl_ops = &rdma_clnt_ops;
    248 	h->cl_private = (caddr_t)p;
    249 	h->cl_auth = authkern_create();
    250 
    251 	/* call message, just used to pre-serialize below */
    252 	call_msg.rm_xid = 0;
    253 	call_msg.rm_direction = CALL;
    254 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
    255 	call_msg.rm_call.cb_prog = pgm;
    256 	call_msg.rm_call.cb_vers = vers;
    257 
    258 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
    259 	/* pre-serialize call message header */
    260 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
    261 		XDR_DESTROY(&p->cku_outxdr);
    262 		auth_destroy(h->cl_auth);
    263 		kmem_free(p, sizeof (struct cku_private));
    264 		return (EINVAL);
    265 	}
    266 
    267 	/*
    268 	 * Set up the rpc information
    269 	 */
    270 	p->cku_cred = cred;
    271 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
    272 	p->cku_addr.maxlen = raddr->maxlen;
    273 	p->cku_addr.len = raddr->len;
    274 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
    275 	p->cku_addrfmly = family;
    276 
    277 	*cl = h;
    278 	return (0);
    279 }
    280 
    281 static void
    282 clnt_rdma_kdestroy(CLIENT *h)
    283 {
    284 	struct cku_private *p = htop(h);
    285 
    286 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
    287 	kmem_free(p, sizeof (*p));
    288 }
    289 
    290 void
    291 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
    292     struct cred *cred)
    293 {
    294 	struct cku_private *p = htop(h);
    295 	rdma_registry_t *rp;
    296 
    297 	ASSERT(INGLOBALZONE(curproc));
    298 	/*
    299 	 * Find underlying RDMATF plugin
    300 	 */
    301 	p->cku_rd_mod = NULL;
    302 	rw_enter(&rdma_lock, RW_READER);
    303 	rp = rdma_mod_head;
    304 	while (rp != NULL) {
    305 		if (strcmp(rp->r_mod->rdma_api, proto))
    306 			rp = rp->r_next;
    307 		else {
    308 			p->cku_rd_mod = rp->r_mod;
    309 			p->cku_rd_handle = handle;
    310 			break;
    311 		}
    312 
    313 	}
    314 	rw_exit(&rdma_lock);
    315 
    316 	/*
    317 	 * Set up the rpc information
    318 	 */
    319 	p->cku_cred = cred;
    320 	p->cku_xid = 0;
    321 
    322 	if (p->cku_addr.maxlen < raddr->len) {
    323 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
    324 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
    325 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
    326 		p->cku_addr.maxlen = raddr->maxlen;
    327 	}
    328 
    329 	p->cku_addr.len = raddr->len;
    330 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
    331 	h->cl_ops = &rdma_clnt_ops;
    332 }
    333 
    334 static int
    335 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
    336     rdma_buf_t *rpcmsg, XDR *xdrs,
    337     xdrproc_t xdr_args, caddr_t argsp)
    338 {
    339 	cku_private_t *p = htop(h);
    340 
    341 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
    342 		/*
    343 		 * Copy in the preserialized RPC header
    344 		 * information.
    345 		 */
    346 		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
    347 
    348 		/*
    349 		 * transaction id is the 1st thing in the output
    350 		 * buffer.
    351 		 */
    352 		/* LINTED pointer alignment */
    353 		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
    354 
    355 		/* Skip the preserialized stuff. */
    356 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
    357 
    358 		/* Serialize dynamic stuff into the output buffer. */
    359 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
    360 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
    361 		    (!(*xdr_args)(xdrs, argsp))) {
    362 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
    363 			return (CLNT_RDMA_FAIL);
    364 		}
    365 		p->cku_outsz = XDR_GETPOS(xdrs);
    366 	} else {
    367 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
    368 		IXDR_PUT_U_INT32(uproc, procnum);
    369 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
    370 		XDR_SETPOS(xdrs, 0);
    371 
    372 		/* Serialize the procedure number and the arguments. */
    373 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
    374 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
    375 			if (rpcmsg->addr != xdrs->x_base) {
    376 				rpcmsg->addr = xdrs->x_base;
    377 				rpcmsg->len = xdr_getbufsize(xdrs);
    378 			}
    379 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
    380 			return (CLNT_RDMA_FAIL);
    381 		}
    382 		/*
    383 		 * If we had to allocate a new buffer while encoding
    384 		 * then update the addr and len.
    385 		 */
    386 		if (rpcmsg->addr != xdrs->x_base) {
    387 			rpcmsg->addr = xdrs->x_base;
    388 			rpcmsg->len = xdr_getbufsize(xdrs);
    389 		}
    390 
    391 		p->cku_outsz = XDR_GETPOS(xdrs);
    392 		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
    393 	}
    394 
    395 	return (CLNT_RDMA_SUCCESS);
    396 }
    397 
    398 static int
    399 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
    400     XDR **xdrs, uint_t *op)
    401 {
    402 	cku_private_t *p = htop(h);
    403 	uint_t vers;
    404 	uint32_t rdma_credit = rdma_bufs_rqst;
    405 
    406 	vers = RPCRDMA_VERS;
    407 	clmsg->type = SEND_BUFFER;
    408 
    409 	if (rdma_buf_alloc(conn, clmsg)) {
    410 		return (CLNT_RDMA_FAIL);
    411 	}
    412 
    413 	*xdrs = &p->cku_outxdr;
    414 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
    415 
    416 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
    417 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
    418 	(void) xdr_u_int(*xdrs, &vers);
    419 	(void) xdr_u_int(*xdrs, &rdma_credit);
    420 	(void) xdr_u_int(*xdrs, op);
    421 
    422 	return (CLNT_RDMA_SUCCESS);
    423 }
    424 
    425 /*
    426  * If xp_cl is NULL value, then the RPC payload will NOT carry
    427  * an RDMA READ chunk list, in this case we insert FALSE into
    428  * the XDR stream. Otherwise we use the clist and RDMA register
    429  * the memory and encode the clist into the outbound XDR stream.
    430  */
    431 static int
    432 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
    433 {
    434 	int status;
    435 	struct clist *rclp;
    436 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
    437 
    438 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
    439 
    440 	if (rclp != NULL) {
    441 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
    442 		if (status != RDMA_SUCCESS) {
    443 			return (CLNT_RDMA_FAIL);
    444 		}
    445 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
    446 	}
    447 	(void) xdr_do_clist(xdrs, &rclp);
    448 
    449 	return (CLNT_RDMA_SUCCESS);
    450 }
    451 
    452 /*
    453  * If xp_wcl is NULL value, then the RPC payload will NOT carry
    454  * an RDMA WRITE chunk list, in this case we insert FALSE into
    455  * the XDR stream. Otherwise we use the clist and  RDMA register
    456  * the memory and encode the clist into the outbound XDR stream.
    457  */
    458 static int
    459 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
    460 {
    461 	int status;
    462 	struct clist *wlist;
    463 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
    464 
    465 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
    466 
    467 	if (wlist != NULL) {
    468 		status = clist_register(conn, wlist, CLIST_REG_DST);
    469 		if (status != RDMA_SUCCESS) {
    470 			return (CLNT_RDMA_FAIL);
    471 		}
    472 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
    473 	}
    474 
    475 	if (!xdr_encode_wlist(xdrs, wlist))
    476 		return (CLNT_RDMA_FAIL);
    477 
    478 	return (CLNT_RDMA_SUCCESS);
    479 }
    480 
    481 static int
    482 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
    483 {
    484 	if (length == 0) {
    485 		*clpp = NULL;
    486 		return (CLNT_RDMA_SUCCESS);
    487 	}
    488 
    489 	*clpp = clist_alloc();
    490 
    491 	(*clpp)->rb_longbuf.len = calc_length(length);
    492 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
    493 
    494 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
    495 		clist_free(*clpp);
    496 		*clpp = NULL;
    497 		return (CLNT_RDMA_FAIL);
    498 	}
    499 
    500 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
    501 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
    502 	(*clpp)->c_next = NULL;
    503 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
    504 
    505 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
    506 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
    507 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
    508 		clist_free(*clpp);
    509 		return (CLNT_RDMA_FAIL);
    510 	}
    511 
    512 	return (CLNT_RDMA_SUCCESS);
    513 }
    514 
    515 /* ARGSUSED */
    516 static enum clnt_stat
    517 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
    518     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
    519     struct timeval wait)
    520 {
    521 	cku_private_t *p = htop(h);
    522 
    523 	int 	try_call_again;
    524 	int	refresh_attempt = AUTH_REFRESH_COUNT;
    525 	int 	status;
    526 	int 	msglen;
    527 
    528 	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
    529 	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
    530 	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
    531 
    532 	struct rpc_msg 	reply_msg;
    533 
    534 	struct clist *cl_sendlist;
    535 	struct clist *cl_recvlist;
    536 	struct clist *cl;
    537 	struct clist *cl_rpcmsg;
    538 	struct clist *cl_rdma_reply;
    539 	struct clist *cl_rpcreply_wlist;
    540 	struct clist *cl_long_reply;
    541 
    542 	uint_t vers;
    543 	uint_t op;
    544 	uint_t off;
    545 	uint32_t seg_array_len;
    546 	uint_t long_reply_len;
    547 	uint_t rpcsec_gss;
    548 	uint_t gss_i_or_p;
    549 
    550 	CONN *conn = NULL;
    551 	rdma_buf_t clmsg;
    552 	rdma_buf_t rpcmsg;
    553 	rdma_chunkinfo_lengths_t rcil;
    554 
    555 	clock_t	ticks;
    556 	bool_t wlist_exists_reply;
    557 
    558 	uint32_t rdma_credit = rdma_bufs_rqst;
    559 
    560 	RCSTAT_INCR(rccalls);
    561 
    562 call_again:
    563 
    564 	bzero(&clmsg, sizeof (clmsg));
    565 	bzero(&rpcmsg, sizeof (rpcmsg));
    566 	try_call_again = 0;
    567 	cl_sendlist = NULL;
    568 	cl_recvlist = NULL;
    569 	cl = NULL;
    570 	cl_rpcmsg = NULL;
    571 	cl_rdma_reply = NULL;
    572 	call_xdrp = NULL;
    573 	reply_xdrp = NULL;
    574 	wlist_exists_reply  = FALSE;
    575 	cl_rpcreply_wlist = NULL;
    576 	cl_long_reply = NULL;
    577 	rcil.rcil_len = 0;
    578 	rcil.rcil_len_alt = 0;
    579 	long_reply_len = 0;
    580 
    581 	/*
    582 	 * Get unique xid
    583 	 */
    584 	if (p->cku_xid == 0)
    585 		p->cku_xid = alloc_xid();
    586 
    587 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_addr,
    588 	    p->cku_addrfmly, p->cku_rd_handle, &conn);
    589 
    590 	/*
    591 	 * If there is a problem with the connection reflect the issue
    592 	 * back to the higher level to address, we MAY delay for a short
    593 	 * period so that we are kind to the transport.
    594 	 */
    595 	if (conn == NULL) {
    596 		/*
    597 		 * Connect failed to server. Could be because of one
    598 		 * of several things. In some cases we don't want
    599 		 * the caller to retry immediately - delay before
    600 		 * returning to caller.
    601 		 */
    602 		switch (status) {
    603 		case RDMA_TIMEDOUT:
    604 			/*
    605 			 * Already timed out. No need to delay
    606 			 * some more.
    607 			 */
    608 			p->cku_err.re_status = RPC_TIMEDOUT;
    609 			p->cku_err.re_errno = ETIMEDOUT;
    610 			break;
    611 		case RDMA_INTR:
    612 			/*
    613 			 * Failed because of an signal. Very likely
    614 			 * the caller will not retry.
    615 			 */
    616 			p->cku_err.re_status = RPC_INTR;
    617 			p->cku_err.re_errno = EINTR;
    618 			break;
    619 		default:
    620 			/*
    621 			 * All other failures - server down or service
    622 			 * down or temporary resource failure. Delay before
    623 			 * returning to caller.
    624 			 */
    625 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
    626 			p->cku_err.re_status = RPC_CANTCONNECT;
    627 			p->cku_err.re_errno = EIO;
    628 
    629 			if (h->cl_nosignal == TRUE) {
    630 				delay(ticks);
    631 			} else {
    632 				if (delay_sig(ticks) == EINTR) {
    633 					p->cku_err.re_status = RPC_INTR;
    634 					p->cku_err.re_errno = EINTR;
    635 				}
    636 			}
    637 			break;
    638 		}
    639 
    640 		return (p->cku_err.re_status);
    641 	}
    642 
    643 	clnt_check_credit(conn);
    644 
    645 	status = CLNT_RDMA_FAIL;
    646 
    647 	rpcsec_gss = gss_i_or_p = FALSE;
    648 
    649 	if (IS_RPCSEC_GSS(h)) {
    650 		rpcsec_gss = TRUE;
    651 		if (rpc_gss_get_service_type(h->cl_auth) ==
    652 		    rpc_gss_svc_integrity ||
    653 		    rpc_gss_get_service_type(h->cl_auth) ==
    654 		    rpc_gss_svc_privacy)
    655 			gss_i_or_p = TRUE;
    656 	}
    657 
    658 	/*
    659 	 * Try a regular RDMA message if RPCSEC_GSS is not being used
    660 	 * or if RPCSEC_GSS is being used for authentication only.
    661 	 */
    662 	if (rpcsec_gss == FALSE ||
    663 	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
    664 		/*
    665 		 * Grab a send buffer for the request.  Try to
    666 		 * encode it to see if it fits. If not, then it
    667 		 * needs to be sent in a chunk.
    668 		 */
    669 		rpcmsg.type = SEND_BUFFER;
    670 		if (rdma_buf_alloc(conn, &rpcmsg)) {
    671 			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
    672 			goto done;
    673 		}
    674 
    675 		/* First try to encode into regular send buffer */
    676 		op = RDMA_MSG;
    677 
    678 		call_xdrp = &callxdr;
    679 
    680 		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
    681 		    rdma_minchunk, NULL, XDR_ENCODE, conn);
    682 
    683 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
    684 		    xdr_args, argsp);
    685 
    686 		if (status != CLNT_RDMA_SUCCESS) {
    687 			/* Clean up from previous encode attempt */
    688 			rdma_buf_free(conn, &rpcmsg);
    689 			XDR_DESTROY(call_xdrp);
    690 		} else {
    691 			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
    692 		}
    693 	}
    694 
    695 	/* If the encode didn't work, then try a NOMSG */
    696 	if (status != CLNT_RDMA_SUCCESS) {
    697 
    698 		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
    699 		    xdr_sizeof(xdr_args, argsp);
    700 
    701 		msglen = calc_length(msglen);
    702 
    703 		/* pick up the lengths for the reply buffer needed */
    704 		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
    705 		    &rcil.rcil_len, &rcil.rcil_len_alt);
    706 
    707 		/*
    708 		 * Construct a clist to describe the CHUNK_BUFFER
    709 		 * for the rpcmsg.
    710 		 */
    711 		cl_rpcmsg = clist_alloc();
    712 		cl_rpcmsg->c_len = msglen;
    713 		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
    714 		cl_rpcmsg->rb_longbuf.len = msglen;
    715 		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
    716 			clist_free(cl_rpcmsg);
    717 			goto done;
    718 		}
    719 		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
    720 
    721 		op = RDMA_NOMSG;
    722 		call_xdrp = &callxdr;
    723 
    724 		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
    725 		    cl_rpcmsg->rb_longbuf.len, 0,
    726 		    cl_rpcmsg, XDR_ENCODE, conn);
    727 
    728 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
    729 		    xdr_args, argsp);
    730 
    731 		if (status != CLNT_RDMA_SUCCESS) {
    732 			p->cku_err.re_status = RPC_CANTENCODEARGS;
    733 			p->cku_err.re_errno = EIO;
    734 			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
    735 			goto done;
    736 		}
    737 	}
    738 
    739 	/*
    740 	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
    741 	 * RDMA WRITE clist.
    742 	 *
    743 	 * First pull the RDMA READ chunk list from the XDR private
    744 	 * area to keep it handy.
    745 	 */
    746 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
    747 
    748 	if (gss_i_or_p) {
    749 		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
    750 		long_reply_len += MAX_AUTH_BYTES;
    751 	} else {
    752 		long_reply_len = rcil.rcil_len;
    753 	}
    754 
    755 	/*
    756 	 * Update the chunk size information for the Long RPC msg.
    757 	 */
    758 	if (cl && op == RDMA_NOMSG)
    759 		cl->c_len = p->cku_outsz;
    760 
    761 	/*
    762 	 * Prepare the RDMA header. On success xdrs will hold the result
    763 	 * of xdrmem_create() for a SEND_BUFFER.
    764 	 */
    765 	status = clnt_compose_rdma_header(conn, h, &clmsg,
    766 	    &rdmahdr_o_xdrs, &op);
    767 
    768 	if (status != CLNT_RDMA_SUCCESS) {
    769 		p->cku_err.re_status = RPC_CANTSEND;
    770 		p->cku_err.re_errno = EIO;
    771 		RCSTAT_INCR(rcnomem);
    772 		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
    773 		goto done;
    774 	}
    775 
    776 	/*
    777 	 * Now insert the RDMA READ list iff present
    778 	 */
    779 	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
    780 	if (status != CLNT_RDMA_SUCCESS) {
    781 		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
    782 		rdma_buf_free(conn, &clmsg);
    783 		p->cku_err.re_status = RPC_CANTSEND;
    784 		p->cku_err.re_errno = EIO;
    785 		goto done;
    786 	}
    787 
    788 	/*
    789 	 * Setup RDMA WRITE chunk list for nfs read operation
    790 	 * other operations will have a NULL which will result
    791 	 * as a NULL list in the XDR stream.
    792 	 */
    793 	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp);
    794 	if (status != CLNT_RDMA_SUCCESS) {
    795 		rdma_buf_free(conn, &clmsg);
    796 		p->cku_err.re_status = RPC_CANTSEND;
    797 		p->cku_err.re_errno = EIO;
    798 		goto done;
    799 	}
    800 
    801 	/*
    802 	 * If NULL call and RPCSEC_GSS, provide a chunk such that
    803 	 * large responses can flow back to the client.
    804 	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
    805 	 */
    806 	if ((procnum == 0 && rpcsec_gss == TRUE) ||
    807 	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
    808 		long_reply_len += 1024;
    809 
    810 	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
    811 
    812 	if (status != CLNT_RDMA_SUCCESS) {
    813 		rdma_buf_free(conn, &clmsg);
    814 		p->cku_err.re_status = RPC_CANTSEND;
    815 		p->cku_err.re_errno = EIO;
    816 		goto done;
    817 	}
    818 
    819 	/*
    820 	 * XDR encode the RDMA_REPLY write chunk
    821 	 */
    822 	seg_array_len = (cl_long_reply ? 1 : 0);
    823 	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
    824 	    seg_array_len);
    825 
    826 	/*
    827 	 * Construct a clist in "sendlist" that represents what we
    828 	 * will push over the wire.
    829 	 *
    830 	 * Start with the RDMA header and clist (if any)
    831 	 */
    832 	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
    833 	    clmsg.addr, NULL, NULL);
    834 
    835 	/*
    836 	 * Put the RPC call message in  sendlist if small RPC
    837 	 */
    838 	if (op == RDMA_MSG) {
    839 		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
    840 		    rpcmsg.addr, NULL, NULL);
    841 	} else {
    842 		/* Long RPC already in chunk list */
    843 		RCSTAT_INCR(rclongrpcs);
    844 	}
    845 
    846 	/*
    847 	 * Set up a reply buffer ready for the reply
    848 	 */
    849 	status = rdma_clnt_postrecv(conn, p->cku_xid);
    850 	if (status != RDMA_SUCCESS) {
    851 		rdma_buf_free(conn, &clmsg);
    852 		p->cku_err.re_status = RPC_CANTSEND;
    853 		p->cku_err.re_errno = EIO;
    854 		goto done;
    855 	}
    856 
    857 	/*
    858 	 * sync the memory for dma
    859 	 */
    860 	if (cl != NULL) {
    861 		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
    862 		if (status != RDMA_SUCCESS) {
    863 			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
    864 			rdma_buf_free(conn, &clmsg);
    865 			p->cku_err.re_status = RPC_CANTSEND;
    866 			p->cku_err.re_errno = EIO;
    867 			goto done;
    868 		}
    869 	}
    870 
    871 	/*
    872 	 * Send the RDMA Header and RPC call message to the server
    873 	 */
    874 	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
    875 	if (status != RDMA_SUCCESS) {
    876 		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
    877 		p->cku_err.re_status = RPC_CANTSEND;
    878 		p->cku_err.re_errno = EIO;
    879 		goto done;
    880 	}
    881 
    882 	/*
    883 	 * RDMA plugin now owns the send msg buffers.
    884 	 * Clear them out and don't free them.
    885 	 */
    886 	clmsg.addr = NULL;
    887 	if (rpcmsg.type == SEND_BUFFER)
    888 		rpcmsg.addr = NULL;
    889 
    890 	/*
    891 	 * Recv rpc reply
    892 	 */
    893 	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
    894 
    895 	/*
    896 	 * Now check recv status
    897 	 */
    898 	if (status != 0) {
    899 		if (status == RDMA_INTR) {
    900 			p->cku_err.re_status = RPC_INTR;
    901 			p->cku_err.re_errno = EINTR;
    902 			RCSTAT_INCR(rcintrs);
    903 		} else if (status == RPC_TIMEDOUT) {
    904 			p->cku_err.re_status = RPC_TIMEDOUT;
    905 			p->cku_err.re_errno = ETIMEDOUT;
    906 			RCSTAT_INCR(rctimeouts);
    907 		} else {
    908 			p->cku_err.re_status = RPC_CANTRECV;
    909 			p->cku_err.re_errno = EIO;
    910 		}
    911 		goto done;
    912 	}
    913 
    914 	/*
    915 	 * Process the reply message.
    916 	 *
    917 	 * First the chunk list (if any)
    918 	 */
    919 	rdmahdr_i_xdrs = &(p->cku_inxdr);
    920 	xdrmem_create(rdmahdr_i_xdrs,
    921 	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
    922 	    cl_recvlist->c_len, XDR_DECODE);
    923 
    924 	/*
    925 	 * Treat xid as opaque (xid is the first entity
    926 	 * in the rpc rdma message).
    927 	 * Skip xid and set the xdr position accordingly.
    928 	 */
    929 	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
    930 	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
    931 	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
    932 	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
    933 	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
    934 
    935 	clnt_update_credit(conn, rdma_credit);
    936 
    937 	wlist_exists_reply = FALSE;
    938 	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
    939 	    &wlist_exists_reply)) {
    940 		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
    941 		p->cku_err.re_status = RPC_CANTDECODERES;
    942 		p->cku_err.re_errno = EIO;
    943 		goto done;
    944 	}
    945 
    946 	/*
    947 	 * The server shouldn't have sent a RDMA_SEND that
    948 	 * the client needs to RDMA_WRITE a reply back to
    949 	 * the server.  So silently ignoring what the
    950 	 * server returns in the rdma_reply section of the
    951 	 * header.
    952 	 */
    953 	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
    954 	off = xdr_getpos(rdmahdr_i_xdrs);
    955 
    956 	clnt_decode_long_reply(conn, cl_long_reply,
    957 	    cl_rdma_reply, &replyxdr, &reply_xdrp,
    958 	    cl, cl_recvlist, op, off);
    959 
    960 	if (reply_xdrp == NULL)
    961 		goto done;
    962 
    963 	if (wlist_exists_reply) {
    964 		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
    965 	}
    966 
    967 	reply_msg.rm_direction = REPLY;
    968 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
    969 	reply_msg.acpted_rply.ar_stat = SUCCESS;
    970 	reply_msg.acpted_rply.ar_verf = _null_auth;
    971 
    972 	/*
    973 	 *  xdr_results will be done in AUTH_UNWRAP.
    974 	 */
    975 	reply_msg.acpted_rply.ar_results.where = NULL;
    976 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
    977 
    978 	/*
    979 	 * Decode and validate the response.
    980 	 */
    981 	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
    982 		enum clnt_stat re_status;
    983 
    984 		_seterr_reply(&reply_msg, &(p->cku_err));
    985 
    986 		re_status = p->cku_err.re_status;
    987 		if (re_status == RPC_SUCCESS) {
    988 			/*
    989 			 * Reply is good, check auth.
    990 			 */
    991 			if (!AUTH_VALIDATE(h->cl_auth,
    992 			    &reply_msg.acpted_rply.ar_verf)) {
    993 				p->cku_err.re_status = RPC_AUTHERROR;
    994 				p->cku_err.re_why = AUTH_INVALIDRESP;
    995 				RCSTAT_INCR(rcbadverfs);
    996 				DTRACE_PROBE(
    997 				    krpc__e__clntrdma__callit__authvalidate);
    998 			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
    999 			    xdr_results, resultsp)) {
   1000 				p->cku_err.re_status = RPC_CANTDECODERES;
   1001 				p->cku_err.re_errno = EIO;
   1002 				DTRACE_PROBE(
   1003 				    krpc__e__clntrdma__callit__authunwrap);
   1004 			}
   1005 		} else {
   1006 			/* set errno in case we can't recover */
   1007 			if (re_status != RPC_VERSMISMATCH &&
   1008 			    re_status != RPC_AUTHERROR &&
   1009 			    re_status != RPC_PROGVERSMISMATCH)
   1010 				p->cku_err.re_errno = EIO;
   1011 
   1012 			if (re_status == RPC_AUTHERROR) {
   1013 				if ((refresh_attempt > 0) &&
   1014 				    AUTH_REFRESH(h->cl_auth, &reply_msg,
   1015 				    p->cku_cred)) {
   1016 					refresh_attempt--;
   1017 					try_call_again = 1;
   1018 					goto done;
   1019 				}
   1020 
   1021 				try_call_again = 0;
   1022 
   1023 				/*
   1024 				 * We have used the client handle to
   1025 				 * do an AUTH_REFRESH and the RPC status may
   1026 				 * be set to RPC_SUCCESS; Let's make sure to
   1027 				 * set it to RPC_AUTHERROR.
   1028 				 */
   1029 				p->cku_err.re_status = RPC_AUTHERROR;
   1030 
   1031 				/*
   1032 				 * Map recoverable and unrecoverable
   1033 				 * authentication errors to appropriate
   1034 				 * errno
   1035 				 */
   1036 				switch (p->cku_err.re_why) {
   1037 				case AUTH_BADCRED:
   1038 				case AUTH_BADVERF:
   1039 				case AUTH_INVALIDRESP:
   1040 				case AUTH_TOOWEAK:
   1041 				case AUTH_FAILED: