Re: [PATCH bpf-next 1/6] bpf: implement BPF ring buffer and verifier support for it
From: Alexei Starovoitov <hidden>
Date: 2020-05-14 19:07:03
Also in:
bpf
On Wed, May 13, 2020 at 12:25:27PM -0700, Andrii Nakryiko wrote:
+
+/* Given pointer to ring buffer record metadata, restore pointer to struct
+ * bpf_ringbuf itself by using page offset stored at offset 4
+ */
+static struct bpf_ringbuf *bpf_ringbuf_restore_from_rec(void *meta_ptr)
+{
+ unsigned long addr = (unsigned long)meta_ptr;
+ unsigned long off = *(u32 *)(meta_ptr + 4) << PAGE_SHIFT;
Looking at the code further down, it seems this one should be READ_ONCE, but...
+
+ return (void*)((addr & PAGE_MASK) - off);
+}
+
+static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
+{
+ unsigned long cons_pos, prod_pos, new_prod_pos, flags;
+ u32 len, pg_off;
+ void *meta_ptr;
+
+ if (unlikely(size > UINT_MAX))
+ return NULL;
+
+ len = round_up(size + RINGBUF_META_SZ, 8);
It may overflow despite the check above.
+ cons_pos = READ_ONCE(rb->consumer_pos);
+
+ if (in_nmi()) {
+ if (!spin_trylock_irqsave(&rb->spinlock, flags))
+ return NULL;
+ } else {
+ spin_lock_irqsave(&rb->spinlock, flags);
+ }
+
+ prod_pos = rb->producer_pos;
+ new_prod_pos = prod_pos + len;
+
+ /* check for out of ringbuf space by ensuring producer position
+ * doesn't advance more than (ringbuf_size - 1) ahead
+ */
+ if (new_prod_pos - cons_pos > rb->mask) {
+ spin_unlock_irqrestore(&rb->spinlock, flags);
+ return NULL;
+ }
+
+ meta_ptr = rb->data + (prod_pos & rb->mask);
+ pg_off = bpf_ringbuf_rec_pg_off(rb, meta_ptr);
+
+ WRITE_ONCE(*(u32 *)meta_ptr, RINGBUF_BUSY_BIT | size);
+ WRITE_ONCE(*(u32 *)(meta_ptr + 4), pg_off);
It doesn't match a few other places where a normal read is done. But why WRITE_ONCE here? How does it race with anything? producer_pos is updated later.
+
+ /* ensure length prefix is written before updating producer positions */
+ smp_wmb();
This barrier is enough to make sure the meta_ptr and meta_ptr+4 initialization is visible before producer_pos is updated below.
+ WRITE_ONCE(rb->producer_pos, new_prod_pos);
+
+ spin_unlock_irqrestore(&rb->spinlock, flags);
+
+ return meta_ptr + RINGBUF_META_SZ;
+}
+
+BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
+{
+ struct bpf_ringbuf_map *rb_map;
+
+ if (unlikely(flags))
+ return -EINVAL;
+
+ rb_map = container_of(map, struct bpf_ringbuf_map, map);
+ return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
+}
+
+const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
+ .func = bpf_ringbuf_reserve,
+ .ret_type = RET_PTR_TO_ALLOC_MEM_OR_NULL,
+ .arg1_type = ARG_CONST_MAP_PTR,
+ .arg2_type = ARG_CONST_ALLOC_SIZE_OR_ZERO,
+ .arg3_type = ARG_ANYTHING,
+};
+
+static void bpf_ringbuf_commit(void *sample, bool discard)
+{
+ unsigned long rec_pos, cons_pos;
+ u32 new_meta, old_meta;
+ void *meta_ptr;
+ struct bpf_ringbuf *rb;
+
+ meta_ptr = sample - RINGBUF_META_SZ;
+ rb = bpf_ringbuf_restore_from_rec(meta_ptr);
+ old_meta = *(u32 *)meta_ptr;
I think this one will race with user space and should be READ_ONCE.
+ new_meta = old_meta ^ RINGBUF_BUSY_BIT;
+ if (discard)
+ new_meta |= RINGBUF_DISCARD_BIT;
+
+ /* update metadata header with correct final size prefix */
+ xchg((u32 *)meta_ptr, new_meta);
+
+ /* if consumer caught up and is waiting for our record, notify about
+ * new data availability
+ */
+ rec_pos = (void *)meta_ptr - (void *)rb->data;
+ cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;
hmm. Earlier WRITE_ONCE(rb->producer_pos) is used, but here it's load_acquire. Please be consistent with pairing.
+ if (cons_pos == rec_pos)
+ wake_up_all(&rb->waitq);
Is it legal to do this from a preempt-disabled region?