Thread: 65 messages, 9 authors, 2019-06-17

Re: [PATCH 1/7] General notification queue with user mmap()'able ring buffer

From: Jann Horn <jannh@google.com>
Date: 2019-05-28 19:15:20
Also in: keyrings, linux-api, linux-block, linux-fsdevel, lkml

On Tue, May 28, 2019 at 6:03 PM David Howells [off-list ref] wrote:
Implement a misc device that implements a general notification queue as a
ring buffer that can be mmap()'d from userspace.
[...]
+receive notifications from the kernel.  This is can be used in conjunction
typo: s/is can/can/

[...]
+Overview
+========
+
+This facility appears as a misc device file that is opened and then mapped and
+polled.  Each time it is opened, it creates a new buffer specific to the
+returned file descriptor.  Then, when the opening process sets watches, it
+indicates that particular buffer it wants notifications from that watch to be
+written into. Note that there are no read() and write() methods (except for
s/that particular buffer/the particular buffer/
+debugging).  The user is expected to access the ring directly and to use poll
+to wait for new data.
[...]
+/**
+ * __post_watch_notification - Post an event notification
+ * @wlist: The watch list to post the event to.
+ * @n: The notification record to post.
+ * @cred: The creds of the process that triggered the notification.
+ * @id: The ID to match on the watch.
+ *
+ * Post a notification of an event into a set of watch queues and let the users
+ * know.
+ *
+ * If @n is NULL then WATCH_INFO_LENGTH will be set on the next event posted.
+ *
+ * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
+ * should be in units of sizeof(*n).
+ */
+void __post_watch_notification(struct watch_list *wlist,
+                              struct watch_notification *n,
+                              const struct cred *cred,
+                              u64 id)
+{
+       const struct watch_filter *wf;
+       struct watch_queue *wqueue;
+       struct watch *watch;
+
+       rcu_read_lock();
+
+       hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
+               if (watch->id != id)
+                       continue;
+               n->info &= ~(WATCH_INFO_ID | WATCH_INFO_OVERRUN);
+               n->info |= watch->info_id;
+
+               wqueue = rcu_dereference(watch->queue);
+               wf = rcu_dereference(wqueue->filter);
+               if (wf && !filter_watch_notification(wf, n))
+                       continue;
+
+               post_one_notification(wqueue, n, cred);
+       }
+
+       rcu_read_unlock();
+}
+EXPORT_SYMBOL(__post_watch_notification);
[...]
+static vm_fault_t watch_queue_fault(struct vm_fault *vmf)
+{
+       struct watch_queue *wqueue = vmf->vma->vm_file->private_data;
+       struct page *page;
+
+       page = wqueue->pages[vmf->pgoff];
I don't see you setting any special properties on the VMA that would
prevent userspace from extending its size via mremap() - no
VM_DONTEXPAND or VM_PFNMAP. So I think you might get an out-of-bounds
access here?
+       get_page(page);
+       if (!lock_page_or_retry(page, vmf->vma->vm_mm, vmf->flags)) {
+               put_page(page);
+               return VM_FAULT_RETRY;
+       }
+       vmf->page = page;
+       return VM_FAULT_LOCKED;
+}
+
+static void watch_queue_map_pages(struct vm_fault *vmf,
+                                 pgoff_t start_pgoff, pgoff_t end_pgoff)
+{
+       struct watch_queue *wqueue = vmf->vma->vm_file->private_data;
+       struct page *page;
+
+       rcu_read_lock();
+
+       do {
+               page = wqueue->pages[start_pgoff];
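Same as above: start_pgoff indexes wqueue->pages[] without a bounds check, so
a VMA that was extended via mremap() could cause an out-of-bounds access here
as well.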
+               if (trylock_page(page)) {
+                       vm_fault_t ret;
+                       get_page(page);
+                       ret = alloc_set_pte(vmf, NULL, page);
+                       if (ret != 0)
+                               put_page(page);
+
+                       unlock_page(page);
+               }
+       } while (++start_pgoff < end_pgoff);
+
+       rcu_read_unlock();
+}
[...]
+static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma)
+{
+       struct watch_queue *wqueue = file->private_data;
+
+       if (vma->vm_pgoff != 0 ||
+           vma->vm_end - vma->vm_start > wqueue->nr_pages * PAGE_SIZE ||
+           !(pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED)))
+               return -EINVAL;
This thing should probably have locking against concurrent
watch_queue_set_size()?
+       vma->vm_ops = &watch_queue_vm_ops;
+
+       vma_interval_tree_insert(vma, &wqueue->mapping.i_mmap);
+       return 0;
+}
+
+/*
+ * Allocate the required number of pages.
+ */
+static long watch_queue_set_size(struct watch_queue *wqueue, unsigned long nr_pages)
+{
+       struct watch_queue_buffer *buf;
+       u32 len;
+       int i;
+
+       if (nr_pages == 0 ||
+           nr_pages > 16 || /* TODO: choose a better hard limit */
+           !is_power_of_2(nr_pages))
+               return -EINVAL;
+
+       wqueue->pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
+       if (!wqueue->pages)
+               goto err;
+
+       for (i = 0; i < nr_pages; i++) {
+               wqueue->pages[i] = alloc_page(GFP_KERNEL | __GFP_ZERO);
+               if (!wqueue->pages[i])
+                       goto err_some_pages;
+               wqueue->pages[i]->mapping = &wqueue->mapping;
+               SetPageUptodate(wqueue->pages[i]);
+       }
+
+       buf = vmap(wqueue->pages, nr_pages, VM_MAP, PAGE_SHARED);
+       if (!buf)
+               goto err_some_pages;
+
+       wqueue->buffer = buf;
+       wqueue->nr_pages = nr_pages;
+       wqueue->size = ((nr_pages * PAGE_SIZE) / sizeof(struct watch_notification));
+
+       /* The first four slots in the buffer contain metadata about the ring,
+        * including the head and tail indices and mask.
+        */
+       len = sizeof(buf->meta) / sizeof(buf->slots[0]);
+       buf->meta.watch.info    = len << WATCH_LENGTH_SHIFT;
+       buf->meta.watch.type    = WATCH_TYPE_META;
+       buf->meta.watch.subtype = WATCH_META_SKIP_NOTIFICATION;
+       buf->meta.tail          = len;
+       buf->meta.mask          = wqueue->size - 1;
+       smp_store_release(&buf->meta.head, len);
Why is this an smp_store_release()? The entire buffer should not be visible to
userspace before this setup is complete, right?
+       return 0;
+
+err_some_pages:
+       for (i--; i >= 0; i--) {
+               ClearPageUptodate(wqueue->pages[i]);
+               wqueue->pages[i]->mapping = NULL;
+               put_page(wqueue->pages[i]);
+       }
+
+       kfree(wqueue->pages);
+       wqueue->pages = NULL;
+err:
+       return -ENOMEM;
+}
[...]
+
+/*
+ * Set parameters.
+ */
+static long watch_queue_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+       struct watch_queue *wqueue = file->private_data;
+       struct inode *inode = file_inode(file);
+       long ret;
+
+       switch (cmd) {
+       case IOC_WATCH_QUEUE_SET_SIZE:
+               if (wqueue->buffer)
+                       return -EBUSY;
The preceding check occurs without any locks held and therefore has no
reliable effect. It should probably be moved below the
inode_lock(...).
+               inode_lock(inode);
+               ret = watch_queue_set_size(wqueue, arg);
+               inode_unlock(inode);
+               return ret;
+
+       case IOC_WATCH_QUEUE_SET_FILTER:
+               inode_lock(inode);
+               ret = watch_queue_set_filter(
+                       inode, wqueue,
+                       (struct watch_notification_filter __user *)arg);
+               inode_unlock(inode);
+               return ret;
+
+       default:
+               return -EOPNOTSUPP;
+       }
+}
[...]
+static void free_watch(struct rcu_head *rcu)
+{
+       struct watch *watch = container_of(rcu, struct watch, rcu);
+
+       put_watch_queue(rcu_access_pointer(watch->queue));
This should be rcu_dereference_protected(..., 1).
+/**
+ * remove_watch_from_object - Remove a watch or all watches from an object.
+ * @wlist: The watch list to remove from
+ * @wq: The watch queue of interest (ignored if @all is true)
+ * @id: The ID of the watch to remove (ignored if @all is true)
+ * @all: True to remove all objects
+ *
+ * Remove a specific watch or all watches from an object.  A notification is
+ * sent to the watcher to tell them that this happened.
+ */
+int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
+                            u64 id, bool all)
+{
+       struct watch_notification n;
+       struct watch_queue *wqueue;
+       struct watch *watch;
+       int ret = -EBADSLT;
+
+       rcu_read_lock();
+
+again:
+       spin_lock(&wlist->lock);
+       hlist_for_each_entry(watch, &wlist->watchers, list_node) {
+               if (all ||
+                   (watch->id == id && rcu_access_pointer(watch->queue) == wq))
+                       goto found;
+       }
+       spin_unlock(&wlist->lock);
+       goto out;
+
+found:
+       ret = 0;
+       hlist_del_init_rcu(&watch->list_node);
+       rcu_assign_pointer(watch->watch_list, NULL);
+       spin_unlock(&wlist->lock);
+
+       n.type = WATCH_TYPE_META;
+       n.subtype = WATCH_META_REMOVAL_NOTIFICATION;
+       n.info = watch->info_id | sizeof(n);
+
+       wqueue = rcu_dereference(watch->queue);
+       post_one_notification(wqueue, &n, wq ? wq->cred : NULL);
+
+       /* We don't need the watch list lock for the next bit as RCU is
+        * protecting everything from being deallocated.
Does "everything" mean "the wqueue" or more than that?
+        */
+       if (wqueue) {
+               spin_lock_bh(&wqueue->lock);
+
+               if (!hlist_unhashed(&watch->queue_node)) {
+                       hlist_del_init_rcu(&watch->queue_node);
+                       put_watch(watch);
+               }
+
+               spin_unlock_bh(&wqueue->lock);
+       }
+
+       if (wlist->release_watch) {
+               rcu_read_unlock();
+               wlist->release_watch(wlist, watch);
+               rcu_read_lock();
+       }
+       put_watch(watch);
+
+       if (all && !hlist_empty(&wlist->watchers))
+               goto again;
+out:
+       rcu_read_unlock();
+       return ret;
+}
+EXPORT_SYMBOL(remove_watch_from_object);
+
+/*
+ * Remove all the watches that are contributory to a queue.  This will
+ * potentially race with removal of the watches by the destruction of the
+ * objects being watched or the distribution of notifications.
+ */
+static void watch_queue_clear(struct watch_queue *wqueue)
+{
+       struct watch_list *wlist;
+       struct watch *watch;
+       bool release;
+
+       rcu_read_lock();
+       spin_lock_bh(&wqueue->lock);
+
+       /* Prevent new additions and prevent notifications from happening */
+       wqueue->defunct = true;
+
+       while (!hlist_empty(&wqueue->watches)) {
+               watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
+               hlist_del_init_rcu(&watch->queue_node);
+               spin_unlock_bh(&wqueue->lock);
+
+               /* We can't do the next bit under the queue lock as we need to
+                * get the list lock - which would cause a deadlock if someone
+                * was removing from the opposite direction at the same time or
+                * posting a notification.
+                */
+               wlist = rcu_dereference(watch->watch_list);
+               if (wlist) {
+                       spin_lock(&wlist->lock);
+
+                       release = !hlist_unhashed(&watch->list_node);
+                       if (release) {
+                               hlist_del_init_rcu(&watch->list_node);
+                               rcu_assign_pointer(watch->watch_list, NULL);
+                       }
+
+                       spin_unlock(&wlist->lock);
+
+                       if (release) {
+                               if (wlist->release_watch) {
+                                       rcu_read_unlock();
+                                       /* This might need to call dput(), so
+                                        * we have to drop all the locks.
+                                        */
+                                       wlist->release_watch(wlist, watch);
How are you holding a reference to `wlist` here? You got the reference through
rcu_dereference(), you've dropped the RCU read lock, and I don't see anything
that stabilizes the reference.
+                                       rcu_read_lock();
+                               }
+                               put_watch(watch);
+                       }
+               }
+
+               put_watch(watch);
+               spin_lock_bh(&wqueue->lock);
+       }
+
+       spin_unlock_bh(&wqueue->lock);
+       rcu_read_unlock();
+}
+
+/*
+ * Release the file.
+ */
+static int watch_queue_release(struct inode *inode, struct file *file)
+{
+       struct watch_filter *wfilter;
+       struct watch_queue *wqueue = file->private_data;
+       int i, pgref;
+
+       watch_queue_clear(wqueue);
+
+       if (wqueue->pages && wqueue->pages[0])
+               WARN_ON(page_ref_count(wqueue->pages[0]) != 1);
Is there a reason why there couldn't still be references to the pages
from get_user_pages()/get_user_pages_fast()?
+       if (wqueue->buffer)
+               vfree(wqueue->buffer);
+       for (i = 0; i < wqueue->nr_pages; i++) {
+               ClearPageUptodate(wqueue->pages[i]);
+               wqueue->pages[i]->mapping = NULL;
+               pgref = page_ref_count(wqueue->pages[i]);
+               WARN(pgref != 1,
+                    "FREE PAGE[%d] refcount %d\n", i, page_ref_count(wqueue->pages[i]));
+               __free_page(wqueue->pages[i]);
+       }
+
+       wfilter = rcu_access_pointer(wqueue->filter);
Again, this should be rcu_dereference_protected(..., 1).
+       if (wfilter)
+               kfree_rcu(wfilter, rcu);
+       kfree(wqueue->pages);
+       put_cred(wqueue->cred);
+       put_watch_queue(wqueue);
+       return 0;
+}
+
+#ifdef DEBUG_WITH_WRITE
+static ssize_t watch_queue_write(struct file *file,
+                                const char __user *_buf, size_t len, loff_t *pos)
+{
+       struct watch_notification *n;
+       struct watch_queue *wqueue = file->private_data;
+       ssize_t ret;
+
+       if (!wqueue->buffer)
+               return -ENOBUFS;
+
+       if (len & ~WATCH_INFO_LENGTH || len == 0 || !_buf)
+               return -EINVAL;
+
+       n = memdup_user(_buf, len);
+       if (IS_ERR(n))
+               return PTR_ERR(n);
+
+       ret = -EINVAL;
+       if ((n->info & WATCH_INFO_LENGTH) != len)
+               goto error;
+       n->info &= (WATCH_INFO_LENGTH | WATCH_INFO_TYPE_FLAGS | WATCH_INFO_ID);
Should the non-atomic modification of n->info (and perhaps also the
following uses of ->debug) be protected by some lock?
+       if (post_one_notification(wqueue, n, file->f_cred))
+               wqueue->debug = 0;
+       else
+               wqueue->debug++;
+       ret = len;
+       if (wqueue->debug > 20)
+               ret = -EIO;
+
+error:
+       kfree(n);
+       return ret;
+}
+#endif
[...]
+#define IOC_WATCH_QUEUE_SET_SIZE       _IO('s', 0x01)  /* Set the size in pages */
+#define IOC_WATCH_QUEUE_SET_FILTER     _IO('s', 0x02)  /* Set the filter */
Should these ioctl numbers be registered in
Documentation/ioctl/ioctl-number.txt?
Keyboard shortcuts
h: back out one level
j: next message in thread
k: previous message in thread
l: drill in
Esc: close help / fold thread tree
?: toggle this help