Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License, Version 1.0 only
      6  * (the "License").  You may not use this file except in compliance
      7  * with the License.
      8  *
      9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
     10  * or http://www.opensolaris.org/os/licensing.
     11  * See the License for the specific language governing permissions
     12  * and limitations under the License.
     13  *
     14  * When distributing Covered Code, include this CDDL HEADER in each
     15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     16  * If applicable, add the following below this CDDL HEADER, with the
     17  * fields enclosed by brackets "[]" replaced with your own identifying
     18  * information: Portions Copyright [yyyy] [name of copyright owner]
     19  *
     20  * CDDL HEADER END
     21  */
     22 /*
     23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #pragma ident	"@(#)xdr_rdma.c	1.4	05/06/08 SMI"
     28 
     29 /*
     30  * xdr_rdma.c, XDR implementation using RDMA to move large chunks
     31  */
     32 
     33 #include <sys/param.h>
     34 #include <sys/types.h>
     35 #include <sys/systm.h>
     36 #include <sys/kmem.h>
     37 
     38 #include <rpc/types.h>
     39 #include <rpc/xdr.h>
     40 #include <sys/cmn_err.h>
     41 #include <rpc/rpc_sztypes.h>
     42 #include <rpc/rpc_rdma.h>
     43 
/*
 * Forward declaration: the ops-table accessor is defined near the end of
 * this file but is called from xdrrdma_create().
 */
static struct xdr_ops *xdrrdma_ops(void);
     45 
/*
 * Per-stream private state for the RDMA flavor of XDR.
 *
 * A chunk list entry identifies a chunk of opaque data that is moved
 * separately from the rest of the RPC message.
 *
 * xp_min_chunk == 0 is a special case for ENCODING: it means the
 * incoming stream of data is not to be chunked.
 */

struct private {
	caddr_t		xp_offp;	/* current position in the xdr buffer */
	int		xp_min_chunk;	/* min length that gets chunked; 0 = off */
	uint_t		xp_flags;	/* Controls setting for rdma xdr */
	int		xp_buf_size;		/* size of xdr buffer */
	struct clist	*xp_cl;			/* head of chunk list */
	struct clist	**xp_cl_next;	/* location to place/find next chunk */
	CONN		*xp_conn;	/* connection for chunk data xfer */
};
     63 
     64 
     65 /*
     66  * The procedure xdrrdma_create initializes a stream descriptor for a
     67  * memory buffer.
     68  */
     69 void
     70 xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
     71 	int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
     72 {
     73 	struct private *xdrp;
     74 	struct clist *cle;
     75 
     76 	xdrs->x_op = op;
     77 	xdrs->x_ops = xdrrdma_ops();
     78 	xdrs->x_base = addr;
     79 	xdrs->x_handy = size;
     80 	xdrs->x_public = NULL;
     81 
     82 	xdrp = (struct private *)kmem_zalloc(sizeof (struct private), KM_SLEEP);
     83 	xdrs->x_private = (caddr_t)xdrp;
     84 	xdrp->xp_offp = addr;
     85 	xdrp->xp_min_chunk = min_chunk;
     86 	xdrp->xp_flags = 0;
     87 	xdrp->xp_buf_size = size;
     88 	xdrp->xp_cl = cl;
     89 	if (op == XDR_ENCODE && cl != NULL) {
     90 		/* Find last element in chunk list and set xp_cl_next */
     91 		for (cle = cl; cle->c_next != NULL; cle = cle->c_next);
     92 		xdrp->xp_cl_next = &(cle->c_next);
     93 	} else
     94 		xdrp->xp_cl_next = &(xdrp->xp_cl);
     95 	xdrp->xp_conn = conn;
     96 	if (xdrp->xp_min_chunk == 0)
     97 		xdrp->xp_flags |= RDMA_NOCHUNK;
     98 }
     99 
    100 /* ARGSUSED */
    101 void
    102 xdrrdma_destroy(XDR *xdrs)
    103 {
    104 	(void) kmem_free(xdrs->x_private, sizeof (struct private));
    105 }
    106 
    107 struct clist *
    108 xdrrdma_clist(XDR *xdrs) {
    109 	return (((struct private *)(xdrs->x_private))->xp_cl);
    110 }
    111 
    112 static bool_t
    113 xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
    114 {
    115 	struct private *xdrp = (struct private *)(xdrs->x_private);
    116 
    117 	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
    118 		return (FALSE);
    119 
    120 	/* LINTED pointer alignment */
    121 	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));
    122 	xdrp->xp_offp += sizeof (int32_t);
    123 
    124 	return (TRUE);
    125 }
    126 
    127 static bool_t
    128 xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
    129 {
    130 	struct private *xdrp = (struct private *)(xdrs->x_private);
    131 
    132 	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
    133 		return (FALSE);
    134 
    135 	/* LINTED pointer alignment */
    136 	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
    137 	xdrp->xp_offp += sizeof (int32_t);
    138 
    139 	return (TRUE);
    140 }
    141 
    142 /*
    143  * DECODE some bytes from an XDR stream
    144  */
    145 static bool_t
    146 xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
    147 {
    148 	struct private *xdrp = (struct private *)(xdrs->x_private);
    149 	struct clist *cle = *(xdrp->xp_cl_next);
    150 	struct clist cl;
    151 	bool_t  retval = TRUE;
    152 
    153 	/*
    154 	 * If there was a chunk at the current offset
    155 	 * first record the destination address and length
    156 	 * in the chunk list that came with the message, then
    157 	 * RDMA READ the chunk data.
    158 	 */
    159 	if (cle != NULL &&
    160 		cle->c_xdroff == (xdrp->xp_offp - xdrs->x_base)) {
    161 		cle->c_daddr = (uint64)(uintptr_t)addr;
    162 		cle->c_len  = len;
    163 		xdrp->xp_cl_next = &cle->c_next;
    164 
    165 		/*
    166 		 * RDMA READ the chunk data from the remote end.
    167 		 * First prep the destination buffer by registering
    168 		 * it, then RDMA READ the chunk data. Since we are
    169 		 * doing streaming memory, sync the destination buffer
    170 		 * to CPU and deregister the buffer.
    171 		 */
    172 		if (xdrp->xp_conn == NULL) {
    173 			return (FALSE);
    174 		}
    175 
    176 		cl = *cle;
    177 		cl.c_next = NULL;
    178 		if (clist_register(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
    179 			return (FALSE);
    180 		}
    181 
    182 		/*
    183 		 * Now read the chunk in
    184 		 */
    185 		if (RDMA_READ(xdrp->xp_conn, &cl, WAIT) != RDMA_SUCCESS) {
    186 #ifdef DEBUG
    187 			cmn_err(CE_WARN,
    188 				"xdrrdma_getbytes: RDMA_READ failed\n");
    189 #endif
    190 			retval = FALSE;
    191 			goto out;
    192 		}
    193 		/*
    194 		 * sync the memory for cpu
    195 		 */
    196 		if (clist_syncmem(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
    197 			retval = FALSE;
    198 			goto out;
    199 		}
    200 
    201 out:
    202 		/*
    203 		 * Deregister the chunks
    204 		 */
    205 		(void) clist_deregister(xdrp->xp_conn, &cl, 0);
    206 		return (retval);
    207 	}
    208 
    209 	if ((xdrs->x_handy -= len) < 0)
    210 		return (FALSE);
    211 
    212 	bcopy(xdrp->xp_offp, addr, len);
    213 	xdrp->xp_offp += len;
    214 
    215 	return (TRUE);
    216 }
    217 
    218 /*
    219  * ENCODE some bytes into an XDR stream
    220  * xp_min_chunk = 0, means the stream of bytes contain no chunks
    221  * to seperate out, and if the bytes do not fit in the supplied
    222  * buffer, grow the buffer and free the old buffer.
    223  */
    224 static bool_t
    225 xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
    226 {
    227 	struct private *xdrp = (struct private *)(xdrs->x_private);
    228 	struct clist *clzero = xdrp->xp_cl;
    229 
    230 	/*
    231 	 * If this chunk meets the minimum chunk size
    232 	 * then don't encode it.  Just record its address
    233 	 * and length in a chunk list entry so that it
    234 	 * can be moved separately via RDMA.
    235 	 */
    236 	if (!(xdrp->xp_flags & RDMA_NOCHUNK) && xdrp->xp_min_chunk != 0 &&
    237 	    len >= xdrp->xp_min_chunk) {
    238 		struct clist *cle;
    239 		int offset = xdrp->xp_offp - xdrs->x_base;
    240 
    241 		cle = (struct clist *)kmem_zalloc(sizeof (struct clist),
    242 				KM_SLEEP);
    243 		cle->c_xdroff = offset;
    244 		cle->c_len  = len;
    245 		cle->c_saddr = (uint64)(uintptr_t)addr;
    246 		cle->c_next = NULL;
    247 
    248 		*(xdrp->xp_cl_next) = cle;
    249 		xdrp->xp_cl_next = &(cle->c_next);
    250 
    251 		return (TRUE);
    252 	}
    253 
    254 	if ((xdrs->x_handy -= len) < 0) {
    255 		if (xdrp->xp_min_chunk == 0) {
    256 			int  newbuflen, encodelen;
    257 			caddr_t newbuf;
    258 
    259 			xdrs->x_handy += len;
    260 			encodelen = xdrp->xp_offp - xdrs->x_base;
    261 			newbuflen = xdrp->xp_buf_size + len;
    262 			newbuf = kmem_zalloc(newbuflen, KM_SLEEP);
    263 			bcopy(xdrs->x_base, newbuf, encodelen);
    264 			(void) kmem_free(xdrs->x_base, xdrp->xp_buf_size);
    265 			xdrs->x_base = newbuf;
    266 			xdrp->xp_offp = newbuf + encodelen;
    267 			xdrp->xp_buf_size = newbuflen;
    268 			if (xdrp->xp_min_chunk == 0 && clzero->c_xdroff == 0) {
    269 				clzero->c_len = newbuflen;
    270 				clzero->c_saddr = (uint64)(uintptr_t)newbuf;
    271 			}
    272 		} else
    273 			return (FALSE);
    274 	}
    275 
    276 	bcopy(addr, xdrp->xp_offp, len);
    277 	xdrp->xp_offp += len;
    278 
    279 	return (TRUE);
    280 }
    281 
    282 uint_t
    283 xdrrdma_getpos(XDR *xdrs)
    284 {
    285 	struct private *xdrp = (struct private *)(xdrs->x_private);
    286 
    287 	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
    288 }
    289 
    290 bool_t
    291 xdrrdma_setpos(XDR *xdrs, uint_t pos)
    292 {
    293 	struct private *xdrp = (struct private *)(xdrs->x_private);
    294 
    295 	caddr_t newaddr = xdrs->x_base + pos;
    296 	caddr_t lastaddr = xdrp->xp_offp + xdrs->x_handy;
    297 	ptrdiff_t diff;
    298 
    299 	if (newaddr > lastaddr)
    300 		return (FALSE);
    301 
    302 	xdrp->xp_offp = newaddr;
    303 	diff = lastaddr - newaddr;
    304 	xdrs->x_handy = (int)diff;
    305 
    306 	return (TRUE);
    307 }
    308 
    309 /* ARGSUSED */
    310 static rpc_inline_t *
    311 xdrrdma_inline(XDR *xdrs, int len)
    312 {
    313 	rpc_inline_t *buf = NULL;
    314 	struct private *xdrp = (struct private *)(xdrs->x_private);
    315 	struct clist *cle = *(xdrp->xp_cl_next);
    316 
    317 	if (xdrs->x_op == XDR_DECODE) {
    318 		/*
    319 		 * Since chunks aren't in-line, check to see whether
    320 		 * there is a chunk in the inline range.
    321 		 */
    322 		if (cle != NULL &&
    323 			cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
    324 		return (NULL);
    325 	}
    326 
    327 	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
    328 	    len >= xdrp->xp_min_chunk)) {
    329 		return (NULL);
    330 	} else {
    331 		xdrs->x_handy -= len;
    332 		/* LINTED pointer alignment */
    333 		buf = (rpc_inline_t *)xdrp->xp_offp;
    334 		xdrp->xp_offp += len;
    335 		return (buf);
    336 	}
    337 }
    338 
    339 static bool_t
    340 xdrrdma_control(XDR *xdrs, int request, void *info)
    341 {
    342 	int32_t *int32p;
    343 	int len;
    344 	uint_t in_flags;
    345 	struct private *xdrp = (struct private *)(xdrs->x_private);
    346 
    347 	switch (request) {
    348 	case XDR_PEEK:
    349 		/*
    350 		 * Return the next 4 byte unit in the XDR stream.
    351 		 */
    352 		if (xdrs->x_handy < sizeof (int32_t))
    353 			return (FALSE);
    354 
    355 		int32p = (int32_t *)info;
    356 		*int32p = (int32_t)ntohl((uint32_t)
    357 		    (*((int32_t *)(xdrp->xp_offp))));
    358 
    359 		return (TRUE);
    360 
    361 	case XDR_SKIPBYTES:
    362 		/*
    363 		 * Skip the next N bytes in the XDR stream.
    364 		 */
    365 		int32p = (int32_t *)info;
    366 		len = RNDUP((int)(*int32p));
    367 		if ((xdrs->x_handy -= len) < 0)
    368 			return (FALSE);
    369 		xdrp->xp_offp += len;
    370 
    371 		return (TRUE);
    372 
    373 	case XDR_RDMASET:
    374 		/*
    375 		 * Set the flags provided in the *info in xp_flags for rdma xdr
    376 		 * stream control.
    377 		 */
    378 		int32p = (int32_t *)info;
    379 		in_flags = (uint_t)(*int32p);
    380 
    381 		xdrp->xp_flags |= in_flags;
    382 		return (TRUE);
    383 
    384 	case XDR_RDMAGET:
    385 		/*
    386 		 * Get the flags provided in xp_flags return through *info
    387 		 */
    388 		int32p = (int32_t *)info;
    389 
    390 		*int32p = (int32_t)xdrp->xp_flags;
    391 		return (TRUE);
    392 
    393 	default:
    394 		return (FALSE);
    395 	}
    396 }
    397 
    398 static struct xdr_ops *
    399 xdrrdma_ops(void)
    400 {
    401 	static struct xdr_ops ops;
    402 
    403 	if (ops.x_getint32 == NULL) {
    404 		ops.x_getbytes = xdrrdma_getbytes;
    405 		ops.x_putbytes = xdrrdma_putbytes;
    406 		ops.x_getpostn = xdrrdma_getpos;
    407 		ops.x_setpostn = xdrrdma_setpos;
    408 		ops.x_inline = xdrrdma_inline;
    409 		ops.x_destroy = xdrrdma_destroy;
    410 		ops.x_control = xdrrdma_control;
    411 		ops.x_getint32 = xdrrdma_getint32;
    412 		ops.x_putint32 = xdrrdma_putint32;
    413 	}
    414 	return (&ops);
    415 }
    416 
    417 /*
    418  * Not all fields in struct clist are interesting to the
    419  * RPC over RDMA protocol. Only XDR the interesting fields.
    420  */
    421 bool_t
    422 xdr_clist(XDR *xdrs, clist *objp)
    423 {
    424 
    425 	if (!xdr_uint32(xdrs, &objp->c_xdroff))
    426 		return (FALSE);
    427 	if (!xdr_uint32(xdrs, &objp->c_len))
    428 		return (FALSE);
    429 	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
    430 		return (FALSE);
    431 	if (!xdr_uint64(xdrs, &objp->c_saddr))
    432 		return (FALSE);
    433 	if (!xdr_pointer(xdrs, (char **)&objp->c_next, sizeof (clist),
    434 		(xdrproc_t)xdr_clist))
    435 		return (FALSE);
    436 	return (TRUE);
    437 }
    438 
    439 bool_t
    440 xdr_do_clist(XDR *xdrs, clist **clp)
    441 {
    442 	return (xdr_pointer(xdrs, (char **)clp,
    443 		sizeof (clist), (xdrproc_t)xdr_clist));
    444 }
    445 
    446 uint_t
    447 xdr_getbufsize(XDR *xdrs)
    448 {
    449 	struct private *xdrp = (struct private *)(xdrs->x_private);
    450 
    451 	return ((uint_t)xdrp->xp_buf_size);
    452 }
    453