summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/xdp/xsk_queue.h56
1 files changed, 52 insertions, 4 deletions
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 610c0bdc0c2b..88b9ae24658d 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -43,6 +43,48 @@ struct xsk_queue {
u64 invalid_descs;
};
+/* The structure of the shared state of the rings are the same as the
+ * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
+ * ring, the kernel is the producer and user space is the consumer. For
+ * the Tx and fill rings, the kernel is the consumer and user space is
+ * the producer.
+ *
+ * producer consumer
+ *
+ * if (LOAD ->consumer) { LOAD ->producer
+ * (A) smp_rmb() (C)
+ * STORE $data LOAD $data
+ * smp_wmb() (B) smp_mb() (D)
+ * STORE ->producer STORE ->consumer
+ * }
+ *
+ * (A) pairs with (D), and (B) pairs with (C).
+ *
+ * Starting with (B), it protects the data from being written after
+ * the producer pointer. If this barrier was missing, the consumer
+ * could observe the producer pointer being set and thus load the data
+ * before the producer has written the new data. The consumer would in
+ * this case load the old data.
+ *
+ * (C) protects the consumer from speculatively loading the data before
+ * the producer pointer actually has been read. If we do not have this
+ * barrier, some architectures could load old data as speculative loads
+ * are not discarded as the CPU does not know there is a dependency
+ * between ->producer and data.
+ *
+ * (A) is a control dependency that separates the load of ->consumer
+ * from the stores of $data. In case ->consumer indicates there is no
+ * room in the buffer to store $data we do not. So no barrier is needed.
+ *
+ * (D) protects the load of the data to be observed to happen after the
+ * store of the consumer pointer. If we did not have this memory
+ * barrier, the producer could observe the consumer pointer being set
+ * and overwrite the data with a new value before the consumer got the
+ * chance to read the old value. The consumer would thus miss reading
+ * the old entry and very likely read the new entry twice, once right
+ * now and again after circling through the ring.
+ */
+
/* Common functions operating for both RXTX and umem queues */
static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_tail, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_tail++ & q->ring_mask] = addr;
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
WRITE_ONCE(q->ring->producer, q->prod_tail);
return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_head++ & q->ring_mask] = addr;
return 0;
}
@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
u32 nb_entries)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail += nb_entries;
WRITE_ONCE(q->ring->producer, q->prod_tail);
@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
q->prod_head++;
return 0;
}
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
/* Order consumer and data */
- smp_rmb();
+ smp_rmb(); /* C, matches B */
}
return xskq_validate_desc(q, desc);
@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
idx = (q->prod_head++) & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail = q->prod_head,
WRITE_ONCE(q->ring->producer, q->prod_tail);