NFSoRDMA Client Updates for Linux 5.5

New Features:
 - New tracepoints for congestion control and Local Invalidate WRs
 
 Bugfixes and Cleanups:
 - Eliminate log noise in call_reserveresult
 - Fix unstable connections after a reconnect
 - Clean up some code duplication
 - Close race between waking a sender and posting a receive
 - Fix MR list corruption, and clean up MR usage
 - Remove unused rpcrdma_sendctx fields
 - Try to avoid DMA mapping pages if it is too costly
 - Wake pending tasks if connection fails
 - Replace some dprintk()s with tracepoints
 -----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCAAdFiEEnZ5MQTpR7cLU7KEp18tUv7ClQOsFAl3MGSwACgkQ18tUv7Cl
 QOt3LhAAz4T4DSGDb6QxUGxDRlusvHBpPFXA+GQOMRBVKiWHtrBcZT0UUybAm3Zp
 i2B1gVZJSo2JeA5vg3rK0yJmGiNPs4QedTUHiRsISrKOvUo+ITAEdNXgIIB3tAM9
 Pkwf0AMSqbzpUjKNHVGDGyhZ2WZM66zsDFI2CFh4Ul7VX/1NhM3xaUgTSEDJhl3z
 tE+aULrwGTQvUq4JKXQ3vu4f8rsbxxNKfvaZyQIKPo79nEFdtniVn18u5p010HDP
 ldJJAtY9qozhqWKwaSNEj6guW4U9wesLPrb7cBysHWjgivU17bwEbN/ZR3YrxoHI
 trpBdr5994FmOCz9mcKxH+BlS0bO7QSPS2r2TpgIMjKCm8scuZlhlnMQxHV8mEpz
 EpoC65qgcmqyeeOcIHnA/eN13ZAYgGKsRBIPEWRE/w+3Yz4bupsKZ/blSRzXdJpQ
 forMrAGTYa64NqdnRRDxf6PMwk8fqIDeTHTybMSLghUAQi89zYK0tZpgFikwBYRJ
 dqNGp4usCLtZ0c2nDnDg00arOZwqPQnxycNexHYNpHOACurCF9FhbaaZjsecZCoy
 QsSFN98K6KI5ztPY1p7DL5N36IC3VDwbgi0COKtF+xB3P0pIZ/Pzwo0KbfXQF0KH
 dmTrnpNY/Yq71i1ow6LTdYZ3hq7ZGztaXEEkl7udNvK97pP0UR0=
 =Jlak
 -----END PGP SIGNATURE-----

Merge tag 'nfs-rdma-for-5.5-1' of git://git.linux-nfs.org/projects/anna/linux-nfs

NFSoRDMA Client Updates for Linux 5.5

New Features:
- New tracepoints for congestion control and Local Invalidate WRs

Bugfixes and Cleanups:
- Eliminate log noise in call_reserveresult
- Fix unstable connections after a reconnect
- Clean up some code duplication
- Close race between waking a sender and posting a receive
- Fix MR list corruption, and clean up MR usage
- Remove unused rpcrdma_sendctx fields
- Try to avoid DMA mapping pages if it is too costly
- Wake pending tasks if connection fails
- Replace some dprintk()s with tracepoints
This commit is contained in:
Trond Myklebust 2019-11-18 10:55:55 +01:00
commit 4e121fcae8
10 changed files with 661 additions and 409 deletions

View File

@ -85,6 +85,44 @@ DECLARE_EVENT_CLASS(xprtrdma_rxprt,
), \
TP_ARGS(r_xprt))
DECLARE_EVENT_CLASS(xprtrdma_connect_class,
TP_PROTO(
const struct rpcrdma_xprt *r_xprt,
int rc
),
TP_ARGS(r_xprt, rc),
TP_STRUCT__entry(
__field(const void *, r_xprt)
__field(int, rc)
__field(int, connect_status)
__string(addr, rpcrdma_addrstr(r_xprt))
__string(port, rpcrdma_portstr(r_xprt))
),
TP_fast_assign(
__entry->r_xprt = r_xprt;
__entry->rc = rc;
__entry->connect_status = r_xprt->rx_ep.rep_connected;
__assign_str(addr, rpcrdma_addrstr(r_xprt));
__assign_str(port, rpcrdma_portstr(r_xprt));
),
TP_printk("peer=[%s]:%s r_xprt=%p: rc=%d connect status=%d",
__get_str(addr), __get_str(port), __entry->r_xprt,
__entry->rc, __entry->connect_status
)
);
#define DEFINE_CONN_EVENT(name) \
DEFINE_EVENT(xprtrdma_connect_class, xprtrdma_##name, \
TP_PROTO( \
const struct rpcrdma_xprt *r_xprt, \
int rc \
), \
TP_ARGS(r_xprt, rc))
DECLARE_EVENT_CLASS(xprtrdma_rdch_event,
TP_PROTO(
const struct rpc_task *task,
@ -333,47 +371,81 @@ TRACE_EVENT(xprtrdma_cm_event,
)
);
TRACE_EVENT(xprtrdma_disconnect,
TRACE_EVENT(xprtrdma_inline_thresh,
TP_PROTO(
const struct rpcrdma_xprt *r_xprt,
int status
const struct rpcrdma_xprt *r_xprt
),
TP_ARGS(r_xprt, status),
TP_ARGS(r_xprt),
TP_STRUCT__entry(
__field(const void *, r_xprt)
__field(int, status)
__field(int, connected)
__field(unsigned int, inline_send)
__field(unsigned int, inline_recv)
__field(unsigned int, max_send)
__field(unsigned int, max_recv)
__string(addr, rpcrdma_addrstr(r_xprt))
__string(port, rpcrdma_portstr(r_xprt))
),
TP_fast_assign(
const struct rpcrdma_ep *ep = &r_xprt->rx_ep;
__entry->r_xprt = r_xprt;
__entry->inline_send = ep->rep_inline_send;
__entry->inline_recv = ep->rep_inline_recv;
__entry->max_send = ep->rep_max_inline_send;
__entry->max_recv = ep->rep_max_inline_recv;
__assign_str(addr, rpcrdma_addrstr(r_xprt));
__assign_str(port, rpcrdma_portstr(r_xprt));
),
TP_printk("peer=[%s]:%s r_xprt=%p neg send/recv=%u/%u, calc send/recv=%u/%u",
__get_str(addr), __get_str(port), __entry->r_xprt,
__entry->inline_send, __entry->inline_recv,
__entry->max_send, __entry->max_recv
)
);
DEFINE_CONN_EVENT(connect);
DEFINE_CONN_EVENT(disconnect);
DEFINE_RXPRT_EVENT(xprtrdma_create);
DEFINE_RXPRT_EVENT(xprtrdma_op_destroy);
DEFINE_RXPRT_EVENT(xprtrdma_remove);
DEFINE_RXPRT_EVENT(xprtrdma_reinsert);
DEFINE_RXPRT_EVENT(xprtrdma_op_inject_dsc);
DEFINE_RXPRT_EVENT(xprtrdma_op_close);
DEFINE_RXPRT_EVENT(xprtrdma_op_setport);
TRACE_EVENT(xprtrdma_op_connect,
TP_PROTO(
const struct rpcrdma_xprt *r_xprt,
unsigned long delay
),
TP_ARGS(r_xprt, delay),
TP_STRUCT__entry(
__field(const void *, r_xprt)
__field(unsigned long, delay)
__string(addr, rpcrdma_addrstr(r_xprt))
__string(port, rpcrdma_portstr(r_xprt))
),
TP_fast_assign(
__entry->r_xprt = r_xprt;
__entry->status = status;
__entry->connected = r_xprt->rx_ep.rep_connected;
__entry->delay = delay;
__assign_str(addr, rpcrdma_addrstr(r_xprt));
__assign_str(port, rpcrdma_portstr(r_xprt));
),
TP_printk("peer=[%s]:%s r_xprt=%p: status=%d %sconnected",
__get_str(addr), __get_str(port),
__entry->r_xprt, __entry->status,
__entry->connected == 1 ? "still " : "dis"
TP_printk("peer=[%s]:%s r_xprt=%p delay=%lu",
__get_str(addr), __get_str(port), __entry->r_xprt,
__entry->delay
)
);
DEFINE_RXPRT_EVENT(xprtrdma_conn_start);
DEFINE_RXPRT_EVENT(xprtrdma_conn_tout);
DEFINE_RXPRT_EVENT(xprtrdma_create);
DEFINE_RXPRT_EVENT(xprtrdma_op_destroy);
DEFINE_RXPRT_EVENT(xprtrdma_remove);
DEFINE_RXPRT_EVENT(xprtrdma_reinsert);
DEFINE_RXPRT_EVENT(xprtrdma_reconnect);
DEFINE_RXPRT_EVENT(xprtrdma_op_inject_dsc);
DEFINE_RXPRT_EVENT(xprtrdma_op_close);
DEFINE_RXPRT_EVENT(xprtrdma_op_connect);
TRACE_EVENT(xprtrdma_op_set_cto,
TP_PROTO(
@ -532,6 +604,8 @@ DEFINE_WRCH_EVENT(write);
DEFINE_WRCH_EVENT(reply);
TRACE_DEFINE_ENUM(rpcrdma_noch);
TRACE_DEFINE_ENUM(rpcrdma_noch_pullup);
TRACE_DEFINE_ENUM(rpcrdma_noch_mapped);
TRACE_DEFINE_ENUM(rpcrdma_readch);
TRACE_DEFINE_ENUM(rpcrdma_areadch);
TRACE_DEFINE_ENUM(rpcrdma_writech);
@ -540,6 +614,8 @@ TRACE_DEFINE_ENUM(rpcrdma_replych);
#define xprtrdma_show_chunktype(x) \
__print_symbolic(x, \
{ rpcrdma_noch, "inline" }, \
{ rpcrdma_noch_pullup, "pullup" }, \
{ rpcrdma_noch_mapped, "mapped" }, \
{ rpcrdma_readch, "read list" }, \
{ rpcrdma_areadch, "*read list" }, \
{ rpcrdma_writech, "write list" }, \
@ -667,9 +743,8 @@ TRACE_EVENT(xprtrdma_post_send,
__entry->client_id = rqst->rq_task->tk_client ?
rqst->rq_task->tk_client->cl_clid : -1;
__entry->req = req;
__entry->num_sge = req->rl_sendctx->sc_wr.num_sge;
__entry->signaled = req->rl_sendctx->sc_wr.send_flags &
IB_SEND_SIGNALED;
__entry->num_sge = req->rl_wr.num_sge;
__entry->signaled = req->rl_wr.send_flags & IB_SEND_SIGNALED;
__entry->status = status;
),
@ -735,6 +810,31 @@ TRACE_EVENT(xprtrdma_post_recvs,
)
);
TRACE_EVENT(xprtrdma_post_linv,
TP_PROTO(
const struct rpcrdma_req *req,
int status
),
TP_ARGS(req, status),
TP_STRUCT__entry(
__field(const void *, req)
__field(int, status)
__field(u32, xid)
),
TP_fast_assign(
__entry->req = req;
__entry->status = status;
__entry->xid = be32_to_cpu(req->rl_slot.rq_xid);
),
TP_printk("req=%p xid=0x%08x status=%d",
__entry->req, __entry->xid, __entry->status
)
);
/**
** Completion events
**/
@ -1021,66 +1121,32 @@ DEFINE_REPLY_EVENT(xprtrdma_reply_hdr);
TRACE_EVENT(xprtrdma_fixup,
TP_PROTO(
const struct rpc_rqst *rqst,
int len,
int hdrlen
unsigned long fixup
),
TP_ARGS(rqst, len, hdrlen),
TP_ARGS(rqst, fixup),
TP_STRUCT__entry(
__field(unsigned int, task_id)
__field(unsigned int, client_id)
__field(const void *, base)
__field(int, len)
__field(int, hdrlen)
__field(unsigned long, fixup)
__field(size_t, headlen)
__field(unsigned int, pagelen)
__field(size_t, taillen)
),
TP_fast_assign(
__entry->task_id = rqst->rq_task->tk_pid;
__entry->client_id = rqst->rq_task->tk_client->cl_clid;
__entry->base = rqst->rq_rcv_buf.head[0].iov_base;
__entry->len = len;
__entry->hdrlen = hdrlen;
__entry->fixup = fixup;
__entry->headlen = rqst->rq_rcv_buf.head[0].iov_len;
__entry->pagelen = rqst->rq_rcv_buf.page_len;
__entry->taillen = rqst->rq_rcv_buf.tail[0].iov_len;
),
TP_printk("task:%u@%u base=%p len=%d hdrlen=%d",
__entry->task_id, __entry->client_id,
__entry->base, __entry->len, __entry->hdrlen
)
);
TRACE_EVENT(xprtrdma_fixup_pg,
TP_PROTO(
const struct rpc_rqst *rqst,
int pageno,
const void *pos,
int len,
int curlen
),
TP_ARGS(rqst, pageno, pos, len, curlen),
TP_STRUCT__entry(
__field(unsigned int, task_id)
__field(unsigned int, client_id)
__field(const void *, pos)
__field(int, pageno)
__field(int, len)
__field(int, curlen)
),
TP_fast_assign(
__entry->task_id = rqst->rq_task->tk_pid;
__entry->client_id = rqst->rq_task->tk_client->cl_clid;
__entry->pos = pos;
__entry->pageno = pageno;
__entry->len = len;
__entry->curlen = curlen;
),
TP_printk("task:%u@%u pageno=%d pos=%p len=%d curlen=%d",
__entry->task_id, __entry->client_id,
__entry->pageno, __entry->pos, __entry->len, __entry->curlen
TP_printk("task:%u@%u fixup=%lu xdr=%zu/%u/%zu",
__entry->task_id, __entry->client_id, __entry->fixup,
__entry->headlen, __entry->pagelen, __entry->taillen
)
);

View File

@ -777,6 +777,99 @@ TRACE_EVENT(xprt_ping,
__get_str(addr), __get_str(port), __entry->status)
);
DECLARE_EVENT_CLASS(xprt_writelock_event,
TP_PROTO(
const struct rpc_xprt *xprt, const struct rpc_task *task
),
TP_ARGS(xprt, task),
TP_STRUCT__entry(
__field(unsigned int, task_id)
__field(unsigned int, client_id)
__field(unsigned int, snd_task_id)
),
TP_fast_assign(
if (task) {
__entry->task_id = task->tk_pid;
__entry->client_id = task->tk_client ?
task->tk_client->cl_clid : -1;
} else {
__entry->task_id = -1;
__entry->client_id = -1;
}
__entry->snd_task_id = xprt->snd_task ?
xprt->snd_task->tk_pid : -1;
),
TP_printk("task:%u@%u snd_task:%u",
__entry->task_id, __entry->client_id,
__entry->snd_task_id)
);
#define DEFINE_WRITELOCK_EVENT(name) \
DEFINE_EVENT(xprt_writelock_event, xprt_##name, \
TP_PROTO( \
const struct rpc_xprt *xprt, \
const struct rpc_task *task \
), \
TP_ARGS(xprt, task))
DEFINE_WRITELOCK_EVENT(reserve_xprt);
DEFINE_WRITELOCK_EVENT(release_xprt);
DECLARE_EVENT_CLASS(xprt_cong_event,
TP_PROTO(
const struct rpc_xprt *xprt, const struct rpc_task *task
),
TP_ARGS(xprt, task),
TP_STRUCT__entry(
__field(unsigned int, task_id)
__field(unsigned int, client_id)
__field(unsigned int, snd_task_id)
__field(unsigned long, cong)
__field(unsigned long, cwnd)
__field(bool, wait)
),
TP_fast_assign(
if (task) {
__entry->task_id = task->tk_pid;
__entry->client_id = task->tk_client ?
task->tk_client->cl_clid : -1;
} else {
__entry->task_id = -1;
__entry->client_id = -1;
}
__entry->snd_task_id = xprt->snd_task ?
xprt->snd_task->tk_pid : -1;
__entry->cong = xprt->cong;
__entry->cwnd = xprt->cwnd;
__entry->wait = test_bit(XPRT_CWND_WAIT, &xprt->state);
),
TP_printk("task:%u@%u snd_task:%u cong=%lu cwnd=%lu%s",
__entry->task_id, __entry->client_id,
__entry->snd_task_id, __entry->cong, __entry->cwnd,
__entry->wait ? " (wait)" : "")
);
#define DEFINE_CONG_EVENT(name) \
DEFINE_EVENT(xprt_cong_event, xprt_##name, \
TP_PROTO( \
const struct rpc_xprt *xprt, \
const struct rpc_task *task \
), \
TP_ARGS(xprt, task))
DEFINE_CONG_EVENT(reserve_cong);
DEFINE_CONG_EVENT(release_cong);
DEFINE_CONG_EVENT(get_cong);
DEFINE_CONG_EVENT(put_cong);
TRACE_EVENT(xs_stream_read_data,
TP_PROTO(struct rpc_xprt *xprt, ssize_t err, size_t total),

View File

@ -1679,8 +1679,6 @@ call_reserveresult(struct rpc_task *task)
return;
}
printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
__func__, status);
rpc_call_rpcerror(task, -EIO);
return;
}
@ -1689,11 +1687,8 @@ call_reserveresult(struct rpc_task *task)
* Even though there was an error, we may have acquired
* a request slot somehow. Make sure not to leak it.
*/
if (task->tk_rqstp) {
printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
__func__, status);
if (task->tk_rqstp)
xprt_release(task);
}
switch (status) {
case -ENOMEM:
@ -1702,14 +1697,9 @@ call_reserveresult(struct rpc_task *task)
case -EAGAIN: /* woken up; retry */
task->tk_action = call_retry_reserve;
return;
case -EIO: /* probably a shutdown */
break;
default:
printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
__func__, status);
break;
rpc_call_rpcerror(task, status);
}
rpc_call_rpcerror(task, status);
}
/*

View File

@ -205,20 +205,20 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task)
return 1;
goto out_locked;
goto out_sleep;
}
if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
goto out_unlock;
xprt->snd_task = task;
out_locked:
trace_xprt_reserve_xprt(xprt, task);
return 1;
out_unlock:
xprt_clear_locked(xprt);
out_sleep:
dprintk("RPC: %5u failed to lock transport %p\n",
task->tk_pid, xprt);
task->tk_status = -EAGAIN;
if (RPC_IS_SOFT(task))
rpc_sleep_on_timeout(&xprt->sending, task, NULL,
@ -269,23 +269,22 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task)
return 1;
goto out_locked;
goto out_sleep;
}
if (req == NULL) {
xprt->snd_task = task;
return 1;
goto out_locked;
}
if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
goto out_unlock;
if (!xprt_need_congestion_window_wait(xprt)) {
xprt->snd_task = task;
return 1;
goto out_locked;
}
out_unlock:
xprt_clear_locked(xprt);
out_sleep:
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
task->tk_status = -EAGAIN;
if (RPC_IS_SOFT(task))
rpc_sleep_on_timeout(&xprt->sending, task, NULL,
@ -293,6 +292,9 @@ out_sleep:
else
rpc_sleep_on(&xprt->sending, task, NULL);
return 0;
out_locked:
trace_xprt_reserve_cong(xprt, task);
return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@ -357,6 +359,7 @@ void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
xprt_clear_locked(xprt);
__xprt_lock_write_next(xprt);
}
trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);
@ -374,6 +377,7 @@ void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
xprt_clear_locked(xprt);
__xprt_lock_write_next_cong(xprt);
}
trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
@ -395,8 +399,7 @@ __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (req->rq_cong)
return 1;
dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
trace_xprt_get_cong(xprt, req->rq_task);
if (RPCXPRT_CONGESTED(xprt)) {
xprt_set_congestion_window_wait(xprt);
return 0;
@ -418,6 +421,7 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
req->rq_cong = 0;
xprt->cong -= RPC_CWNDSCALE;
xprt_test_and_clear_congestion_window_wait(xprt);
trace_xprt_put_cong(xprt, req->rq_task);
__xprt_lock_write_next_cong(xprt);
}

View File

@ -79,7 +79,7 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
*p = xdr_zero;
if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
&rqst->rq_snd_buf, rpcrdma_noch))
&rqst->rq_snd_buf, rpcrdma_noch_pullup))
return -EIO;
trace_xprtrdma_cb_reply(rqst);

View File

@ -36,8 +36,8 @@
* connect worker from running concurrently.
*
* When the underlying transport disconnects, MRs that are in flight
* are flushed and are likely unusable. Thus all flushed MRs are
* destroyed. New MRs are created on demand.
* are flushed and are likely unusable. Thus all MRs are destroyed.
* New MRs are created on demand.
*/
#include <linux/sunrpc/rpc_rdma.h>
@ -88,8 +88,10 @@ void frwr_release_mr(struct rpcrdma_mr *mr)
kfree(mr);
}
static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
static void frwr_mr_recycle(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
trace_xprtrdma_mr_recycle(mr);
if (mr->mr_dir != DMA_NONE) {
@ -107,32 +109,6 @@ static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
frwr_release_mr(mr);
}
/* MRs are dynamically allocated, so simply clean up and release the MR.
* A replacement MR will subsequently be allocated on demand.
*/
static void
frwr_mr_recycle_worker(struct work_struct *work)
{
struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr,
mr_recycle);
frwr_mr_recycle(mr->mr_xprt, mr);
}
/* frwr_recycle - Discard MRs
* @req: request to reset
*
* Used after a reconnect. These MRs could be in flight, we can't
* tell. Safe thing to do is release them.
*/
void frwr_recycle(struct rpcrdma_req *req)
{
struct rpcrdma_mr *mr;
while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
frwr_mr_recycle(mr->mr_xprt, mr);
}
/* frwr_reset - Place MRs back on the free list
* @req: request to reset
*
@ -166,9 +142,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
struct ib_mr *frmr;
int rc;
/* NB: ib_alloc_mr and device drivers typically allocate
* memory with GFP_KERNEL.
*/
frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
if (IS_ERR(frmr))
goto out_mr_err;
@ -180,7 +153,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
mr->frwr.fr_mr = frmr;
mr->mr_dir = DMA_NONE;
INIT_LIST_HEAD(&mr->mr_list);
INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
init_completion(&mr->frwr.fr_linv_done);
sg_init_table(sg, depth);
@ -424,7 +396,7 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
struct ib_send_wr *post_wr;
struct rpcrdma_mr *mr;
post_wr = &req->rl_sendctx->sc_wr;
post_wr = &req->rl_wr;
list_for_each_entry(mr, &req->rl_registered, mr_list) {
struct rpcrdma_frwr *frwr;
@ -440,9 +412,6 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
post_wr = &frwr->fr_regwr.wr;
}
/* If ib_post_send fails, the next ->send_request for
* @req will queue these MRs for recovery.
*/
return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}
@ -468,7 +437,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
{
if (wc->status != IB_WC_SUCCESS)
rpcrdma_mr_recycle(mr);
frwr_mr_recycle(mr);
else
rpcrdma_mr_put(mr);
}
@ -570,7 +539,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
bad_wr = NULL;
rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
trace_xprtrdma_post_send(req, rc);
/* The final LOCAL_INV WR in the chain is supposed to
* do the wake. If it was never posted, the wake will
@ -583,6 +551,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
trace_xprtrdma_post_linv(req, rc);
while (bad_wr) {
frwr = container_of(bad_wr, struct rpcrdma_frwr,
fr_invwr);
@ -590,7 +559,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
bad_wr = bad_wr->next;
list_del_init(&mr->mr_list);
rpcrdma_mr_recycle(mr);
frwr_mr_recycle(mr);
}
}
@ -673,18 +642,18 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
bad_wr = NULL;
rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
trace_xprtrdma_post_send(req, rc);
if (!rc)
return;
/* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
trace_xprtrdma_post_linv(req, rc);
while (bad_wr) {
frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
mr = container_of(frwr, struct rpcrdma_mr, frwr);
bad_wr = bad_wr->next;
rpcrdma_mr_recycle(mr);
frwr_mr_recycle(mr);
}
/* The final LOCAL_INV WR in the chain is supposed to

View File

@ -78,8 +78,6 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
size += rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
dprintk("RPC: %s: max call header size = %u\n",
__func__, size);
return size;
}
@ -100,8 +98,6 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
dprintk("RPC: %s: max reply header size = %u\n",
__func__, size);
return size;
}
@ -363,8 +359,7 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
out_getmr_err:
trace_xprtrdma_nomrs(req);
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
if (r_xprt->rx_ep.rep_connected != -ENODEV)
schedule_work(&r_xprt->rx_buf.rb_refresh_worker);
rpcrdma_mrs_refresh(r_xprt);
return ERR_PTR(-EAGAIN);
}
@ -393,7 +388,7 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
unsigned int pos;
int nsegs;
if (rtype == rpcrdma_noch)
if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
goto done;
pos = rqst->rq_snd_buf.head[0].iov_len;
@ -565,6 +560,7 @@ static void rpcrdma_sendctx_done(struct kref *kref)
*/
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
struct ib_sge *sge;
if (!sc->sc_unmap_count)
@ -576,7 +572,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
*/
for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
++sge, --sc->sc_unmap_count)
ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
DMA_TO_DEVICE);
kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
@ -589,149 +585,228 @@ static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
struct ib_sge *sge = sc->sc_sges;
struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_regbuf;
return false;
sge->addr = rdmab_addr(rb);
sge->length = len;
sge->lkey = rdmab_lkey(rb);
ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
DMA_TO_DEVICE);
sc->sc_wr.num_sge++;
return true;
}
/* The head iovec is straightforward, as it is usually already
* DMA-mapped. Sync the content that has changed.
*/
static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, unsigned int len)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
return false;
sge->addr = rdmab_addr(rb);
sge->length = len;
sge->lkey = rdmab_lkey(rb);
ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
DMA_TO_DEVICE);
return true;
}
/* If there is a page list present, DMA map and prepare an
* SGE for each page to be sent.
*/
static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
unsigned int page_base, len, remaining;
struct page **ppages;
struct ib_sge *sge;
ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdr->page_base);
remaining = xdr->page_len;
while (remaining) {
sge = &sc->sc_sges[req->rl_wr.num_sge++];
len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
page_base, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
goto out_mapping_err;
sge->length = len;
sge->lkey = rdmab_lkey(rb);
sc->sc_unmap_count++;
ppages++;
remaining -= len;
page_base = 0;
}
return true;
out_regbuf:
pr_err("rpcrdma: failed to DMA map a Send buffer\n");
out_mapping_err:
trace_xprtrdma_dma_maperr(sge->addr);
return false;
}
/* Prepare the Send SGEs. The head and tail iovec, and each entry
* in the page list, gets its own SGE.
/* The tail iovec may include an XDR pad for the page list,
* as well as additional content, and may not reside in the
* same page as the head iovec.
*/
static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
struct xdr_buf *xdr,
enum rpcrdma_chunktype rtype)
unsigned int page_base, unsigned int len)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
unsigned int sge_no, page_base, len, remaining;
struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
struct ib_sge *sge = sc->sc_sges;
struct page *page, **ppages;
struct page *page = virt_to_page(xdr->tail[0].iov_base);
/* The head iovec is straightforward, as it is already
* DMA-mapped. Sync the content that has changed.
*/
if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_regbuf;
sc->sc_device = rdmab_device(rb);
sge_no = 1;
sge[sge_no].addr = rdmab_addr(rb);
sge[sge_no].length = xdr->head[0].iov_len;
sge[sge_no].lkey = rdmab_lkey(rb);
ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
sge[sge_no].length, DMA_TO_DEVICE);
sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
goto out_mapping_err;
/* If there is a Read chunk, the page list is being handled
* via explicit RDMA, and thus is skipped here. However, the
* tail iovec may include an XDR pad for the page list, as
* well as additional content, and may not reside in the
* same page as the head iovec.
*/
if (rtype == rpcrdma_readch) {
len = xdr->tail[0].iov_len;
/* Do not include the tail if it is only an XDR pad */
if (len < 4)
goto out;
page = virt_to_page(xdr->tail[0].iov_base);
page_base = offset_in_page(xdr->tail[0].iov_base);
/* If the content in the page list is an odd length,
* xdr_write_pages() has added a pad at the beginning
* of the tail iovec. Force the tail's non-pad content
* to land at the next XDR position in the Send message.
*/
page_base += len & 3;
len -= len & 3;
goto map_tail;
}
/* If there is a page list present, temporarily DMA map
* and prepare an SGE for each page to be sent.
*/
if (xdr->page_len) {
ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdr->page_base);
remaining = xdr->page_len;
while (remaining) {
sge_no++;
if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
goto out_mapping_overflow;
len = min_t(u32, PAGE_SIZE - page_base, remaining);
sge[sge_no].addr =
ib_dma_map_page(rdmab_device(rb), *ppages,
page_base, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdmab_device(rb),
sge[sge_no].addr))
goto out_mapping_err;
sge[sge_no].length = len;
sge[sge_no].lkey = rdmab_lkey(rb);
sc->sc_unmap_count++;
ppages++;
remaining -= len;
page_base = 0;
}
}
/* The tail iovec is not always constructed in the same
* page where the head iovec resides (see, for example,
* gss_wrap_req_priv). To neatly accommodate that case,
* DMA map it separately.
*/
if (xdr->tail[0].iov_len) {
page = virt_to_page(xdr->tail[0].iov_base);
page_base = offset_in_page(xdr->tail[0].iov_base);
len = xdr->tail[0].iov_len;
map_tail:
sge_no++;
sge[sge_no].addr =
ib_dma_map_page(rdmab_device(rb), page, page_base, len,
DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
goto out_mapping_err;
sge[sge_no].length = len;
sge[sge_no].lkey = rdmab_lkey(rb);
sc->sc_unmap_count++;
}
out:
sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count)
kref_get(&req->rl_kref);
sge->length = len;
sge->lkey = rdmab_lkey(rb);
++sc->sc_unmap_count;
return true;
out_regbuf:
pr_err("rpcrdma: failed to DMA map a Send buffer\n");
return false;
out_mapping_overflow:
rpcrdma_sendctx_unmap(sc);
pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
return false;
out_mapping_err:
rpcrdma_sendctx_unmap(sc);
trace_xprtrdma_dma_maperr(sge[sge_no].addr);
trace_xprtrdma_dma_maperr(sge->addr);
return false;
}
/* Copy the tail to the end of the head buffer.
*/
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
unsigned char *dst;
dst = (unsigned char *)xdr->head[0].iov_base;
dst += xdr->head[0].iov_len + xdr->page_len;
memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
}
/* Copy pagelist content into the head buffer.
*/
static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
unsigned int len, page_base, remaining;
struct page **ppages;
unsigned char *src, *dst;
dst = (unsigned char *)xdr->head[0].iov_base;
dst += xdr->head[0].iov_len;
ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdr->page_base);
remaining = xdr->page_len;
while (remaining) {
src = page_address(*ppages);
src += page_base;
len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
memcpy(dst, src, len);
r_xprt->rx_stats.pullup_copy_count += len;
ppages++;
dst += len;
remaining -= len;
page_base = 0;
}
}
/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
* When the head, pagelist, and tail are small, a pull-up copy
* is considerably less costly than DMA mapping the components
* of @xdr.
*
* Assumptions:
* - the caller has already verified that the total length
* of the RPC Call body will fit into @rl_sendbuf.
*/
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
if (unlikely(xdr->tail[0].iov_len))
rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
if (unlikely(xdr->page_len))
rpcrdma_pullup_pagelist(r_xprt, req, xdr);
/* The whole RPC message resides in the head iovec now */
return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}
static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
struct kvec *tail = &xdr->tail[0];
if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
return false;
if (xdr->page_len)
if (!rpcrdma_prepare_pagelist(req, xdr))
return false;
if (tail->iov_len)
if (!rpcrdma_prepare_tail_iov(req, xdr,
offset_in_page(tail->iov_base),
tail->iov_len))
return false;
if (req->rl_sendctx->sc_unmap_count)
kref_get(&req->rl_kref);
return true;
}
static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
return false;
/* If there is a Read chunk, the page list is being handled
* via explicit RDMA, and thus is skipped here.
*/
/* Do not include the tail if it is only an XDR pad */
if (xdr->tail[0].iov_len > 3) {
unsigned int page_base, len;
/* If the content in the page list is an odd length,
* xdr_write_pages() adds a pad at the beginning of
* the tail iovec. Force the tail's non-pad content to
* land at the next XDR position in the Send message.
*/
page_base = offset_in_page(xdr->tail[0].iov_base);
len = xdr->tail[0].iov_len;
page_base += len & 3;
len -= len & 3;
if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
return false;
kref_get(&req->rl_kref);
}
return true;
}
/**
* rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
* @r_xprt: controlling transport
@ -742,31 +817,53 @@ out_mapping_err:
*
* Returns 0 on success; otherwise a negative errno is returned.
*/
int
rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr,
enum rpcrdma_chunktype rtype)
{
int ret;
ret = -EAGAIN;
req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
goto err;
req->rl_sendctx->sc_wr.num_sge = 0;
goto out_nosc;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
kref_init(&req->rl_kref);
req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
req->rl_wr.num_sge = 0;
req->rl_wr.opcode = IB_WR_SEND;
ret = -EIO;
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
goto err;
if (rtype != rpcrdma_areadch)
if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
goto err;
goto out_unmap;
switch (rtype) {
case rpcrdma_noch_pullup:
if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
goto out_unmap;
break;
case rpcrdma_noch_mapped:
if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
goto out_unmap;
break;
case rpcrdma_readch:
if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
goto out_unmap;
break;
case rpcrdma_areadch:
break;
default:
goto out_unmap;
}
return 0;
err:
out_unmap:
rpcrdma_sendctx_unmap(req->rl_sendctx);
out_nosc:
trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
return ret;
}
@ -796,6 +893,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct xdr_stream *xdr = &req->rl_stream;
enum rpcrdma_chunktype rtype, wtype;
struct xdr_buf *buf = &rqst->rq_snd_buf;
bool ddp_allowed;
__be32 *p;
int ret;
@ -853,8 +951,9 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
*p++ = rdma_msg;
rtype = rpcrdma_noch;
} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
rpcrdma_noch_pullup : rpcrdma_noch_mapped;
} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
*p++ = rdma_msg;
rtype = rpcrdma_readch;
} else {
@ -863,12 +962,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
rtype = rpcrdma_areadch;
}
/* If this is a retransmit, discard previously registered
* chunks. Very likely the connection has been replaced,
* so these registrations are invalid and unusable.
*/
frwr_recycle(req);
/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
*
@ -902,7 +995,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
goto out_err;
ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
&rqst->rq_snd_buf, rtype);
buf, rtype);
if (ret)
goto out_err;
@ -916,6 +1009,40 @@ out_err:
return ret;
}
static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
struct rpcrdma_buffer *buf,
u32 grant)
{
buf->rb_credits = grant;
xprt->cwnd = grant << RPC_CWNDSHIFT;
}
static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
{
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
spin_lock(&xprt->transport_lock);
__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
spin_unlock(&xprt->transport_lock);
}
/**
* rpcrdma_reset_cwnd - Reset the xprt's congestion window
* @r_xprt: controlling transport instance
*
* Prepare @r_xprt for the next connection by reinitializing
* its credit grant to one (see RFC 8166, Section 3.3.3).
*/
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
{
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
spin_lock(&xprt->transport_lock);
xprt->cong = 0;
__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
spin_unlock(&xprt->transport_lock);
}
/**
* rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
* @rqst: controlling RPC request
@ -955,7 +1082,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
curlen = rqst->rq_rcv_buf.head[0].iov_len;
if (curlen > copy_len)
curlen = copy_len;
trace_xprtrdma_fixup(rqst, copy_len, curlen);
srcp += curlen;
copy_len -= curlen;
@ -975,8 +1101,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
if (curlen > pagelist_len)
curlen = pagelist_len;
trace_xprtrdma_fixup_pg(rqst, i, srcp,
copy_len, curlen);
destp = kmap_atomic(ppages[i]);
memcpy(destp + page_base, srcp, curlen);
flush_dcache_page(ppages[i]);
@ -1008,6 +1132,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
rqst->rq_private_buf.tail[0].iov_base = srcp;
}
if (fixup_copy_count)
trace_xprtrdma_fixup(rqst, fixup_copy_count);
return fixup_copy_count;
}
@ -1356,12 +1482,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
credits = 1; /* don't deadlock */
else if (credits > buf->rb_max_requests)
credits = buf->rb_max_requests;
if (buf->rb_credits != credits) {
spin_lock(&xprt->transport_lock);
buf->rb_credits = credits;
xprt->cwnd = credits << RPC_CWNDSHIFT;
spin_unlock(&xprt->transport_lock);
}
if (buf->rb_credits != credits)
rpcrdma_update_cwnd(r_xprt, credits);
rpcrdma_post_recvs(r_xprt, false);
req = rpcr_to_rdmar(rqst);
if (req->rl_reply) {

View File

@ -243,16 +243,13 @@ xprt_rdma_connect_worker(struct work_struct *work)
rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
xprt_clear_connecting(xprt);
if (r_xprt->rx_ep.rep_connected > 0) {
if (!xprt_test_and_set_connected(xprt)) {
xprt->stat.connect_count++;
xprt->stat.connect_time += (long)jiffies -
xprt->stat.connect_start;
xprt_wake_pending_tasks(xprt, -EAGAIN);
}
} else {
if (xprt_test_and_clear_connected(xprt))
xprt_wake_pending_tasks(xprt, rc);
xprt->stat.connect_count++;
xprt->stat.connect_time += (long)jiffies -
xprt->stat.connect_start;
xprt_set_connected(xprt);
rc = -EAGAIN;
}
xprt_wake_pending_tasks(xprt, rc);
}
/**
@ -425,12 +422,6 @@ void xprt_rdma_close(struct rpc_xprt *xprt)
return;
rpcrdma_ep_disconnect(ep, ia);
/* Prepare @xprt for the next connection by reinitializing
* its credit grant to one (see RFC 8166, Section 3.3.3).
*/
r_xprt->rx_buf.rb_credits = 1;
xprt->cwnd = RPC_CWNDSHIFT;
out:
xprt->reestablish_timeout = 0;
++xprt->connect_cookie;
@ -450,12 +441,6 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
char buf[8];
dprintk("RPC: %s: setting port for xprt %p (%s:%s) to %u\n",
__func__, xprt,
xprt->address_strings[RPC_DISPLAY_ADDR],
xprt->address_strings[RPC_DISPLAY_PORT],
port);
rpc_set_port(sap, port);
kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
@ -465,6 +450,9 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
snprintf(buf, sizeof(buf), "%4hx", port);
xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
trace_xprtrdma_op_setport(container_of(xprt, struct rpcrdma_xprt,
rx_xprt));
}
/**
@ -536,13 +524,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
unsigned long delay;
trace_xprtrdma_op_connect(r_xprt);
delay = 0;
if (r_xprt->rx_ep.rep_connected != 0) {
delay = xprt_reconnect_delay(xprt);
xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
}
trace_xprtrdma_op_connect(r_xprt, delay);
queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
delay);
}

View File

@ -74,17 +74,17 @@
/*
* internal functions
*/
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_sendctx *sc);
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/* Wait for outstanding transport work to finish. ib_drain_qp
* handles the drains in the wrong order for us, so open code
@ -125,7 +125,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context)
/**
* rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: completion queue (ignored)
* @cq: completion queue
* @wc: completed WR
*
*/
@ -138,7 +138,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_send(sc, wc);
rpcrdma_sendctx_put_locked(sc);
rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
}
/**
@ -170,7 +170,6 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
rdmab_addr(rep->rr_rdmabuf),
wc->byte_len, DMA_FROM_DEVICE);
rpcrdma_post_recvs(r_xprt, false);
rpcrdma_reply_handler(rep);
return;
@ -178,11 +177,11 @@ out_flushed:
rpcrdma_recv_buffer_put(rep);
}
static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
struct rdma_conn_param *param)
static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
struct rdma_conn_param *param)
{
const struct rpcrdma_connect_private *pmsg = param->private_data;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
unsigned int rsize, wsize;
/* Default settings for RPC-over-RDMA Version One */
@ -198,13 +197,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
}
if (rsize < r_xprt->rx_ep.rep_inline_recv)
r_xprt->rx_ep.rep_inline_recv = rsize;
if (wsize < r_xprt->rx_ep.rep_inline_send)
r_xprt->rx_ep.rep_inline_send = wsize;
dprintk("RPC: %s: max send %u, max recv %u\n", __func__,
r_xprt->rx_ep.rep_inline_send,
r_xprt->rx_ep.rep_inline_recv);
if (rsize < ep->rep_inline_recv)
ep->rep_inline_recv = rsize;
if (wsize < ep->rep_inline_send)
ep->rep_inline_send = wsize;
rpcrdma_set_max_header_sizes(r_xprt);
}
@ -258,7 +255,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_ESTABLISHED:
++xprt->connect_cookie;
ep->rep_connected = 1;
rpcrdma_update_connect_private(r_xprt, &event->param.conn);
rpcrdma_update_cm_private(r_xprt, &event->param.conn);
trace_xprtrdma_inline_thresh(r_xprt);
wake_up_all(&ep->rep_connect_wait);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
@ -298,8 +296,6 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
struct rdma_cm_id *id;
int rc;
trace_xprtrdma_conn_start(xprt);
init_completion(&ia->ri_done);
init_completion(&ia->ri_remove_done);
@ -315,10 +311,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
if (rc)
goto out;
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
if (rc < 0) {
trace_xprtrdma_conn_tout(xprt);
if (rc < 0)
goto out;
}
rc = ia->ri_async_rc;
if (rc)
@ -329,10 +323,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
if (rc)
goto out;
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
if (rc < 0) {
trace_xprtrdma_conn_tout(xprt);
if (rc < 0)
goto out;
}
rc = ia->ri_async_rc;
if (rc)
goto out;
@ -409,8 +401,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
cancel_work_sync(&buf->rb_refresh_worker);
/* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker.
* - Don't call rpcrdma_ep_disconnect, which waits
@ -437,7 +427,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}
rpcrdma_mrs_destroy(buf);
rpcrdma_mrs_destroy(r_xprt);
ib_dealloc_pd(ia->ri_pd);
ia->ri_pd = NULL;
@ -522,7 +512,7 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
init_waitqueue_head(&ep->rep_connect_wait);
ep->rep_receive_count = 0;
sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
ep->rep_attr.cap.max_send_wr + 1,
IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
@ -630,8 +620,6 @@ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
goto out3;
}
rpcrdma_mrs_create(r_xprt);
return 0;
out3:
@ -649,8 +637,6 @@ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
struct rdma_cm_id *id, *old;
int err, rc;
trace_xprtrdma_reconnect(r_xprt);
rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
rc = -EHOSTUNREACH;
@ -705,7 +691,6 @@ retry:
memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
switch (ep->rep_connected) {
case 0:
dprintk("RPC: %s: connecting...\n", __func__);
rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
if (rc) {
rc = -ENETUNREACH;
@ -726,6 +711,7 @@ retry:
ep->rep_connected = 0;
xprt_clear_connected(xprt);
rpcrdma_reset_cwnd(r_xprt);
rpcrdma_post_recvs(r_xprt, true);
rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
@ -742,13 +728,14 @@ retry:
goto out;
}
dprintk("RPC: %s: connected\n", __func__);
rpcrdma_mrs_create(r_xprt);
out:
if (rc)
ep->rep_connected = rc;
out_noupdate:
trace_xprtrdma_connect(r_xprt, rc);
return rc;
}
@ -757,11 +744,8 @@ out_noupdate:
* @ep: endpoint to disconnect
* @ia: associated interface adapter
*
* This is separate from destroy to facilitate the ability
* to reconnect without recreating the endpoint.
*
* This call is not reentrant, and must not be made in parallel
* on the same endpoint.
* Caller serializes. Either the transport send lock is held,
* or we're being called to destroy the transport.
*/
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
@ -780,6 +764,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
trace_xprtrdma_disconnect(r_xprt, rc);
rpcrdma_xprt_drain(r_xprt);
rpcrdma_reqs_reset(r_xprt);
rpcrdma_mrs_destroy(r_xprt);
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
@ -817,9 +803,6 @@ static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
if (!sc)
return NULL;
sc->sc_wr.wr_cqe = &sc->sc_cqe;
sc->sc_wr.sg_list = sc->sc_sges;
sc->sc_wr.opcode = IB_WR_SEND;
sc->sc_cqe.done = rpcrdma_wc_send;
return sc;
}
@ -847,7 +830,6 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
if (!sc)
return -ENOMEM;
sc->sc_xprt = r_xprt;
buf->rb_sc_ctxs[i] = sc;
}
@ -910,6 +892,7 @@ out_emptyq:
/**
* rpcrdma_sendctx_put_locked - Release a send context
* @r_xprt: controlling transport instance
* @sc: send context to release
*
* Usage: Called from Send completion to return a sendctxt
@ -917,10 +900,10 @@ out_emptyq:
*
* The caller serializes calls to this function (per transport).
*/
static void
rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_sendctx *sc)
{
struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
unsigned long next_tail;
/* Unmap SGEs of previously completed but unsignaled
@ -938,7 +921,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
xprt_write_space(&sc->sc_xprt->rx_xprt);
xprt_write_space(&r_xprt->rx_xprt);
}
static void
@ -965,7 +948,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
mr->mr_xprt = r_xprt;
spin_lock(&buf->rb_lock);
list_add(&mr->mr_list, &buf->rb_mrs);
rpcrdma_mr_push(mr, &buf->rb_mrs);
list_add(&mr->mr_all, &buf->rb_all_mrs);
spin_unlock(&buf->rb_lock);
}
@ -986,6 +969,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
xprt_write_space(&r_xprt->rx_xprt);
}
/**
* rpcrdma_mrs_refresh - Wake the MR refresh worker
* @r_xprt: controlling transport instance
*
*/
void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
/* If there is no underlying device, it's no use to
* wake the refresh worker.
*/
if (ep->rep_connected != -ENODEV) {
/* The work is scheduled on a WQ_MEM_RECLAIM
* workqueue in order to prevent MR allocation
* from recursing into NFS during direct reclaim.
*/
queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
}
}
/**
* rpcrdma_req_create - Allocate an rpcrdma_req object
* @r_xprt: controlling r_xprt
@ -1042,6 +1047,26 @@ out1:
return NULL;
}
/**
* rpcrdma_reqs_reset - Reset all reqs owned by a transport
* @r_xprt: controlling transport instance
*
* ASSUMPTION: the rb_allreqs list is stable for the duration,
* and thus can be walked without holding rb_lock. Eg. the
* caller is holding the transport send lock to exclude
* device removal or disconnection.
*/
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
/* Credits are valid only for one connection */
req->rl_slot.rq_cong = 0;
}
}
static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
bool temp)
{
@ -1125,8 +1150,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
INIT_LIST_HEAD(&buf->rb_all_mrs);
INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
rpcrdma_mrs_create(r_xprt);
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
@ -1134,14 +1157,13 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
GFP_KERNEL);
if (!req)
goto out;
list_add(&req->rl_list, &buf->rb_send_bufs);
}
buf->rb_credits = 1;
init_llist_head(&buf->rb_free_reps);
rc = rpcrdma_sendctxs_create(r_xprt);
@ -1158,15 +1180,24 @@ out:
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
*
* This function assumes that the caller prevents concurrent device
* unload and transport tear-down.
* Relies on caller holding the transport send lock to protect
* removing req->rl_all from buf->rb_all_reqs safely.
*/
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
struct rpcrdma_mr *mr;
list_del(&req->rl_all);
while (!list_empty(&req->rl_free_mrs))
rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
spin_lock(&buf->rb_lock);
list_del(&mr->mr_all);
spin_unlock(&buf->rb_lock);
frwr_release_mr(mr);
}
rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
@ -1174,28 +1205,33 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req)
kfree(req);
}
static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
/**
* rpcrdma_mrs_destroy - Release all of a transport's MRs
* @r_xprt: controlling transport instance
*
* Relies on caller holding the transport send lock to protect
* removing mr->mr_list from req->rl_free_mrs safely.
*/
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_mr *mr;
unsigned int count;
count = 0;
cancel_work_sync(&buf->rb_refresh_worker);
spin_lock(&buf->rb_lock);
while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
struct rpcrdma_mr,
mr_all)) != NULL) {
list_del(&mr->mr_list);
list_del(&mr->mr_all);
spin_unlock(&buf->rb_lock);
frwr_release_mr(mr);
count++;
spin_lock(&buf->rb_lock);
}
spin_unlock(&buf->rb_lock);
r_xprt->rx_stats.mrs_allocated = 0;
}
/**
@ -1209,8 +1245,6 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
cancel_work_sync(&buf->rb_refresh_worker);
rpcrdma_sendctxs_destroy(buf);
rpcrdma_reps_destroy(buf);
@ -1222,8 +1256,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
list_del(&req->rl_list);
rpcrdma_req_destroy(req);
}
rpcrdma_mrs_destroy(buf);
}
/**
@ -1264,17 +1296,6 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}
static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
mr->mr_req = NULL;
spin_lock(&buf->rb_lock);
rpcrdma_mr_push(mr, &buf->rb_mrs);
spin_unlock(&buf->rb_lock);
}
/**
* rpcrdma_buffer_get - Get a request buffer
* @buffers: Buffer pool from which to obtain a buffer
@ -1437,7 +1458,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
struct ib_send_wr *send_wr = &req->rl_wr;
int rc;
if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
@ -1455,8 +1476,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
return 0;
}
static void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
/**
* rpcrdma_post_recvs - Refill the Receive Queue
* @r_xprt: controlling transport instance
* @temp: mark Receive buffers to be deleted after use
*
*/
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;

View File

@ -218,12 +218,8 @@ enum {
/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
*/
struct rpcrdma_req;
struct rpcrdma_xprt;
struct rpcrdma_sendctx {
struct ib_send_wr sc_wr;
struct ib_cqe sc_cqe;
struct ib_device *sc_device;
struct rpcrdma_xprt *sc_xprt;
struct rpcrdma_req *sc_req;
unsigned int sc_unmap_count;
struct ib_sge sc_sges[];
@ -257,7 +253,6 @@ struct rpcrdma_mr {
u32 mr_handle;
u32 mr_length;
u64 mr_offset;
struct work_struct mr_recycle;
struct list_head mr_all;
};
@ -318,6 +313,7 @@ struct rpcrdma_req {
struct rpcrdma_rep *rl_reply;
struct xdr_stream rl_stream;
struct xdr_buf rl_hdrbuf;
struct ib_send_wr rl_wr;
struct rpcrdma_sendctx *rl_sendctx;
struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
@ -474,6 +470,7 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_req *);
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/*
* Buffer calls - xprtrdma/verbs.c
@ -487,12 +484,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
void rpcrdma_mr_put(struct rpcrdma_mr *mr);
static inline void
rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
{
schedule_work(&mr->mr_recycle);
}
void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
@ -542,7 +534,6 @@ rpcrdma_data_dir(bool writing)
/* Memory registration calls xprtrdma/frwr_ops.c
*/
bool frwr_is_supported(struct ib_device *device);
void frwr_recycle(struct rpcrdma_req *req);
void frwr_reset(struct rpcrdma_req *req);
int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
@ -563,6 +554,8 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
enum rpcrdma_chunktype {
rpcrdma_noch = 0,
rpcrdma_noch_pullup,
rpcrdma_noch_mapped,
rpcrdma_readch,
rpcrdma_areadch,
rpcrdma_writech,
@ -576,6 +569,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc);
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep);