Re: [RFC PATCH 03/21] pipe: Use head and tail pointers for the ring, not cursor and length
From: Rasmus Villemoes <linux@rasmusvillemoes.dk>
Date: 2019-10-16 07:46:21
Also in:
keyrings, linux-block, linux-fsdevel, linux-security-module, linux-usb, lkml
On 15/10/2019 23.48, David Howells wrote:
Convert pipes to use head and tail pointers for the buffer ring rather than
pointer and length as the latter requires two atomic ops to update (or a
combined op) whereas the former only requires one.
(1) The head pointer is the point at which production occurs and points to
the slot in which the next buffer will be placed. This is equivalent
to pipe->curbuf + pipe->nrbufs.
The head pointer belongs to the write-side.
(2) The tail pointer is the point at which consumption occurs. It points
to the next slot to be consumed. This is equivalent to pipe->curbuf.
The tail pointer belongs to the read-side.
(3) head and tail are allowed to run to UINT_MAX and wrap naturally. They
are only masked off when the array is being accessed, e.g.:
pipe->bufs[head & mask]
This means that it is not necessary to have a dead slot in the ring as
head == tail isn't ambiguous.
(4) The ring is empty if "head == tail".
(5) The occupancy of the ring is "head - tail".
(6) The number of free slots in the ring is "(tail + pipe->ring_size) -
head".Seems an odd way of writing pipe->ring_size - (head - tail) ; i.e. obviously #free slots is #size minus #occupancy.
(7) The ring is full if "head >= (tail + pipe->ring_size)", which can also
be written as "head - tail >= pipe->ring_size".No it cannot, it _must_ be written in the latter form. Assuming sizeof(int)==1 for simplicity, consider ring_size = 16, tail = 240. Regardless whether head is 240, 241, ..., 255, 0, tail + ring_size wraps to 0, so the former expression states the ring is full in all cases. Better spell out somewhere that while head and tail are free-running, at any point in time they satisfy the invariant head - tail <= pipe_size (and also 0 <= head - tail, but that's a tautology for unsigned ints...). Then it's a matter of taste if one wants to write "full" as head-tail == pipe_size or head-tail >= pipe_size.
Also split pipe->buffers into pipe->ring_size (which indicates the size of the ring) and pipe->max_usage (which restricts the amount of ring that write() is allowed to fill). This allows for a pipe that is both writable by the kernel notification facility and by userspace, allowing plenty of ring space for notifications to be added whilst preventing userspace from being able to use up too much buffer space.
That seems like something that should be added in a separate patch - adding ->max_usage and switching appropriate users of ->ring_size over, so it's more clear where you're using one or the other.
quoted hunk ↗ jump to hunk
@@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe, pipe_lock(pipe); - bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer), - GFP_KERNEL); + head = pipe->head; + tail = pipe->tail; + mask = pipe->ring_size - 1; + count = head - tail; + + bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL); if (!bufs) { pipe_unlock(pipe); return -ENOMEM;@@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe, nbuf = 0; rem = 0; - for (idx = 0; idx < pipe->nrbufs && rem < len; idx++) - rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len; + for (idx = tail; idx < head && rem < len; idx++) + rem += pipe->bufs[idx & mask].len; ret = -EINVAL; if (rem < len)@@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe, struct pipe_buffer *ibuf; struct pipe_buffer *obuf; - BUG_ON(nbuf >= pipe->buffers); - BUG_ON(!pipe->nrbufs); - ibuf = &pipe->bufs[pipe->curbuf]; + BUG_ON(nbuf >= pipe->ring_size); + BUG_ON(tail == head); + ibuf = &pipe->bufs[tail];
I don't see where tail gets masked between tail = pipe->tail; above and here, but I may be missing it. In any case, how about seeding head and tail with something like 1<<20 when creating the pipe so bugs like that are hit more quickly.
quoted hunk ↗ jump to hunk
@@ -515,17 +525,19 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) static long pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg) { struct pipe_inode_info *pipe = filp->private_data; - int count, buf, nrbufs; + int count, head, tail, mask; switch (cmd) { case FIONREAD: __pipe_lock(pipe); count = 0; - buf = pipe->curbuf; - nrbufs = pipe->nrbufs; - while (--nrbufs >= 0) { - count += pipe->bufs[buf].len; - buf = (buf+1) & (pipe->buffers - 1); + head = pipe->head; + tail = pipe->tail; + mask = pipe->ring_size - 1; + + while (tail < head) { + count += pipe->bufs[tail & mask].len; + tail++; }
This is broken if head has wrapped but tail has not. It has to be "while (head - tail)" or perhaps just "while (tail != head)" or something along those lines.
quoted hunk ↗ jump to hunk
@@ -1086,17 +1104,21 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg) } /* - * We can shrink the pipe, if arg >= pipe->nrbufs. Since we don't - * expect a lot of shrink+grow operations, just free and allocate - * again like we would do for growing. If the pipe currently + * We can shrink the pipe, if arg is greater than the ring occupancy. + * Since we don't expect a lot of shrink+grow operations, just free and + * allocate again like we would do for growing. If the pipe currently * contains more buffers than arg, then return busy. */ - if (nr_pages < pipe->nrbufs) { + mask = pipe->ring_size - 1; + head = pipe->head & mask; + tail = pipe->tail & mask; + n = pipe->head - pipe->tail;
I think it's confusing to "premask" head and tail here. Can you either drop that (pipe_set_size should hardly be a hot path?), or perhaps call them something else to avoid a future reader seeing an unmasked bufs[head] and thinking that's a bug?
quoted hunk ↗ jump to hunk
@@ -1254,9 +1290,10 @@ static ssize_t pipe_get_pages(struct iov_iter *i, struct page **pages, size_t maxsize, unsigned maxpages, size_t *start) { + unsigned int p_tail; + unsigned int i_head; unsigned npages; size_t capacity; - int idx; if (!maxsize) return 0;@@ -1264,12 +1301,15 @@ static ssize_t pipe_get_pages(struct iov_iter *i, if (!sanity(i)) return -EFAULT; - data_start(i, &idx, start); - /* some of this one + all after this one */ - npages = ((i->pipe->curbuf - idx - 1) & (i->pipe->buffers - 1)) + 1; - capacity = min(npages,maxpages) * PAGE_SIZE - *start; + data_start(i, &i_head, start); + p_tail = i->pipe->tail; + /* Amount of free space: some of this one + all after this one */ + npages = (p_tail + i->pipe->ring_size) - i_head;
Hm, it's not clear that this is equivalent to the old computation. Since it seems repeated in a few places, could it be factored to a little helper (before this patch) and the "some of this one + all after this one" comment perhaps expanded to explain what is going on? Rasmus