wireguard: receive: use ring buffer for incoming handshakes

Apparently the spinlock on incoming_handshake's skb_queue is highly
contended, and a torrent of handshake or cookie packets can bring the
data plane to its knees, simply by virtue of enqueueing the handshake
packets to be processed asynchronously. So, we try switching this to a
ring buffer to hopefully have less lock contention. This alleviates the
problem somewhat, though it still isn't perfect, so future patches will
have to improve this further. However, it at least doesn't completely
diminish the data plane.

Reported-by: Streun Fabio <fstreun@student.ethz.ch>
Reported-by: Joel Wanner <joel.wanner@inf.ethz.ch>
Fixes: e7096c131e ("net: WireGuard secure network tunnel")
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Jason A. Donenfeld 2021-11-29 10:39:26 -05:00 committed by Jakub Kicinski
parent 20ae1d6aa1
commit 886fcee939
5 changed files with 37 additions and 43 deletions

View File

@ -98,6 +98,7 @@ static int wg_stop(struct net_device *dev)
{
struct wg_device *wg = netdev_priv(dev);
struct wg_peer *peer;
struct sk_buff *skb;
mutex_lock(&wg->device_update_lock);
list_for_each_entry(peer, &wg->peer_list, peer_list) {
@ -108,7 +109,9 @@ static int wg_stop(struct net_device *dev)
wg_noise_reset_last_sent_handshake(&peer->last_sent_handshake);
}
mutex_unlock(&wg->device_update_lock);
skb_queue_purge(&wg->incoming_handshakes);
while ((skb = ptr_ring_consume(&wg->handshake_queue.ring)) != NULL)
kfree_skb(skb);
atomic_set(&wg->handshake_queue_len, 0);
wg_socket_reinit(wg, NULL, NULL);
return 0;
}
@ -235,14 +238,13 @@ static void wg_destruct(struct net_device *dev)
destroy_workqueue(wg->handshake_receive_wq);
destroy_workqueue(wg->handshake_send_wq);
destroy_workqueue(wg->packet_crypt_wq);
wg_packet_queue_free(&wg->decrypt_queue);
wg_packet_queue_free(&wg->encrypt_queue);
wg_packet_queue_free(&wg->handshake_queue, true);
wg_packet_queue_free(&wg->decrypt_queue, false);
wg_packet_queue_free(&wg->encrypt_queue, false);
rcu_barrier(); /* Wait for all the peers to be actually freed. */
wg_ratelimiter_uninit();
memzero_explicit(&wg->static_identity, sizeof(wg->static_identity));
skb_queue_purge(&wg->incoming_handshakes);
free_percpu(dev->tstats);
free_percpu(wg->incoming_handshakes_worker);
kvfree(wg->index_hashtable);
kvfree(wg->peer_hashtable);
mutex_unlock(&wg->device_update_lock);
@ -298,7 +300,6 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,
init_rwsem(&wg->static_identity.lock);
mutex_init(&wg->socket_update_lock);
mutex_init(&wg->device_update_lock);
skb_queue_head_init(&wg->incoming_handshakes);
wg_allowedips_init(&wg->peer_allowedips);
wg_cookie_checker_init(&wg->cookie_checker, wg);
INIT_LIST_HEAD(&wg->peer_list);
@ -316,16 +317,10 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,
if (!dev->tstats)
goto err_free_index_hashtable;
wg->incoming_handshakes_worker =
wg_packet_percpu_multicore_worker_alloc(
wg_packet_handshake_receive_worker, wg);
if (!wg->incoming_handshakes_worker)
goto err_free_tstats;
wg->handshake_receive_wq = alloc_workqueue("wg-kex-%s",
WQ_CPU_INTENSIVE | WQ_FREEZABLE, 0, dev->name);
if (!wg->handshake_receive_wq)
goto err_free_incoming_handshakes;
goto err_free_tstats;
wg->handshake_send_wq = alloc_workqueue("wg-kex-%s",
WQ_UNBOUND | WQ_FREEZABLE, 0, dev->name);
@ -347,10 +342,15 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,
if (ret < 0)
goto err_free_encrypt_queue;
ret = wg_ratelimiter_init();
ret = wg_packet_queue_init(&wg->handshake_queue, wg_packet_handshake_receive_worker,
MAX_QUEUED_INCOMING_HANDSHAKES);
if (ret < 0)
goto err_free_decrypt_queue;
ret = wg_ratelimiter_init();
if (ret < 0)
goto err_free_handshake_queue;
ret = register_netdevice(dev);
if (ret < 0)
goto err_uninit_ratelimiter;
@ -367,18 +367,18 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,
err_uninit_ratelimiter:
wg_ratelimiter_uninit();
err_free_handshake_queue:
wg_packet_queue_free(&wg->handshake_queue, false);
err_free_decrypt_queue:
wg_packet_queue_free(&wg->decrypt_queue);
wg_packet_queue_free(&wg->decrypt_queue, false);
err_free_encrypt_queue:
wg_packet_queue_free(&wg->encrypt_queue);
wg_packet_queue_free(&wg->encrypt_queue, false);
err_destroy_packet_crypt:
destroy_workqueue(wg->packet_crypt_wq);
err_destroy_handshake_send:
destroy_workqueue(wg->handshake_send_wq);
err_destroy_handshake_receive:
destroy_workqueue(wg->handshake_receive_wq);
err_free_incoming_handshakes:
free_percpu(wg->incoming_handshakes_worker);
err_free_tstats:
free_percpu(dev->tstats);
err_free_index_hashtable:

View File

@ -39,21 +39,18 @@ struct prev_queue {
struct wg_device {
struct net_device *dev;
struct crypt_queue encrypt_queue, decrypt_queue;
struct crypt_queue encrypt_queue, decrypt_queue, handshake_queue;
struct sock __rcu *sock4, *sock6;
struct net __rcu *creating_net;
struct noise_static_identity static_identity;
struct workqueue_struct *handshake_receive_wq, *handshake_send_wq;
struct workqueue_struct *packet_crypt_wq;
struct sk_buff_head incoming_handshakes;
int incoming_handshake_cpu;
struct multicore_worker __percpu *incoming_handshakes_worker;
struct workqueue_struct *packet_crypt_wq,*handshake_receive_wq, *handshake_send_wq;
struct cookie_checker cookie_checker;
struct pubkey_hashtable *peer_hashtable;
struct index_hashtable *index_hashtable;
struct allowedips peer_allowedips;
struct mutex device_update_lock, socket_update_lock;
struct list_head device_list, peer_list;
atomic_t handshake_queue_len;
unsigned int num_peers, device_update_gen;
u32 fwmark;
u16 incoming_port;

View File

@ -38,11 +38,11 @@ int wg_packet_queue_init(struct crypt_queue *queue, work_func_t function,
return 0;
}
void wg_packet_queue_free(struct crypt_queue *queue)
void wg_packet_queue_free(struct crypt_queue *queue, bool purge)
{
free_percpu(queue->worker);
WARN_ON(!__ptr_ring_empty(&queue->ring));
ptr_ring_cleanup(&queue->ring, NULL);
WARN_ON(!purge && !__ptr_ring_empty(&queue->ring));
ptr_ring_cleanup(&queue->ring, purge ? (void(*)(void*))kfree_skb : NULL);
}
#define NEXT(skb) ((skb)->prev)

View File

@ -23,7 +23,7 @@ struct sk_buff;
/* queueing.c APIs: */
int wg_packet_queue_init(struct crypt_queue *queue, work_func_t function,
unsigned int len);
void wg_packet_queue_free(struct crypt_queue *queue);
void wg_packet_queue_free(struct crypt_queue *queue, bool purge);
struct multicore_worker __percpu *
wg_packet_percpu_multicore_worker_alloc(work_func_t function, void *ptr);

View File

@ -116,8 +116,8 @@ static void wg_receive_handshake_packet(struct wg_device *wg,
return;
}
under_load = skb_queue_len(&wg->incoming_handshakes) >=
MAX_QUEUED_INCOMING_HANDSHAKES / 8;
under_load = atomic_read(&wg->handshake_queue_len) >=
MAX_QUEUED_INCOMING_HANDSHAKES / 8;
if (under_load) {
last_under_load = ktime_get_coarse_boottime_ns();
} else if (last_under_load) {
@ -212,13 +212,14 @@ static void wg_receive_handshake_packet(struct wg_device *wg,
void wg_packet_handshake_receive_worker(struct work_struct *work)
{
struct wg_device *wg = container_of(work, struct multicore_worker,
work)->ptr;
struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
struct wg_device *wg = container_of(queue, struct wg_device, handshake_queue);
struct sk_buff *skb;
while ((skb = skb_dequeue(&wg->incoming_handshakes)) != NULL) {
while ((skb = ptr_ring_consume_bh(&queue->ring)) != NULL) {
wg_receive_handshake_packet(wg, skb);
dev_kfree_skb(skb);
atomic_dec(&wg->handshake_queue_len);
cond_resched();
}
}
@ -554,21 +555,17 @@ void wg_packet_receive(struct wg_device *wg, struct sk_buff *skb)
case cpu_to_le32(MESSAGE_HANDSHAKE_RESPONSE):
case cpu_to_le32(MESSAGE_HANDSHAKE_COOKIE): {
int cpu;
if (skb_queue_len(&wg->incoming_handshakes) >
MAX_QUEUED_INCOMING_HANDSHAKES ||
unlikely(!rng_is_initialized())) {
if (unlikely(!rng_is_initialized() ||
ptr_ring_produce_bh(&wg->handshake_queue.ring, skb))) {
net_dbg_skb_ratelimited("%s: Dropping handshake packet from %pISpfsc\n",
wg->dev->name, skb);
goto err;
}
skb_queue_tail(&wg->incoming_handshakes, skb);
/* Queues up a call to packet_process_queued_handshake_
* packets(skb):
*/
cpu = wg_cpumask_next_online(&wg->incoming_handshake_cpu);
atomic_inc(&wg->handshake_queue_len);
cpu = wg_cpumask_next_online(&wg->handshake_queue.last_cpu);
/* Queues up a call to packet_process_queued_handshake_packets(skb): */
queue_work_on(cpu, wg->handshake_receive_wq,
&per_cpu_ptr(wg->incoming_handshakes_worker, cpu)->work);
&per_cpu_ptr(wg->handshake_queue.worker, cpu)->work);
break;
}
case cpu_to_le32(MESSAGE_DATA):