Merge branch 'load-acquire/store-release barriers for AF_XDP rings'
Björn Töpel says:
====================
This two-patch series introduces load-acquire/store-release barriers
for the AF_XDP rings.
For most contemporary architectures, this is more effective than an
SPSC ring based on smp_{r,w,}mb() barriers. More importantly,
load-acquire/store-release semantics make the ring code easier to
follow.
This is effectively the change done in commit 6c43c091bd
("documentation: Update circular buffer for
load-acquire/store-release"), but for the AF_XDP rings.
Both libbpf and the kernel side are updated.
Full details are outlined in the commits!
Thanks to the LKMM-folks (Paul/Alan/Will) for helping me out in this
complicated matter!
Changelog
v1[1]->v2:
* Expanded the commit message for patch 1, and included the LKMM
litmus tests. Hopefully this clears things up. (Daniel)
* Clarified why smp_mb()/smp_load_acquire() is not needed in (A):
there is a control dependency from the load to the store. (Toke)
[1] https://lore.kernel.org/bpf/20210301104318.263262-1-bjorn.topel@gmail.com/
Thanks,
Björn
====================
Signed-off-by: Andrii Nakryiko <andrii@kernel.org>
commit bbb41728e6
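Before the diffs, a minimal user-space sketch of the pattern being adopted, using C11 atomics rather than the kernel's smp_load_acquire()/smp_store_release(). This is illustrative only and not part of the patch; all names are made up. The (A)-(D) labels match the comment diagram in the net/xdp/xsk_queue.h hunk below.

/* Minimal SPSC ring sketch with C11 acquire/release; illustrative only,
 * not part of the patch. The kernel code below uses smp_load_acquire()/
 * smp_store_release() for the same effect.
 */
#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE 64u	/* power of two, so free-running indices may wrap */

struct spsc_ring {
	_Atomic uint32_t producer;
	_Atomic uint32_t consumer;
	uint64_t data[RING_SIZE];
};

static int ring_produce(struct spsc_ring *r, uint64_t val)
{
	uint32_t prod = atomic_load_explicit(&r->producer, memory_order_relaxed);
	/* (A): C11 offers no control-dependency guarantee, so this sketch
	 * uses acquire; the kernel relies on the load-to-store control
	 * dependency instead (see the xsk_queue.h comment below).
	 */
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_acquire);

	if (prod - cons == RING_SIZE)
		return -1;					/* full */
	r->data[prod & (RING_SIZE - 1)] = val;			/* STORE $data */
	atomic_store_explicit(&r->producer, prod + 1,
			      memory_order_release);		/* (B) */
	return 0;
}

static int ring_consume(struct spsc_ring *r, uint64_t *val)
{
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_relaxed);
	uint32_t prod = atomic_load_explicit(&r->producer,
					     memory_order_acquire);	/* (C) */

	if (prod == cons)
		return -1;					/* empty */
	*val = r->data[cons & (RING_SIZE - 1)];			/* LOAD $data */
	atomic_store_explicit(&r->consumer, cons + 1,
			      memory_order_release);		/* (D) */
	return 0;
}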
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -47,19 +47,18 @@ struct xsk_queue {
 	u64 queue_empty_descs;
 };
 
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings is a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
  *
  * producer                         consumer
  *
- * if (LOAD ->consumer) {           LOAD ->producer
- *                    (A)           smp_rmb()       (C)
+ * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
  *    STORE $data                   LOAD $data
- *    smp_wmb()       (B)           smp_mb()        (D)
- *    STORE ->producer              STORE ->consumer
+ *    STORE.rel ->producer  (B)     STORE.rel ->consumer  (D)
  * }
  *
  * (A) pairs with (D), and (B) pairs with (C).
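The (B)/(C) pairing above is the classic message-passing pattern. As a sketch in LKMM litmus form (in the spirit of the in-tree MP+pooncerelease+poacquireonce.litmus; not copied from the patch), herd7 reports the following outcome as forbidden, i.e. a consumer that observes the new producer pointer must also observe the data:

C xsk-mp-sketch

{}

P0(int *data, int *producer)
{
	WRITE_ONCE(*data, 1);			/* STORE $data */
	smp_store_release(producer, 1);		/* (B) */
}

P1(int *data, int *producer)
{
	int r0;
	int r1;

	r0 = smp_load_acquire(producer);	/* (C) */
	r1 = READ_ONCE(*data);			/* LOAD $data */
}

exists (1:r0=1 /\ 1:r1=0)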
@@ -78,7 +77,8 @@ struct xsk_queue {
  *
  * (A) is a control dependency that separates the load of ->consumer
  * from the stores of $data. In case ->consumer indicates there is no
- * room in the buffer to store $data we do not. So no barrier is needed.
+ * room in the buffer to store $data, we do not store it. The dependency
+ * will order both of the stores after the loads. So no barrier is needed.
  *
  * (D) protects the load of the data to be observed to happen after the
  * store of the consumer pointer. If we did not have this memory
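On the point raised by Toke, a hedged litmus sketch (again illustrative, not the patch's own test) of why READ_ONCE() suffices at (A): the data store only executes on the branch taken after the ->consumer load, and the LKMM honors load-to-store control dependencies, so the store cannot be reordered before the load:

C xsk-ctrl-dep-sketch

{}

P0(int *data, int *consumer)
{
	int r0;

	r0 = READ_ONCE(*consumer);	/* (A): plain load is enough */
	if (r0 == 1)
		WRITE_ONCE(*data, 1);	/* conditionally reuse the slot */
}

P1(int *data, int *consumer)
{
	int r0;

	r0 = READ_ONCE(*data);			/* LOAD $data */
	smp_store_release(consumer, 1);		/* (D) */
}

exists (0:r0=1 /\ 1:r0=1)

The listed outcome, where the producer observed the released slot (0:r0=1) yet the consumer's earlier read saw the producer's overwrite (1:r0=1), is forbidden: the control dependency at (A) pairs with the release at (D).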
@@ -227,15 +227,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
 {
-	smp_mb(); /* D, matches A */
-	WRITE_ONCE(q->ring->consumer, q->cached_cons);
+	smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matches A */
 }
 
 static inline void __xskq_cons_peek(struct xsk_queue *q)
 {
 	/* Refresh the local pointer */
-	q->cached_prod = READ_ONCE(q->ring->producer);
-	smp_rmb(); /* C, matches B */
+	q->cached_prod = smp_load_acquire(&q->ring->producer); /* C, matches B */
 }
 
 static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +395,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 
 static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-	smp_wmb(); /* B, matches C */
-
-	WRITE_ONCE(q->ring->producer, idx);
+	smp_store_release(&q->ring->producer, idx); /* B, matches C */
 }
 
 static inline void xskq_prod_submit(struct xsk_queue *q)
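The shape of the kernel-side change, restated outside the diff (a sketch; the _old/_new names are hypothetical, kernel context assumed, with q a struct xsk_queue * as in the hunk above):

static inline void xskq_cons_release_old(struct xsk_queue *q)
{
	smp_mb();	/* orders prior loads AND stores; stronger than (D) needs */
	WRITE_ONCE(q->ring->consumer, q->cached_cons);
}

static inline void xskq_cons_release_new(struct xsk_queue *q)
{
	/* A release store orders everything before it in program order
	 * ahead of the store itself: exactly the (D) guarantee, and
	 * typically cheaper (a plain store plus compiler barrier on x86,
	 * stlr instead of dmb ish on arm64).
	 */
	smp_store_release(&q->ring->consumer, q->cached_cons);
}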
diff --git a/tools/lib/bpf/libbpf_util.h b/tools/lib/bpf/libbpf_util.h
--- a/tools/lib/bpf/libbpf_util.h
+++ b/tools/lib/bpf/libbpf_util.h
@@ -5,6 +5,7 @@
 #define __LIBBPF_LIBBPF_UTIL_H
 
 #include <stdbool.h>
+#include <linux/compiler.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -15,29 +16,56 @@ extern "C" {
  * application that uses libbpf.
  */
 #if defined(__i386__) || defined(__x86_64__)
-# define libbpf_smp_rmb() asm volatile("" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("" : : : "memory")
-# define libbpf_smp_mb() \
-	asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc")
-/* Hinders stores to be observed before older loads. */
-# define libbpf_smp_rwmb() asm volatile("" : : : "memory")
+# define libbpf_smp_store_release(p, v)					\
+	do {								\
+		asm volatile("" : : : "memory");			\
+		WRITE_ONCE(*p, v);					\
+	} while (0)
+# define libbpf_smp_load_acquire(p)					\
+	({								\
+		typeof(*p) ___p1 = READ_ONCE(*p);			\
+		asm volatile("" : : : "memory");			\
+		___p1;							\
+	})
 #elif defined(__aarch64__)
-# define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
-# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_rwmb() libbpf_smp_mb()
-#elif defined(__arm__)
-/* These are only valid for armv7 and above */
-# define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
-# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_rwmb() libbpf_smp_mb()
-#else
-/* Architecture missing native barrier functions. */
-# define libbpf_smp_rmb() __sync_synchronize()
-# define libbpf_smp_wmb() __sync_synchronize()
-# define libbpf_smp_mb() __sync_synchronize()
-# define libbpf_smp_rwmb() __sync_synchronize()
+# define libbpf_smp_store_release(p, v)					\
+	asm volatile ("stlr %w1, %0" : "=Q" (*p) : "r" (v) : "memory")
+# define libbpf_smp_load_acquire(p)					\
+	({								\
+		typeof(*p) ___p1;					\
+		asm volatile ("ldar %w0, %1"				\
+			      : "=r" (___p1) : "Q" (*p) : "memory");	\
+		___p1;							\
+	})
+#elif defined(__riscv)
+# define libbpf_smp_store_release(p, v)					\
+	do {								\
+		asm volatile ("fence rw,w" : : : "memory");		\
+		WRITE_ONCE(*p, v);					\
+	} while (0)
+# define libbpf_smp_load_acquire(p)					\
+	({								\
+		typeof(*p) ___p1 = READ_ONCE(*p);			\
+		asm volatile ("fence r,rw" : : : "memory");		\
+		___p1;							\
+	})
 #endif
 
+#ifndef libbpf_smp_store_release
+#define libbpf_smp_store_release(p, v)					\
+	do {								\
+		__sync_synchronize();					\
+		WRITE_ONCE(*p, v);					\
+	} while (0)
+#endif
+
+#ifndef libbpf_smp_load_acquire
+#define libbpf_smp_load_acquire(p)					\
+	({								\
+		typeof(*p) ___p1 = READ_ONCE(*p);			\
+		__sync_synchronize();					\
+		___p1;							\
+	})
+#endif
+
 #ifdef __cplusplus
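A sketch of how the two new helpers are meant to be used (the globals and function names are illustrative, not from the patch; assumes the tools-tree build environment, where <linux/compiler.h> provides READ_ONCE()/WRITE_ONCE() and <linux/types.h> provides __u32):

#include <linux/types.h>
#include "libbpf_util.h"

static __u32 ring_slot;
static __u32 ring_tail;

/* Producer thread: plain store of the data, then release-store of the
 * index, so a reader that sees the index also sees the data.
 */
static void publish(__u32 val)
{
	ring_slot = val;
	libbpf_smp_store_release(&ring_tail, 1);
}

/* Consumer thread: acquire-load of the index first, which orders the
 * subsequent data read after it.
 */
static __u32 consume(void)
{
	if (libbpf_smp_load_acquire(&ring_tail))
		return ring_slot;	/* guaranteed to observe val */
	return 0;
}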
diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h
--- a/tools/lib/bpf/xsk.h
+++ b/tools/lib/bpf/xsk.h
@@ -96,7 +96,8 @@ static inline __u32 xsk_prod_nb_free(struct xsk_ring_prod *r, __u32 nb)
 	 * this function. Without this optimization it would have been
 	 * free_entries = r->cached_prod - r->cached_cons + r->size.
 	 */
-	r->cached_cons = *r->consumer + r->size;
+	r->cached_cons = libbpf_smp_load_acquire(r->consumer);
+	r->cached_cons += r->size;
 
 	return r->cached_cons - r->cached_prod;
 }
@@ -106,7 +107,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb)
 	__u32 entries = r->cached_prod - r->cached_cons;
 
 	if (entries == 0) {
-		r->cached_prod = *r->producer;
+		r->cached_prod = libbpf_smp_load_acquire(r->producer);
 		entries = r->cached_prod - r->cached_cons;
 	}
 
@@ -129,9 +130,7 @@ static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, __u32 nb)
 	/* Make sure everything has been written to the ring before indicating
 	 * this to the kernel by writing the producer pointer.
 	 */
-	libbpf_smp_wmb();
-
-	*prod->producer += nb;
+	libbpf_smp_store_release(prod->producer, *prod->producer + nb);
 }
 
 static inline __u32 xsk_ring_cons__peek(struct xsk_ring_cons *cons, __u32 nb, __u32 *idx)
@@ -139,11 +138,6 @@ static inline __u32 xsk_ring_cons__peek(struct xsk_ring_cons *cons, __u32 nb, __
 	__u32 entries = xsk_cons_nb_avail(cons, nb);
 
 	if (entries > 0) {
-		/* Make sure we do not speculatively read the data before
-		 * we have received the packet buffers from the ring.
-		 */
-		libbpf_smp_rmb();
-
 		*idx = cons->cached_cons;
 		cons->cached_cons += entries;
 	}
@@ -161,9 +155,8 @@ static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, __u32 nb)
 	/* Make sure data has been read before indicating we are done
 	 * with the entries by updating the consumer pointer.
 	 */
-	libbpf_smp_rwmb();
-
-	*cons->consumer += nb;
+	libbpf_smp_store_release(cons->consumer, *cons->consumer + nb);
 }
 
 static inline void *xsk_umem__get_data(void *umem_area, __u64 addr)
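For context, a sketch of an RX loop over the patched helpers (setup and error handling omitted; assumes an initialized ring and UMEM area): the acquire-load of ->producer now happens inside xsk_ring_cons__peek() via xsk_cons_nb_avail(), and the release-store of ->consumer inside xsk_ring_cons__release():

#include <bpf/xsk.h>	/* the AF_XDP helpers patched above */

static void rx_poll_once(struct xsk_ring_cons *rx, void *umem_area)
{
	__u32 idx = 0, i;
	__u32 rcvd = xsk_ring_cons__peek(rx, 64, &idx);	/* acquire inside */

	for (i = 0; i < rcvd; i++) {
		const struct xdp_desc *desc = xsk_ring_cons__rx_desc(rx, idx + i);
		void *pkt = xsk_umem__get_data(umem_area, desc->addr);

		(void)pkt;	/* ...process desc->len bytes at pkt here... */
	}

	if (rcvd)
		xsk_ring_cons__release(rx, rcvd);	/* release inside */
}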