Thread (78 messages) 78 messages, 4 authors, 2022-11-10

Re: [PATCH 02/17] powerpc/qspinlock: add mcs queueing for contended waiters

From: Jordan Niethe <hidden>
Date: 2022-11-10 00:37:18

On Thu, 2022-07-28 at 16:31 +1000, Nicholas Piggin wrote:
<snip>
[resend as utf-8, not utf-7]
 
+/*
+ * Bitfields in the atomic value:
+ *
+ *     0: locked bit
+ * 16-31: tail cpu (+1)
+ */
+#define	_Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
+				      << _Q_ ## type ## _OFFSET)
+#define _Q_LOCKED_OFFSET	0
+#define _Q_LOCKED_BITS		1
+#define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
+#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
+
+#define _Q_TAIL_CPU_OFFSET	16
+#define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
+#define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
+
Just to state the obvious, this is:

#define _Q_LOCKED_OFFSET	0
#define _Q_LOCKED_BITS		1
#define _Q_LOCKED_MASK		0x00000001
#define _Q_LOCKED_VAL		1

#define _Q_TAIL_CPU_OFFSET	16
#define _Q_TAIL_CPU_BITS	16
#define _Q_TAIL_CPU_MASK	0xffff0000
quoted hunk ↗ jump to hunk
+#if CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)
+#error "qspinlock does not support such large CONFIG_NR_CPUS"
+#endif
+
 #endif /* _ASM_POWERPC_QSPINLOCK_TYPES_H */
diff --git a/arch/powerpc/lib/qspinlock.c b/arch/powerpc/lib/qspinlock.c
index 8dbce99a373c..5ebb88d95636 100644
--- a/arch/powerpc/lib/qspinlock.c
+++ b/arch/powerpc/lib/qspinlock.c
@@ -1,12 +1,172 @@
 // SPDX-License-Identifier: GPL-2.0-or-later
+#include <linux/atomic.h>
+#include <linux/bug.h>
+#include <linux/compiler.h>
 #include <linux/export.h>
-#include <linux/processor.h>
+#include <linux/percpu.h>
+#include <linux/smp.h>
 #include <asm/qspinlock.h>
 
-void queued_spin_lock_slowpath(struct qspinlock *lock)
+#define MAX_NODES	4
+
+struct qnode {
+	struct qnode	*next;
+	struct qspinlock *lock;
+	u8		locked; /* 1 if lock acquired */
+};
+
+struct qnodes {
+	int		count;
+	struct qnode nodes[MAX_NODES];
+};
I think it could be worth commenting on why qnodes::count is used instead of _Q_TAIL_IDX_OFFSET.
+
+static DEFINE_PER_CPU_ALIGNED(struct qnodes, qnodes);
+
+static inline int encode_tail_cpu(void)
I think the generic version that takes smp_processor_id() as a parameter is clearer - at least with this function name.
+{
+	return (smp_processor_id() + 1) << _Q_TAIL_CPU_OFFSET;
+}
+
+static inline int get_tail_cpu(int val)
It seems like there should be a "decode" function to pair up with the "encode" function.
+{
+	return (val >> _Q_TAIL_CPU_OFFSET) - 1;
+}
+
+/* Take the lock by setting the bit, no other CPUs may concurrently lock it. */
Does that comment mean it is not necessary to use an atomic_or here?
+static __always_inline void lock_set_locked(struct qspinlock *lock)
nit: could just be called set_locked()
+{
+	atomic_or(_Q_LOCKED_VAL, &lock->val);
+	__atomic_acquire_fence();
+}
+
+/* Take lock, clearing tail, cmpxchg with val (which must not be locked) */
+static __always_inline int trylock_clear_tail_cpu(struct qspinlock *lock, int val)
+{
+	int newval = _Q_LOCKED_VAL;
+
+	if (atomic_cmpxchg_acquire(&lock->val, val, newval) == val)
+		return 1;
+	else
+		return 0;
same optional style nit: return (atomic_cmpxchg_acquire(&lock->val, val, newval) == val);
+}
+
+/*
+ * Publish our tail, replacing previous tail. Return previous value.
+ *
+ * This provides a release barrier for publishing node, and an acquire barrier
+ * for getting the old node.
+ */
+static __always_inline int publish_tail_cpu(struct qspinlock *lock, int tail)
Did you change from the xchg_tail() name in the generic version because of the release and acquire barriers this provides?
Does "publish" generally imply the old value will be returned?
 {
-	while (!queued_spin_trylock(lock))
+	for (;;) {
+		int val = atomic_read(&lock->val);
+		int newval = (val & ~_Q_TAIL_CPU_MASK) | tail;
+		int old;
+
+		old = atomic_cmpxchg(&lock->val, val, newval);
+		if (old == val)
+			return old;
+	}
+}
+
+static struct qnode *get_tail_qnode(struct qspinlock *lock, int val)
+{
+	int cpu = get_tail_cpu(val);
+	struct qnodes *qnodesp = per_cpu_ptr(&qnodes, cpu);
+	int idx;
+
+	for (idx = 0; idx < MAX_NODES; idx++) {
+		struct qnode *qnode = &qnodesp->nodes[idx];
+		if (qnode->lock == lock)
+			return qnode;
+	}
In case anyone else is confused by this, Nick explained that each cpu can only be queued on a given spinlock once, regardless of "idx" level.
+
+	BUG();
+}
+
+static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
+{
+	struct qnodes *qnodesp;
+	struct qnode *next, *node;
+	int val, old, tail;
+	int idx;
+
+	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
+
+	qnodesp = this_cpu_ptr(&qnodes);
+	if (unlikely(qnodesp->count == MAX_NODES)) {
The comparison is >= in the generic version; I guess we have no nested NMIs, so this is safe?
+		while (!queued_spin_trylock(lock))
+			cpu_relax();
+		return;
+	}
+
+	idx = qnodesp->count++;
+	/*
+	 * Ensure that we increment the head node->count before initialising
+	 * the actual node. If the compiler is kind enough to reorder these
+	 * stores, then an IRQ could overwrite our assignments.
+	 */
+	barrier();
+	node = &qnodesp->nodes[idx];
+	node->next = NULL;
+	node->lock = lock;
+	node->locked = 0;
+
+	tail = encode_tail_cpu();
+
+	old = publish_tail_cpu(lock, tail);
+
+	/*
+	 * If there was a previous node; link it and wait until reaching the
+	 * head of the waitqueue.
+	 */
+	if (old & _Q_TAIL_CPU_MASK) {
+		struct qnode *prev = get_tail_qnode(lock, old);
+
+		/* Link @node into the waitqueue. */
+		WRITE_ONCE(prev->next, node);
+
+		/* Wait for mcs node lock to be released */
+		while (!node->locked)
+			cpu_relax();
+
+		smp_rmb(); /* acquire barrier for the mcs lock */
+	}
+
+	/* We're at the head of the waitqueue, wait for the lock. */
+	while ((val = atomic_read(&lock->val)) & _Q_LOCKED_VAL)
+		cpu_relax();
+
+	/* If we're the last queued, must clean up the tail. */
+	if ((val & _Q_TAIL_CPU_MASK) == tail) {
+		if (trylock_clear_tail_cpu(lock, val))
+			goto release;
+		/* Another waiter must have enqueued */
+	}
+
+	/* We must be the owner, just set the lock bit and acquire */
+	lock_set_locked(lock);
+
+	/* contended path; must wait for next != NULL (MCS protocol) */
+	while (!(next = READ_ONCE(node->next)))
 		cpu_relax();
+
+	/*
+	 * Unlock the next mcs waiter node. Release barrier is not required
+	 * here because the acquirer is only accessing the lock word, and
+	 * the acquire barrier we took the lock with orders that update vs
+	 * this store to locked. The corresponding barrier is the smp_rmb()
+	 * acquire barrier for mcs lock, above.
+	 */
+	WRITE_ONCE(next->locked, 1);
+
+release:
+	qnodesp->count--; /* release the node */
+}
+
+void queued_spin_lock_slowpath(struct qspinlock *lock)
+{
+	queued_spin_lock_mcs_queue(lock);
 }
 EXPORT_SYMBOL(queued_spin_lock_slowpath);
 
  
Keyboard shortcuts
h: back out one level
j: next message in thread
k: previous message in thread
l: drill in
Esc: close help / fold thread tree
?: toggle this help