RxRPC development

-----BEGIN PGP SIGNATURE-----
 
 iQIVAwUAWec8A/Sw1s6N8H32AQIr+Q/9ETxizxbjsjF45Ieudauw2Cd+ozXMh+va
 3j06OuVhc/YQMhnfHvFFyQIqRmammv84KIy3BIZP0NcNR6sMM9Q2OIoIDM6NfAXF
 qbxqCEI5Iyr6gXMk8bqpDndmVk1ev5X5odB92ISHTPUtB2ZcAjCLOmICgp4gUm5F
 DuTO0wFKZNdh1ydgVJhMcAAuYHFWpp1dV5w6palzwgMrDUj3PlzKPIMseBEiOi8Z
 ba4LjKSvaU/tf+N+QUH3jGRKPLmlOWJQHP1SawPAIxnECydedAHzAygg8f2Uko67
 j6eFSWvzMgnOOja3Y8IXaHWWkJ7R8T5/6t/qPit74psaNf9MjFjKBQQBlXWvVmyK
 Vgoxv1/7MY6KNB7da9zkDDsuWNfq1/t4qJsLWYbGj0jnaWCGmGJfGhqOE+LMsxUt
 4g3S7Vug0dxUJw+Zzf509xpaCtiOOZNPxxibh9oUIM8q/yt7uZXOXHWh7bR8Fvb9
 4XNbKm5aOtZse4PrDjggt8ClV+sw+Gx98KjO9lzQ/ONpUs63jJP7cq7GaAAEOuVL
 iG+6rX9RQKgZfRFxdixHQfyL+9NSYrqYIATTeoiAwv4i7lYsh58ZWgt4cRg+p0Xt
 q3m1kCpvbNYyxb5WRta1nU/ZgIPbmisnyoCPiw8QeJkrtGOSrFdKRods9oSsZaV8
 49qJ5M6fFLg=
 =YkMB
 -----END PGP SIGNATURE-----

Merge tag 'rxrpc-next-20171018' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Add bits for kernel services

Here are some patches that add a few things for kernel services to use:

 (1) Allow service upgrade to be requested and allow the resultant actual
     service ID to be obtained.

 (2) Allow the RTT time of a call to be obtained.

 (3) Allow a kernel service to find out if a call is still alive on a
     server between transmitting a request and getting the reply.

 (4) Allow data transmission to ignore signals if transmission progress is
     being made in reasonable time.  This is also usable by userspace by
     passing MSG_WAITALL to sendmsg()[*].

[*] I'm not sure this is the right interface for this or whether a sockopt
    should be used instead.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2017-10-20 08:42:09 +01:00
commit 9854d758f7
8 changed files with 211 additions and 41 deletions

View File

@ -280,6 +280,18 @@ Interaction with the user of the RxRPC socket:
nominated by a socket option.
Notes on sendmsg:
(*) MSG_WAITALL can be set to tell sendmsg to ignore signals if the peer is
making progress at accepting packets within a reasonable time such that we
manage to queue up all the data for transmission. This requires the
client to accept at least one packet per 2*RTT time period.
If this isn't set, sendmsg() will return immediately, either returning
EINTR/ERESTARTSYS if nothing was consumed or returning the amount of data
consumed.
Notes on recvmsg:
(*) If there's a sequence of data messages belonging to a particular call on
@ -782,7 +794,9 @@ The kernel interface functions are as follows:
struct key *key,
unsigned long user_call_ID,
s64 tx_total_len,
gfp_t gfp);
gfp_t gfp,
rxrpc_notify_rx_t notify_rx,
bool upgrade);
This allocates the infrastructure to make a new RxRPC call and assigns
call and connection numbers. The call will be made on the UDP port that
@ -803,6 +817,13 @@ The kernel interface functions are as follows:
allows the kernel to encrypt directly to the packet buffers, thereby
saving a copy. The value may not be less than -1.
notify_rx is a pointer to a function to be called when events such as
incoming data packets or remote aborts happen.
upgrade should be set to true if a client operation should request that
the server upgrade the service to a better one. The resultant service ID
is returned by rxrpc_kernel_recv_data().
If this function is successful, an opaque reference to the RxRPC call is
returned. The caller now holds a reference on this and it must be
properly ended.
@ -850,7 +871,8 @@ The kernel interface functions are as follows:
size_t size,
size_t *_offset,
bool want_more,
u32 *_abort)
u32 *_abort,
u16 *_service)
This is used to receive data from either the reply part of a client call
or the request part of a service call. buf and size specify how much
@ -873,6 +895,9 @@ The kernel interface functions are as follows:
If a remote ABORT is detected, the abort code received will be stored in
*_abort and ECONNABORTED will be returned.
The service ID that the call ended up with is returned into *_service.
This can be used to see if a call got a service upgrade.
(*) Abort a call.
void rxrpc_kernel_abort_call(struct socket *sock,
@ -1020,6 +1045,30 @@ The kernel interface functions are as follows:
It returns 0 if the call was requeued and an error otherwise.
(*) Get call RTT.
u64 rxrpc_kernel_get_rtt(struct socket *sock, struct rxrpc_call *call);
Get the RTT time to the peer in use by a call. The value returned is in
nanoseconds.
(*) Check call still alive.
u32 rxrpc_kernel_check_life(struct socket *sock,
struct rxrpc_call *call);
This returns a number that is updated when ACKs are received from the peer
(notably including PING RESPONSE ACKs which we can elicit by sending PING
ACKs to see if the call still exists on the server). The caller should
compare the numbers of two calls to see if the call is still alive after
waiting for a suitable interval.
This allows the caller to work out if the server is still contactable and
if the call is still alive on the server whilst waiting for the server to
process a client operation.
This function may transmit a PING ACK.
=======================
CONFIGURABLE PARAMETERS

View File

@ -100,6 +100,7 @@ struct afs_call {
bool send_pages; /* T if data from mapping should be sent */
bool need_attention; /* T if RxRPC poked us */
bool async; /* T if asynchronous */
bool upgrade; /* T to request service upgrade */
u16 service_id; /* RxRPC service ID to call */
__be16 port; /* target UDP port */
u32 operation_ID; /* operation ID for an incoming call */

View File

@ -387,7 +387,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
tx_total_len, gfp,
(async ?
afs_wake_up_async_call :
afs_wake_up_call_waiter));
afs_wake_up_call_waiter),
call->upgrade);
call->key = NULL;
if (IS_ERR(rxcall)) {
ret = PTR_ERR(rxcall);
@ -406,7 +407,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
call->request_size);
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = (call->send_pages ? MSG_MORE : 0);
msg.msg_flags = MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);
/* We have to change the state *before* sending the last packet as
* rxrpc might give us the reply before it returns from sending the
@ -443,7 +444,7 @@ error_do_abort:
abort_code = 0;
offset = 0;
rxrpc_kernel_recv_data(afs_socket, rxcall, NULL, 0, &offset,
false, &abort_code);
false, &abort_code, &call->service_id);
ret = call->type->abort_to_error(abort_code);
}
error_kill_call:
@ -471,7 +472,8 @@ static void afs_deliver_to_call(struct afs_call *call)
size_t offset = 0;
ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
NULL, 0, &offset, false,
&call->abort_code);
&call->abort_code,
&call->service_id);
trace_afs_recv_data(call, 0, offset, false, ret);
if (ret == -EINPROGRESS || ret == -EAGAIN)
@ -536,15 +538,26 @@ call_complete:
*/
static int afs_wait_for_call_to_complete(struct afs_call *call)
{
signed long rtt2, timeout;
int ret;
u64 rtt;
u32 life, last_life;
DECLARE_WAITQUEUE(myself, current);
_enter("");
rtt = rxrpc_kernel_get_rtt(afs_socket, call->rxcall);
rtt2 = nsecs_to_jiffies64(rtt) * 2;
if (rtt2 < 2)
rtt2 = 2;
timeout = rtt2;
last_life = rxrpc_kernel_check_life(afs_socket, call->rxcall);
add_wait_queue(&call->waitq, &myself);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
set_current_state(TASK_UNINTERRUPTIBLE);
/* deliver any messages that are in the queue */
if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
@ -554,10 +567,20 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
continue;
}
if (call->state == AFS_CALL_COMPLETE ||
signal_pending(current))
if (call->state == AFS_CALL_COMPLETE)
break;
schedule();
life = rxrpc_kernel_check_life(afs_socket, call->rxcall);
if (timeout == 0 &&
life == last_life && signal_pending(current))
break;
if (life != last_life) {
timeout = rtt2;
last_life = life;
}
timeout = schedule_timeout(timeout);
}
remove_wait_queue(&call->waitq, &myself);
@ -851,7 +874,8 @@ int afs_extract_data(struct afs_call *call, void *buf, size_t count,
ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
buf, count, &call->offset,
want_more, &call->abort_code);
want_more, &call->abort_code,
&call->service_id);
trace_afs_recv_data(call, count, call->offset, want_more, ret);
if (ret == 0 || ret == -EAGAIN)
return ret;

View File

@ -49,17 +49,19 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
unsigned long,
s64,
gfp_t,
rxrpc_notify_rx_t);
rxrpc_notify_rx_t,
bool);
int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
struct msghdr *, size_t,
rxrpc_notify_end_tx_t);
int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
void *, size_t, size_t *, bool, u32 *);
void *, size_t, size_t *, bool, u32 *, u16 *);
bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
u32, int, const char *);
void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
struct sockaddr_rxrpc *);
u64 rxrpc_kernel_get_rtt(struct socket *, struct rxrpc_call *);
int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
rxrpc_user_attach_call_t, unsigned long, gfp_t);
void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64);
@ -67,5 +69,6 @@ int rxrpc_kernel_retry_call(struct socket *, struct rxrpc_call *,
struct sockaddr_rxrpc *, struct key *);
int rxrpc_kernel_check_call(struct socket *, struct rxrpc_call *,
enum rxrpc_call_completion *, u32 *);
u32 rxrpc_kernel_check_life(struct socket *, struct rxrpc_call *);
#endif /* _NET_RXRPC_H */

View File

@ -265,6 +265,7 @@ static int rxrpc_listen(struct socket *sock, int backlog)
* @tx_total_len: Total length of data to transmit during the call (or -1)
* @gfp: The allocation constraints
* @notify_rx: Where to send notifications instead of socket queue
* @upgrade: Request service upgrade for call
*
* Allow a kernel service to begin a call on the nominated socket. This just
* sets up all the internal tracking structures and allocates connection and
@ -279,7 +280,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
unsigned long user_call_ID,
s64 tx_total_len,
gfp_t gfp,
rxrpc_notify_rx_t notify_rx)
rxrpc_notify_rx_t notify_rx,
bool upgrade)
{
struct rxrpc_conn_parameters cp;
struct rxrpc_call *call;
@ -304,6 +306,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
cp.key = key;
cp.security_level = 0;
cp.exclusive = false;
cp.upgrade = upgrade;
cp.service_id = srx->srx_service;
call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, tx_total_len,
gfp);
@ -336,6 +339,25 @@ void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
}
EXPORT_SYMBOL(rxrpc_kernel_end_call);
/**
* rxrpc_kernel_check_life - Check to see whether a call is still alive
* @sock: The socket the call is on
* @call: The call to check
*
* Allow a kernel service to find out whether a call is still alive - ie. we're
* getting ACKs from the server. Returns a number representing the life state
* which can be compared to that returned by a previous call.
*
* If this is a client call, ping ACKs will be sent to the server to find out
* whether it's still responsive and whether the call is still alive on the
* server.
*/
u32 rxrpc_kernel_check_life(struct socket *sock, struct rxrpc_call *call)
{
return call->acks_latest;
}
EXPORT_SYMBOL(rxrpc_kernel_check_life);
/**
* rxrpc_kernel_check_call - Check a call's state
* @sock: The socket the call is on

View File

@ -411,3 +411,16 @@ void rxrpc_kernel_get_peer(struct socket *sock, struct rxrpc_call *call,
*_srx = call->peer->srx;
}
EXPORT_SYMBOL(rxrpc_kernel_get_peer);
/**
* rxrpc_kernel_get_rtt - Get a call's peer RTT
* @sock: The socket on which the call is in progress.
* @call: The call to query
*
* Get the call's peer RTT.
*/
u64 rxrpc_kernel_get_rtt(struct socket *sock, struct rxrpc_call *call)
{
return call->peer->rtt;
}
EXPORT_SYMBOL(rxrpc_kernel_get_rtt);

View File

@ -607,6 +607,7 @@ wait_error:
* @_offset: The running offset into the buffer.
* @want_more: True if more data is expected to be read
* @_abort: Where the abort code is stored if -ECONNABORTED is returned
* @_service: Where to store the actual service ID (may be upgraded)
*
* Allow a kernel service to receive data and pick up information about the
* state of a call. Returns 0 if got what was asked for and there's more
@ -624,7 +625,7 @@ wait_error:
*/
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
void *buf, size_t size, size_t *_offset,
bool want_more, u32 *_abort)
bool want_more, u32 *_abort, u16 *_service)
{
struct iov_iter iter;
struct kvec iov;
@ -680,6 +681,8 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
read_phase_complete:
ret = 1;
out:
if (_service)
*_service = call->service_id;
mutex_unlock(&call->user_mutex);
_leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
return ret;

View File

@ -37,13 +37,87 @@ struct rxrpc_send_params {
bool upgrade; /* If the connection is upgradeable */
};
/*
* Wait for space to appear in the Tx queue or a signal to occur.
*/
static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
struct rxrpc_call *call,
long *timeo)
{
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (call->tx_top - call->tx_hard_ack <
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra))
return 0;
if (call->state >= RXRPC_CALL_COMPLETE)
return call->error;
if (signal_pending(current))
return sock_intr_errno(*timeo);
trace_rxrpc_transmit(call, rxrpc_transmit_wait);
mutex_unlock(&call->user_mutex);
*timeo = schedule_timeout(*timeo);
if (mutex_lock_interruptible(&call->user_mutex) < 0)
return sock_intr_errno(*timeo);
}
}
/*
* Wait for space to appear in the Tx queue uninterruptibly, but with
* a timeout of 2*RTT if no progress was made and a signal occurred.
*/
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
struct rxrpc_call *call)
{
rxrpc_seq_t tx_start, tx_win;
signed long rtt2, timeout;
u64 rtt;
rtt = READ_ONCE(call->peer->rtt);
rtt2 = nsecs_to_jiffies64(rtt) * 2;
if (rtt2 < 1)
rtt2 = 1;
timeout = rtt2;
tx_start = READ_ONCE(call->tx_hard_ack);
for (;;) {
set_current_state(TASK_UNINTERRUPTIBLE);
tx_win = READ_ONCE(call->tx_hard_ack);
if (call->tx_top - tx_win <
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra))
return 0;
if (call->state >= RXRPC_CALL_COMPLETE)
return call->error;
if (timeout == 0 &&
tx_win == tx_start && signal_pending(current))
return -EINTR;
if (tx_win != tx_start) {
timeout = rtt2;
tx_start = tx_win;
}
trace_rxrpc_transmit(call, rxrpc_transmit_wait);
timeout = schedule_timeout(timeout);
}
}
/*
* wait for space to appear in the transmit/ACK window
* - caller holds the socket locked
*/
static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
struct rxrpc_call *call,
long *timeo)
long *timeo,
bool waitall)
{
DECLARE_WAITQUEUE(myself, current);
int ret;
@ -53,30 +127,10 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
add_wait_queue(&call->waitq, &myself);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
ret = 0;
if (call->tx_top - call->tx_hard_ack <
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra))
break;
if (call->state >= RXRPC_CALL_COMPLETE) {
ret = call->error;
break;
}
if (signal_pending(current)) {
ret = sock_intr_errno(*timeo);
break;
}
trace_rxrpc_transmit(call, rxrpc_transmit_wait);
mutex_unlock(&call->user_mutex);
*timeo = schedule_timeout(*timeo);
if (mutex_lock_interruptible(&call->user_mutex) < 0) {
ret = sock_intr_errno(*timeo);
break;
}
}
if (waitall)
ret = rxrpc_wait_for_tx_window_nonintr(rx, call);
else
ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
remove_wait_queue(&call->waitq, &myself);
set_current_state(TASK_RUNNING);
@ -254,7 +308,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
if (msg->msg_flags & MSG_DONTWAIT)
goto maybe_error;
ret = rxrpc_wait_for_tx_window(rx, call,
&timeo);
&timeo,
msg->msg_flags & MSG_WAITALL);
if (ret < 0)
goto maybe_error;
}