Thread (131 messages) 131 messages, 12 authors, 2018-11-06

Re: [PATCH v2] ring: fix c11 memory ordering issue

From: He, Jia <hidden>
Date: 2018-08-07 05:56:36

Hi Gavin
-----Original Message-----
From: Gavin Hu [mailto:gavin.hu@arm.com]
Sent: 2018年8月7日 11:20
To: dev@dpdk.org
Cc: gavin.hu@arm.com; Honnappa.Nagarahalli@arm.com;
steve.capper@arm.com; Ola.Liljedahl@arm.com;
jerin.jacob@caviumnetworks.com; hemant.agrawal@nxp.com; He, Jia
[off-list ref]; stable@dpdk.org
Subject: [PATCH v2] ring: fix c11 memory ordering issue

This patch includes two bug fixes(#1 and 2) and two optimisations(#3 and #4).
Maybe you need to split this into small parts.
1) In update_tail, read ht->tail using __atomic_load.Although the
   compiler currently seems to be doing the right thing even without
   _atomic_load, we don't want to give the compiler freedom to optimise
   what should be an atomic load, it should not be arbitarily moved
   around.
2) Synchronize the load-acquire of the tail and the store-release
   within update_tail, the store release ensures all the ring operations,
   engqueu or dequeue are seen by the observers as soon as they see
   the updated tail. The load-acquire is required for correctly compu-
   tate the free_entries or avail_entries, respectively for enqueue and
   dequeue operations, the data dependency is not reliable for ordering
   as the compiler might break it by saving to temporary values to boost
   performance.
Could you describe the race condition in details?
e.g.
cpu 1			cpu2
code1
				code2

Cheers,
Jia
quoted hunk ↗ jump to hunk
3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
   the do {} while loop as upon failure the old_head will be updated,
   another load is costy and not necessary.
4) When calling __atomic_compare_exchange_n, relaxed ordering for both
   success and failure, as multiple threads can work independently on
   the same end of the ring (either enqueue or dequeue) without
   synchronization, not as operating on tail, which has to be finished
   in sequence.

The patch was benchmarked with test/ring_perf_autotest and it decreases the
enqueue/dequeue latency by 5% ~ 24.6% with two lcores, the real gains are
dependent on the number of lcores, depth of the ring, SPSC or MPMC.
For 1 lcore, it also improves a little, about 3 ~ 4%.
It is a big improvement, in case of MPMC, with rings size of 32, it saves latency up
to (6.90-5.20)/6.90 = 24.6%.

Test result data:

SP/SC bulk enq/dequeue (size: 8): 13.19
MP/MC bulk enq/dequeue (size: 8): 25.79
SP/SC bulk enq/dequeue (size: 32): 3.85
MP/MC bulk enq/dequeue (size: 32): 6.90

SP/SC bulk enq/dequeue (size: 8): 12.05
MP/MC bulk enq/dequeue (size: 8): 23.06
SP/SC bulk enq/dequeue (size: 32): 3.62
MP/MC bulk enq/dequeue (size: 32): 5.20

Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
Cc: stable@dpdk.org

Signed-off-by: Gavin Hu <redacted>
Reviewed-by: Honnappa Nagarahalli <redacted>
Reviewed-by: Steve Capper <redacted>
Reviewed-by: Ola Liljedahl <redacted>
---
 lib/librte_ring/rte_ring_c11_mem.h | 38
+++++++++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/lib/librte_ring/rte_ring_c11_mem.h
b/lib/librte_ring/rte_ring_c11_mem.h
index 94df3c4a6..cfa3be4a7 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val,
 	 * we need to wait for them to complete
 	 */
 	if (!single)
-		while (unlikely(ht->tail != old_val))
+		while (unlikely(old_val != __atomic_load_n(&ht->tail,
+						__ATOMIC_RELAXED)))
 			rte_pause();

 	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); @@ -60,20
+61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 	unsigned int max = n;
 	int success;

+	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
 	do {
 		/* Reset n to the initial burst count */
 		n = max;

-		*old_head = __atomic_load_n(&r->prod.head,
-					__ATOMIC_ACQUIRE);

-		/*
-		 *  The subtraction is done between two unsigned 32bits value
+		/* load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
+							__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = (capacity + r->cons.tail - *old_head);
+		*free_entries = (capacity + cons_tail - *old_head);

 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned
int is_sp,
 		if (is_sp)
 			r->prod.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head is updated */
 			success = __atomic_compare_exchange_n(&r->prod.head,
 					old_head, *new_head,
-					0, __ATOMIC_ACQUIRE,
+					/*weak=*/0, __ATOMIC_RELAXED,
 					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
@@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int
is_sc,
 	int success;

 	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
 	do {
 		/* Restore n as it may change every loop */
 		n = max;
-		*old_head = __atomic_load_n(&r->cons.head,
-					__ATOMIC_ACQUIRE);
+
+		/* this load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
+							__ATOMIC_ACQUIRE);

 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = (r->prod.tail - *old_head);
+		*entries = (prod_tail - *old_head);

 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int
is_sc,
 		if (is_sc)
 			r->cons.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head will be updated */
 			success = __atomic_compare_exchange_n(&r->cons.head,
-							old_head, *new_head,
-							0, __ATOMIC_ACQUIRE,
-							__ATOMIC_RELAXED);
+						old_head, *new_head,
+						/*weak=*/0, __ATOMIC_RELAXED,
+						__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }
--
2.11.0
  
Keyboard shortcuts
hback out one level
jnext message in thread
kprevious message in thread
ldrill in
Escclose help / fold thread tree
?toggle this help