kcm: Send multiple frags in one sendmsg()

Rewrite the AF_KCM transmission loop so that all the fragments of a single
skb (the head skb or one of the skbs on its frag_list) are sent in one
sendmsg() call with MSG_SPLICE_PAGES set.  The list of fragments in each skb
is conveniently a bio_vec[] that can just be attached to a BVEC iter.

Note: I'm working out the size of each fragment-skb by adding up bv_len for
all the bio_vecs in skb->frags[] - but surely this information is recorded
somewhere?  For the skbs in head->frag_list, this is equal to
skb->data_len, but not for the head.  head->data_len includes all the tail
frags too.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Tom Herbert <tom@herbertland.com>
cc: Tom Herbert <tom@quantonium.net>
cc: Jens Axboe <axboe@kernel.dk>
cc: Matthew Wilcox <willy@infradead.org>
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
David Howells 2023-06-09 11:02:21 +01:00 committed by Jakub Kicinski
parent 264ba53fac
commit c31a25e1db
2 changed files with 49 additions and 75 deletions

View File

@ -47,9 +47,9 @@ struct kcm_stats {
/* Per-message transmit state.  kcm_write_msgs() obtains it with
 * kcm_tx_msg(head) for the skb at the front of sk->sk_write_queue and uses
 * it to resume a partially sent message.
 *
 * NOTE(review): this hunk is shown with removed and added lines merged, so
 * both fragidx and started_tx appear here; fragidx looks like the field being
 * dropped by this change - confirm against the tree.
 */
struct kcm_tx_msg {
/* Bytes of this message handed to the lower socket so far; on completion it
 * is subtracted from sk->sk_wmem_queued and added to the total sent.
 */
unsigned int sent;
/* Index into skb_shinfo(frag_skb)->frags[] saved on -EAGAIN by the
 * per-fragment send loop.
 */
unsigned int fragidx;
/* Byte offset of the next unsent data; the bvec iterator is advanced by this
 * amount before sending, and it is reset to 0 when moving to the next skb.
 */
unsigned int frag_offset;
/* Not referenced in the visible hunk - presumably the sendmsg flags the
 * message was queued with; TODO confirm.
 */
unsigned int msg_flags;
/* True once a psock has been reserved and transmission of this message has
 * begun; cleared again when the send is aborted and retried.
 */
bool started_tx;
/* The skb currently being transmitted: the head skb or a member of its
 * frag_list.
 */
struct sk_buff *frag_skb;
/* Not referenced in the visible hunk - presumably the tail of the frag_list
 * while the message is being built; TODO confirm.
 */
struct sk_buff *last_skb;
};

View File

@ -581,12 +581,10 @@ static void kcm_report_tx_retry(struct kcm_sock *kcm)
*/
static int kcm_write_msgs(struct kcm_sock *kcm)
{
unsigned int total_sent = 0;
struct sock *sk = &kcm->sk;
struct kcm_psock *psock;
struct sk_buff *skb, *head;
struct kcm_tx_msg *txm;
unsigned short fragidx, frag_offset;
unsigned int sent, total_sent = 0;
struct sk_buff *head;
int ret = 0;
kcm->tx_wait_more = false;
@ -600,78 +598,57 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
if (skb_queue_empty(&sk->sk_write_queue))
return 0;
kcm_tx_msg(skb_peek(&sk->sk_write_queue))->sent = 0;
} else if (skb_queue_empty(&sk->sk_write_queue)) {
return 0;
kcm_tx_msg(skb_peek(&sk->sk_write_queue))->started_tx = false;
}
head = skb_peek(&sk->sk_write_queue);
txm = kcm_tx_msg(head);
retry:
while ((head = skb_peek(&sk->sk_write_queue))) {
struct msghdr msg = {
.msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES,
};
struct kcm_tx_msg *txm = kcm_tx_msg(head);
struct sk_buff *skb;
unsigned int msize;
int i;
if (txm->sent) {
/* Send of first skbuff in queue already in progress */
if (WARN_ON(!psock)) {
ret = -EINVAL;
goto out;
if (!txm->started_tx) {
psock = reserve_psock(kcm);
if (!psock)
goto out;
skb = head;
txm->frag_offset = 0;
txm->sent = 0;
txm->started_tx = true;
} else {
if (WARN_ON(!psock)) {
ret = -EINVAL;
goto out;
}
skb = txm->frag_skb;
}
sent = txm->sent;
frag_offset = txm->frag_offset;
fragidx = txm->fragidx;
skb = txm->frag_skb;
goto do_frag;
}
try_again:
psock = reserve_psock(kcm);
if (!psock)
goto out;
do {
skb = head;
txm = kcm_tx_msg(head);
sent = 0;
do_frag_list:
if (WARN_ON(!skb_shinfo(skb)->nr_frags)) {
ret = -EINVAL;
goto out;
}
for (fragidx = 0; fragidx < skb_shinfo(skb)->nr_frags;
fragidx++) {
struct bio_vec bvec;
struct msghdr msg = {
.msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES,
};
skb_frag_t *frag;
msize = 0;
for (i = 0; i < skb_shinfo(skb)->nr_frags; i++)
msize += skb_shinfo(skb)->frags[i].bv_len;
frag_offset = 0;
do_frag:
frag = &skb_shinfo(skb)->frags[fragidx];
if (WARN_ON(!skb_frag_size(frag))) {
ret = -EINVAL;
goto out;
}
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE,
skb_shinfo(skb)->frags, skb_shinfo(skb)->nr_frags,
msize);
iov_iter_advance(&msg.msg_iter, txm->frag_offset);
bvec_set_page(&bvec,
skb_frag_page(frag),
skb_frag_size(frag) - frag_offset,
skb_frag_off(frag) + frag_offset);
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1,
bvec.bv_len);
do {
ret = sock_sendmsg(psock->sk->sk_socket, &msg);
if (ret <= 0) {
if (ret == -EAGAIN) {
/* Save state to try again when there's
* write space on the socket
*/
txm->sent = sent;
txm->frag_offset = frag_offset;
txm->fragidx = fragidx;
txm->frag_skb = skb;
ret = 0;
goto out;
}
@ -685,39 +662,36 @@ do_frag:
true);
unreserve_psock(kcm);
txm->sent = 0;
txm->started_tx = false;
kcm_report_tx_retry(kcm);
ret = 0;
goto try_again;
goto retry;
}
sent += ret;
frag_offset += ret;
txm->sent += ret;
txm->frag_offset += ret;
KCM_STATS_ADD(psock->stats.tx_bytes, ret);
if (frag_offset < skb_frag_size(frag)) {
/* Not finished with this frag */
goto do_frag;
}
}
} while (msg.msg_iter.count > 0);
if (skb == head) {
if (skb_has_frag_list(skb)) {
skb = skb_shinfo(skb)->frag_list;
goto do_frag_list;
txm->frag_skb = skb_shinfo(skb)->frag_list;
txm->frag_offset = 0;
continue;
}
} else if (skb->next) {
skb = skb->next;
goto do_frag_list;
txm->frag_skb = skb->next;
txm->frag_offset = 0;
continue;
}
/* Successfully sent the whole packet, account for it. */
sk->sk_wmem_queued -= txm->sent;
total_sent += txm->sent;
skb_dequeue(&sk->sk_write_queue);
kfree_skb(head);
sk->sk_wmem_queued -= sent;
total_sent += sent;
KCM_STATS_INCR(psock->stats.tx_msgs);
} while ((head = skb_peek(&sk->sk_write_queue)));
}
out:
if (!head) {
/* Done with all queued messages. */