Thread (81 messages) 81 messages, 4 authors, 2021-10-07

Re: [dpdk-dev] [PATCH v5 4/5] eventdev/rx_adapter: implement per queue event buffer

From: Naga Harish K, S V <hidden>
Date: 2021-10-05 14:47:46

Hi Jay,
quoted hunk ↗ jump to hunk
-----Original Message-----
From: Jayatheerthan, Jay <redacted>
Sent: Tuesday, October 5, 2021 1:26 PM
To: Naga Harish K, S V <redacted>; jerinj@marvell.com
Cc: dev@dpdk.org
Subject: RE: [PATCH v5 4/5] eventdev/rx_adapter: implement per queue
event buffer
quoted
-----Original Message-----
From: Naga Harish K, S V <redacted>
Sent: Monday, October 4, 2021 11:11 AM
To: jerinj@marvell.com; Jayatheerthan, Jay
[off-list ref]
Cc: dev@dpdk.org
Subject: [PATCH v5 4/5] eventdev/rx_adapter: implement per queue
event
quoted
buffer

This patch implements the per-queue event buffer with the required
validations.

Signed-off-by: Naga Harish K S V <redacted>
---
 lib/eventdev/rte_event_eth_rx_adapter.c | 187
+++++++++++++++++-------
 1 file changed, 138 insertions(+), 49 deletions(-)
diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c
b/lib/eventdev/rte_event_eth_rx_adapter.c
index 606db241b8..b61af0e75e 100644
--- a/lib/eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/eventdev/rte_event_eth_rx_adapter.c
@@ -102,10 +102,12 @@ struct rte_event_eth_rx_adapter {
 	uint8_t rss_key_be[RSS_KEY_SIZE];
 	/* Event device identifier */
 	uint8_t eventdev_id;
-	/* Per ethernet device structure */
-	struct eth_device_info *eth_devices;
 	/* Event port identifier */
 	uint8_t event_port_id;
+	/* Flag indicating per rxq event buffer */
+	bool use_queue_event_buf;
+	/* Per ethernet device structure */
+	struct eth_device_info *eth_devices;
 	/* Lock to serialize config updates with service function */
 	rte_spinlock_t rx_lock;
 	/* Max mbufs processed in any service function invocation */ @@
-241,6 +243,7 @@ struct eth_rx_queue_info {
 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id
else 0 */
quoted
 	uint64_t event;
 	struct eth_rx_vector_data vector_data;
+	struct rte_eth_event_enqueue_buffer *event_buf;
 };

 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter; @@
-767,10 +770,9 @@ rxa_enq_block_end_ts(struct
rte_event_eth_rx_adapter
quoted
*rx_adapter,

 /* Enqueue buffered events to event device */  static inline uint16_t
-rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
+		       struct rte_eth_event_enqueue_buffer *buf)
 {
-	struct rte_eth_event_enqueue_buffer *buf =
-	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
 	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
@@ -888,15 +890,14 @@ rxa_buffer_mbufs(struct
rte_event_eth_rx_adapter *rx_adapter,
quoted
 		uint16_t eth_dev_id,
 		uint16_t rx_queue_id,
 		struct rte_mbuf **mbufs,
-		uint16_t num)
+		uint16_t num,
+		struct rte_eth_event_enqueue_buffer *buf)
 {
 	uint32_t i;
 	struct eth_device_info *dev_info =
 					&rx_adapter-
eth_devices[eth_dev_id];
 	struct eth_rx_queue_info *eth_rx_queue_info =
 					&dev_info-
rx_queue[rx_queue_id];
-	struct rte_eth_event_enqueue_buffer *buf =
-					&rx_adapter-
event_enqueue_buffer;
 	uint16_t new_tail = buf->tail;
 	uint64_t event = eth_rx_queue_info->event;
 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask; @@ -
995,11
quoted
+996,10 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
 	uint16_t queue_id,
 	uint32_t rx_count,
 	uint32_t max_rx,
-	int *rxq_empty)
+	int *rxq_empty,
+	struct rte_eth_event_enqueue_buffer *buf)
 {
 	struct rte_mbuf *mbufs[BATCH_SIZE];
-	struct rte_eth_event_enqueue_buffer *buf =
-					&rx_adapter-
event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats =
 					&rx_adapter->stats;
 	uint16_t n;
@@ -1012,7 +1012,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter
*rx_adapter,
quoted
 	 */
 	while (rxa_pkt_buf_available(buf)) {
 		if (buf->count >= BATCH_SIZE)
-			rxa_flush_event_buffer(rx_adapter);
+			rxa_flush_event_buffer(rx_adapter, buf);

 		stats->rx_poll_count++;
 		n = rte_eth_rx_burst(port_id, queue_id, mbufs,
BATCH_SIZE); @@
quoted
-1021,14 +1021,14 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter
*rx_adapter,
quoted
 				*rxq_empty = 1;
 			break;
 		}
-		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
+		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n,
buf);
quoted
 		nb_rx += n;
 		if (rx_count + nb_rx > max_rx)
 			break;
 	}

 	if (buf->count > 0)
-		rxa_flush_event_buffer(rx_adapter);
+		rxa_flush_event_buffer(rx_adapter, buf);

 	return nb_rx;
 }
@@ -1169,7 +1169,7 @@ rxa_intr_ring_dequeue(struct
rte_event_eth_rx_adapter *rx_adapter)
quoted
 	ring_lock = &rx_adapter->intr_ring_lock;

 	if (buf->count >= BATCH_SIZE)
-		rxa_flush_event_buffer(rx_adapter);
+		rxa_flush_event_buffer(rx_adapter, buf);

 	while (rxa_pkt_buf_available(buf)) {
 		struct eth_device_info *dev_info;
@@ -1221,7 +1221,7 @@ rxa_intr_ring_dequeue(struct
rte_event_eth_rx_adapter *rx_adapter)
quoted
 					continue;
 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
 					rx_adapter->max_nb_rx,
-					&rxq_empty);
+					&rxq_empty, buf);
 				nb_rx += n;

 				enq_buffer_full = !rxq_empty && n == 0;
@@ -1242,7 +1242,7 @@
quoted
rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
 		} else {
 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
 				rx_adapter->max_nb_rx,
-				&rxq_empty);
+				&rxq_empty, buf);
 			rx_adapter->qd_valid = !rxq_empty;
 			nb_rx += n;
 			if (nb_rx > rx_adapter->max_nb_rx) @@ -1273,13
+1273,12 @@
quoted
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)  {
 	uint32_t num_queue;
 	uint32_t nb_rx = 0;
-	struct rte_eth_event_enqueue_buffer *buf;
+	struct rte_eth_event_enqueue_buffer *buf = NULL;
 	uint32_t wrr_pos;
 	uint32_t max_nb_rx;

 	wrr_pos = rx_adapter->wrr_pos;
 	max_nb_rx = rx_adapter->max_nb_rx;
-	buf = &rx_adapter->event_enqueue_buffer;

 	/* Iterate through a WRR sequence */
 	for (num_queue = 0; num_queue < rx_adapter->wrr_len;
num_queue++) {
quoted
@@ -1287,24 +1286,36 @@ rxa_poll(struct rte_event_eth_rx_adapter
*rx_adapter)
quoted
 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

+		if (rx_adapter->use_queue_event_buf) {
+			struct eth_device_info *dev_info =
+				&rx_adapter->eth_devices[d];
+			buf = dev_info->rx_queue[qid].event_buf;
+		} else
+			buf = &rx_adapter->event_enqueue_buffer;
+
 		/* Don't do a batch dequeue from the rx queue if there isn't
 		 * enough space in the enqueue buffer.
 		 */
 		if (buf->count >= BATCH_SIZE)
-			rxa_flush_event_buffer(rx_adapter);
+			rxa_flush_event_buffer(rx_adapter, buf);
 		if (!rxa_pkt_buf_available(buf)) {
-			rx_adapter->wrr_pos = wrr_pos;
-			return nb_rx;
+			if (rx_adapter->use_queue_event_buf)
+				goto poll_next_entry;
+			else {
+				rx_adapter->wrr_pos = wrr_pos;
+				return nb_rx;
+			}
 		}

 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
-				NULL);
+				NULL, buf);
 		if (nb_rx > max_nb_rx) {
 			rx_adapter->wrr_pos =
 				    (wrr_pos + 1) % rx_adapter->wrr_len;
 			break;
 		}

+poll_next_entry:
 		if (++wrr_pos == rx_adapter->wrr_len)
 			wrr_pos = 0;
 	}
@@ -1315,12 +1326,18 @@ static void
 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)  {
 	struct rte_event_eth_rx_adapter *rx_adapter = arg;
-	struct rte_eth_event_enqueue_buffer *buf =
-		&rx_adapter->event_enqueue_buffer;
+	struct rte_eth_event_enqueue_buffer *buf = NULL;
 	struct rte_event *ev;

+	if (rx_adapter->use_queue_event_buf) {
+		struct eth_device_info *dev_info =
+			&rx_adapter->eth_devices[vec->port];
+		buf = dev_info->rx_queue[vec->queue].event_buf;
+	} else
+		buf = &rx_adapter->event_enqueue_buffer;
+
The above code to get the buffer can be made into an inline function, since it
is needed in more than one place.
Added a new inline function to get the event buffer pointer in the v6 patch set.
quoted
 	if (buf->count)
-		rxa_flush_event_buffer(rx_adapter);
+		rxa_flush_event_buffer(rx_adapter, buf);

 	if (vec->vector_ev->nb_elem == 0)
 		return;
@@ -1947,9 +1964,16 @@ rxa_sw_del(struct rte_event_eth_rx_adapter
*rx_adapter,
quoted
 	rx_adapter->num_rx_intr -= intrq;
 	dev_info->nb_rx_intr -= intrq;
 	dev_info->nb_shared_intr -= intrq && sintrq;
+	if (rx_adapter->use_queue_event_buf) {
+		struct rte_eth_event_enqueue_buffer *event_buf =
+			dev_info->rx_queue[rx_queue_id].event_buf;
+		rte_free(event_buf->events);
+		rte_free(event_buf);
+		dev_info->rx_queue[rx_queue_id].event_buf = NULL;
+	}
 }

-static void
+static int
 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
 	int32_t rx_queue_id,
@@ -1961,15 +1985,21 @@ rxa_add_queue(struct
rte_event_eth_rx_adapter *rx_adapter,
quoted
 	int intrq;
 	int sintrq;
 	struct rte_event *qi_ev;
+	struct rte_eth_event_enqueue_buffer *new_rx_buf = NULL;
+	uint16_t eth_dev_id = dev_info->dev->data->port_id;
+	int ret;

 	if (rx_queue_id == -1) {
 		uint16_t nb_rx_queues;
 		uint16_t i;

 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
-		for (i = 0; i <	nb_rx_queues; i++)
-			rxa_add_queue(rx_adapter, dev_info, i, conf);
-		return;
+		for (i = 0; i <	nb_rx_queues; i++) {
+			ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
+			if (ret)
+				return ret;
+		}
+		return 0;
 	}

 	pollq = rxa_polled_queue(dev_info, rx_queue_id); @@ -2032,6
+2062,37
quoted
@@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 				dev_info->next_q_idx = 0;
 		}
 	}
+
+	if (!rx_adapter->use_queue_event_buf)
+		return 0;
+
+	new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
+				sizeof(*new_rx_buf), 0,
+				rte_eth_dev_socket_id(eth_dev_id));
+	if (new_rx_buf == NULL) {
+		RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta
for "
quoted
+				 "dev_id: %d queue_id: %d",
+				 eth_dev_id, rx_queue_id);
+		return -ENOMEM;
+	}
+
+	new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size,
BATCH_SIZE);
quoted
+	new_rx_buf->events_size += (2 * BATCH_SIZE);
+	new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
+				sizeof(struct rte_event) *
+				new_rx_buf->events_size, 0,
+				rte_eth_dev_socket_id(eth_dev_id));
+	if (new_rx_buf->events == NULL) {
+		rte_free(new_rx_buf);
+		RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
+				 "dev_id: %d queue_id: %d",
+				 eth_dev_id, rx_queue_id);
+		return -ENOMEM;
+	}
+
+	queue_info->event_buf = new_rx_buf;
+
+	return 0;
 }

 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, @@
-2060,6 +2121,16 @@ static int rxa_sw_add(struct
rte_event_eth_rx_adapter *rx_adapter,
quoted
 			temp_conf.servicing_weight = 1;
 		}
 		queue_conf = &temp_conf;
+
+		if (queue_conf->servicing_weight == 0 &&
+		    rx_adapter->use_queue_event_buf) {
+
+			RTE_EDEV_LOG_ERR("Use of queue level event
buffer "
quoted
+					 "not supported for interrupt queues
"
quoted
+					 "dev_id: %d queue_id: %d",
+					 eth_dev_id, rx_queue_id);
+			return -EINVAL;
+		}
 	}

 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
@@ -2139,7 +2210,9 @@ static int rxa_sw_add(struct
rte_event_eth_rx_adapter *rx_adapter,



-	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
+	ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id,
queue_conf);
quoted
+	if (ret)
+		goto err_free_rxqueue;
 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

 	rte_free(rx_adapter->eth_rx_poll);
@@ -2160,7 +2233,7 @@ static int rxa_sw_add(struct
rte_event_eth_rx_adapter *rx_adapter,
quoted
 	rte_free(rx_poll);
 	rte_free(rx_wrr);

-	return 0;
+	return ret;
 }

 static int
@@ -2286,20 +2359,26 @@ rxa_create(uint8_t id, uint8_t dev_id,
 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

 	/* Rx adapter event buffer allocation */
-	buf = &rx_adapter->event_enqueue_buffer;
-	buf->events_size = RTE_ALIGN(rxa_params->event_buf_size,
BATCH_SIZE);
quoted
-
-	events = rte_zmalloc_socket(rx_adapter->mem_name,
-				    buf->events_size * sizeof(*events),
-				    0, socket_id);
-	if (events == NULL) {
-		RTE_EDEV_LOG_ERR("Failed to allocate mem for event
buffer\n");
quoted
-		rte_free(rx_adapter->eth_devices);
-		rte_free(rx_adapter);
-		return -ENOMEM;
-	}
+	rx_adapter->use_queue_event_buf = rxa_params-
use_queue_event_buf;
+
+	if (!rx_adapter->use_queue_event_buf) {
+		buf = &rx_adapter->event_enqueue_buffer;
+		buf->events_size = RTE_ALIGN(rxa_params-
event_buf_size,
+					     BATCH_SIZE);
+
+		events = rte_zmalloc_socket(rx_adapter->mem_name,
+					    buf->events_size *
sizeof(*events),
quoted
+					    0, socket_id);
+		if (events == NULL) {
+			RTE_EDEV_LOG_ERR("Failed to allocate memory "
+					 "for adapter event buffer");
+			rte_free(rx_adapter->eth_devices);
+			rte_free(rx_adapter);
+			return -ENOMEM;
+		}

-	rx_adapter->event_enqueue_buffer.events = events;
+		rx_adapter->event_enqueue_buffer.events = events;
+	}

 	event_eth_rx_adapter[id] = rx_adapter;
@@ -2327,6 +2406,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t
id,
quoted
uint8_t dev_id,

 	/* use default values for adapter params */
 	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
+	rxa_params.use_queue_event_buf = false;

 	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);  }
@@
quoted
-2347,9 +2427,9 @@
rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
quoted
 	if (rxa_params == NULL) {
 		rxa_params = &temp_params;
 		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
-	}
-
-	if (rxa_params->event_buf_size == 0)
+		rxa_params->use_queue_event_buf = false;
+	} else if ((!rxa_params->use_queue_event_buf &&
+		    rxa_params->event_buf_size == 0))
 		return -EINVAL;

 	pc = rte_malloc(NULL, sizeof(*pc), 0); @@ -2418,7 +2498,8 @@
rte_event_eth_rx_adapter_free(uint8_t id)
 	if (rx_adapter->default_cb_arg)
 		rte_free(rx_adapter->conf_arg);
 	rte_free(rx_adapter->eth_devices);
-	rte_free(rx_adapter->event_enqueue_buffer.events);
+	if (!rx_adapter->use_queue_event_buf)
+		rte_free(rx_adapter->event_enqueue_buffer.events);
 	rte_free(rx_adapter);
 	event_eth_rx_adapter[id] = NULL;
@@ -2522,6 +2603,14 @@ rte_event_eth_rx_adapter_queue_add(uint8_t
id,
quoted
 		return -EINVAL;
 	}

+	if ((rx_adapter->use_queue_event_buf &&
+	     queue_conf->event_buf_size == 0) ||
+	    (!rx_adapter->use_queue_event_buf &&
+	     queue_conf->event_buf_size != 0)) {
+		RTE_EDEV_LOG_ERR("Invalid Event buffer size for the
queue");
quoted
+		return -EINVAL;
+	}
+
Another error case is configuring both rx_adapter->use_queue_event_buf
= true and queue_conf->event_buf_size != 0.
This is a valid case.
quoted
 	dev_info = &rx_adapter->eth_devices[eth_dev_id];

 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
--
2.25.1
  
Keyboard shortcuts
h: back out one level
j: next message in thread
k: previous message in thread
l: drill in
Esc: close help / fold thread tree
?: toggle this help