Thread: 31 messages, 4 authors, 2021-05-11

Re: [PATCH RFC 05/15] fsnotify: Support event submission through ring buffer

From: Amir Goldstein <amir73il@gmail.com>
Date: 2021-04-27 05:39:53
Also in: linux-fsdevel

On Mon, Apr 26, 2021 at 9:42 PM Gabriel Krisman Bertazi
[off-list ref] wrote:
quoted hunk ↗ jump to hunk
In order to support file system health/error reporting over fanotify,
fsnotify needs to expose a submission path that doesn't allow sleeping.
The only problem I identified with the current submission path is the
need to dynamically allocate memory for the event queue.

This patch avoids the problem by introducing a new mode in fsnotify,
where a ring buffer is used to submit events for a group.  Each group
has its own ring buffer, and error notifications are submitted
exclusively through it.

Signed-off-by: Gabriel Krisman Bertazi <redacted>
---
 fs/notify/Makefile               |   2 +-
 fs/notify/group.c                |  12 +-
 fs/notify/notification.c         |  10 ++
 fs/notify/ring.c                 | 199 +++++++++++++++++++++++++++++++
 include/linux/fsnotify_backend.h |  37 +++++-
 5 files changed, 255 insertions(+), 5 deletions(-)
 create mode 100644 fs/notify/ring.c
diff --git a/fs/notify/Makefile b/fs/notify/Makefile
index 63a4b8828df4..61dae1e90f2d 100644
--- a/fs/notify/Makefile
+++ b/fs/notify/Makefile
@@ -1,6 +1,6 @@
 # SPDX-License-Identifier: GPL-2.0
 obj-$(CONFIG_FSNOTIFY)         += fsnotify.o notification.o group.o mark.o \
-                                  fdinfo.o
+                                  fdinfo.o ring.o

 obj-y                  += dnotify/
 obj-y                  += inotify/
diff --git a/fs/notify/group.c b/fs/notify/group.c
index 08acb1afc0c2..b99b3de36696 100644
--- a/fs/notify/group.c
+++ b/fs/notify/group.c
@@ -81,7 +81,10 @@ void fsnotify_destroy_group(struct fsnotify_group *group)
         * notification against this group. So clearing the notification queue
         * of all events is reliable now.
         */
-       fsnotify_flush_notify(group);
+       if (group->flags & FSN_SUBMISSION_RING_BUFFER)
+               fsnotify_free_ring_buffer(group);
+       else
+               fsnotify_flush_notify(group);

        /*
         * Destroy overflow event (we cannot use fsnotify_destroy_event() as
@@ -136,6 +139,13 @@ static struct fsnotify_group *__fsnotify_alloc_group(
        group->ops = ops;
        group->flags = flags;

+       if (group->flags & FSN_SUBMISSION_RING_BUFFER) {
+               if (fsnotify_create_ring_buffer(group)) {
+                       kfree(group);
+                       return ERR_PTR(-ENOMEM);
+               }
+       }
+
        return group;
 }
diff --git a/fs/notify/notification.c b/fs/notify/notification.c
index 75d79d6d3ef0..32f97e7b7a80 100644
--- a/fs/notify/notification.c
+++ b/fs/notify/notification.c
@@ -51,6 +51,10 @@ EXPORT_SYMBOL_GPL(fsnotify_get_cookie);
 bool fsnotify_notify_queue_is_empty(struct fsnotify_group *group)
 {
        assert_spin_locked(&group->notification_lock);
+
+       if (group->flags & FSN_SUBMISSION_RING_BUFFER)
+               return fsnotify_ring_notify_queue_is_empty(group);
+
        return list_empty(&group->notification_list) ? true : false;
 }
@@ -132,6 +136,9 @@ void fsnotify_remove_queued_event(struct fsnotify_group *group,
                                  struct fsnotify_event *event)
 {
        assert_spin_locked(&group->notification_lock);
+
+       if (group->flags & FSN_SUBMISSION_RING_BUFFER)
+               return;
        /*
         * We need to init list head for the case of overflow event so that
         * check in fsnotify_add_event() works
@@ -166,6 +173,9 @@ struct fsnotify_event *fsnotify_peek_first_event(struct fsnotify_group *group)
 {
        assert_spin_locked(&group->notification_lock);

+       if (group->flags & FSN_SUBMISSION_RING_BUFFER)
+               return fsnotify_ring_peek_first_event(group);
+
        return list_first_entry(&group->notification_list,
                                struct fsnotify_event, list);
 }
diff --git a/fs/notify/ring.c b/fs/notify/ring.c
new file mode 100644
index 000000000000..75e8af1f8d80
--- /dev/null
+++ b/fs/notify/ring.c
@@ -0,0 +1,199 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/types.h>
+#include <linux/fsnotify.h>
+#include <linux/memcontrol.h>
+
+#define INVALID_RING_SLOT -1
+
+#define FSNOTIFY_RING_PAGES 16
+
+#define NEXT_SLOT(cur, len, ring_size) ((cur + len) & (ring_size-1))
+#define NEXT_PAGE(cur, ring_size) (round_up(cur, PAGE_SIZE) & (ring_size-1))
+
+bool fsnotify_ring_notify_queue_is_empty(struct fsnotify_group *group)
+{
+       assert_spin_locked(&group->notification_lock);
+
+       if (group->ring_buffer.tail == group->ring_buffer.head)
+               return true;
+       return false;
+}
+
+struct fsnotify_event *fsnotify_ring_peek_first_event(struct fsnotify_group *group)
+{
+       u64 ring_size = group->ring_buffer.nr_pages << PAGE_SHIFT;
+       struct fsnotify_event *fsn;
+       char *kaddr;
+       u64 tail;
+
+       assert_spin_locked(&group->notification_lock);
+
+again:
+       tail = group->ring_buffer.tail;
+
+       if ((PAGE_SIZE - (tail & (PAGE_SIZE-1))) < sizeof(struct fsnotify_event)) {
+               group->ring_buffer.tail = NEXT_PAGE(tail, ring_size);
+               goto again;
+       }
+
+       kaddr = kmap_atomic(group->ring_buffer.pages[tail / PAGE_SIZE]);
+       if (!kaddr)
+               return NULL;
+       fsn = (struct fsnotify_event *) (kaddr + (tail & (PAGE_SIZE-1)));
+
+       if (fsn->slot_len == INVALID_RING_SLOT) {
+               group->ring_buffer.tail = NEXT_PAGE(tail, ring_size);
+               kunmap_atomic(kaddr);
+               goto again;
+       }
+
+       /* will be unmapped when entry is consumed. */
+       return fsn;
+}
+
+void fsnotify_ring_buffer_consume_event(struct fsnotify_group *group,
+                                       struct fsnotify_event *event)
+{
+       u64 ring_size = group->ring_buffer.nr_pages << PAGE_SHIFT;
+       u64 new_tail = NEXT_SLOT(group->ring_buffer.tail, event->slot_len, ring_size);
+
+       kunmap_atomic(event);
+
+       pr_debug("%s: group=%p tail=%llx->%llx ring_size=%llu\n", __func__,
+                group, group->ring_buffer.tail, new_tail, ring_size);
+
+       WRITE_ONCE(group->ring_buffer.tail, new_tail);
+}
+
+struct fsnotify_event *fsnotify_ring_alloc_event_slot(struct fsnotify_group *group,
+                                                     size_t size)
+       __acquires(&group->notification_lock)
+{
+       struct fsnotify_event *fsn;
+       u64 head, tail;
+       u64 ring_size = group->ring_buffer.nr_pages << PAGE_SHIFT;
+       u64 new_head;
+       void *kaddr;
+
+       if (WARN_ON(!(group->flags & FSN_SUBMISSION_RING_BUFFER) || size > PAGE_SIZE))
+               return ERR_PTR(-EINVAL);
+
+       pr_debug("%s: start group=%p ring_size=%llu, requested=%lu\n", __func__, group,
+                ring_size, size);
+
+       spin_lock(&group->notification_lock);
+again:
+       head = group->ring_buffer.head;
+       tail = group->ring_buffer.tail;
+       new_head = NEXT_SLOT(head, size, ring_size);
+
+       /* head would catch up to tail, corrupting an entry. */
+       if ((head < tail && new_head > tail) || (head > new_head && new_head > tail)) {
+               fsn = ERR_PTR(-ENOMEM);
+               goto err;
+       }
+
+       /*
+        * Not event a skip message fits in the page. We can detect the
+        * lack of space. Move on to the next page.
+        */
+       if ((PAGE_SIZE - (head & (PAGE_SIZE-1))) < sizeof(struct fsnotify_event)) {
+               /* Start again on next page */
+               group->ring_buffer.head = NEXT_PAGE(head, ring_size);
+               goto again;
+       }
+
+       kaddr = kmap_atomic(group->ring_buffer.pages[head / PAGE_SIZE]);
+       if (!kaddr) {
+               fsn = ERR_PTR(-EFAULT);
+               goto err;
+       }
+
+       fsn = (struct fsnotify_event *) (kaddr + (head & (PAGE_SIZE-1)));
+
+       if ((head >> PAGE_SHIFT) != (new_head >> PAGE_SHIFT)) {
+               /*
+                * No room in the current page.  Add a fake entry
+                * consuming the end the page to avoid splitting event
+                * structure.
+                */
+               fsn->slot_len = INVALID_RING_SLOT;
+               kunmap_atomic(kaddr);
+               /* Start again on the next page */
+               group->ring_buffer.head = NEXT_PAGE(head, ring_size);
+
+               goto again;
+       }
+       fsn->slot_len = size;
+
+       return fsn;
+
+err:
+       spin_unlock(&group->notification_lock);
+       return fsn;
+}
+
+void fsnotify_ring_commit_slot(struct fsnotify_group *group, struct fsnotify_event *fsn)
+       __releases(&group->notification_lock)
+{
+       u64 ring_size = group->ring_buffer.nr_pages << PAGE_SHIFT;
+       u64 head = group->ring_buffer.head;
+       u64 new_head = NEXT_SLOT(head, fsn->slot_len, ring_size);
+
+       pr_debug("%s: group=%p head=%llx->%llx ring_size=%llu\n", __func__,
+                group, head, new_head, ring_size);
+
+       kunmap_atomic(fsn);
+       group->ring_buffer.head = new_head;
+
+       spin_unlock(&group->notification_lock);
+
+       wake_up(&group->notification_waitq);
+       kill_fasync(&group->fsn_fa, SIGIO, POLL_IN);
+
+}
+
+void fsnotify_free_ring_buffer(struct fsnotify_group *group)
+{
+       int i;
+
+       for (i = 0; i < group->ring_buffer.nr_pages; i++)
+               __free_page(group->ring_buffer.pages[i]);
+       kfree(group->ring_buffer.pages);
+       group->ring_buffer.nr_pages = 0;
+}
+
+int fsnotify_create_ring_buffer(struct fsnotify_group *group)
+{
+       int nr_pages = FSNOTIFY_RING_PAGES;
+       int i;
+
+       pr_debug("%s: group=%p pages=%d\n", __func__, group, nr_pages);
+
+       group->ring_buffer.pages = kmalloc_array(nr_pages, sizeof(struct pages *),
+                                                GFP_KERNEL);
+       if (!group->ring_buffer.pages)
+               return -ENOMEM;
+
+       group->ring_buffer.head = 0;
+       group->ring_buffer.tail = 0;
+
+       for (i = 0; i < nr_pages; i++) {
+               group->ring_buffer.pages[i] = alloc_pages(GFP_KERNEL, 1);
+               if (!group->ring_buffer.pages)
+                       goto err_dealloc;
+       }
+
+       group->ring_buffer.nr_pages = nr_pages;
+
+       return 0;
+
+err_dealloc:
+       for (--i; i >= 0; i--)
+               __free_page(group->ring_buffer.pages[i]);
+       kfree(group->ring_buffer.pages);
+       group->ring_buffer.nr_pages = 0;
+       return -ENOMEM;
+}
+
+
Nothing in this file is fsnotify specific.
Is there no kernel lib implementation for this already?
If there isn't (I'd be very surprised) please put this in lib/ and post it
for wider review including self tests.
quoted hunk ↗ jump to hunk
diff --git a/include/linux/fsnotify_backend.h b/include/linux/fsnotify_backend.h
index 190c6a402e98..a1a4dd69e5ed 100644
--- a/include/linux/fsnotify_backend.h
+++ b/include/linux/fsnotify_backend.h
@@ -74,6 +74,8 @@
 #define ALL_FSNOTIFY_PERM_EVENTS (FS_OPEN_PERM | FS_ACCESS_PERM | \
                                  FS_OPEN_EXEC_PERM)

+#define FSN_SUBMISSION_RING_BUFFER     0x00000080
FSNOTIFY_GROUP_FLAG_RING_BUFFER please (or FSN_GROUP_ if you must)
and please define this above struct fsnotify_group, even right above the flags
field like FSNOTIFY_CONN_FLAG_HAS_FSID

*IF* we go this way :)

Thanks,
Amir.
Keyboard shortcuts
h: back out one level
j: next message in thread
k: previous message in thread
l: drill in
Esc: close help / fold thread tree
?: toggle this help