Re: [dpdk-dev] [PATCH v5 4/5] eventdev/rx_adapter: implement per queue event buffer
From: Naga Harish K, S V <hidden>
Date: 2021-10-05 14:47:46
Hi Jay,
quoted hunk ↗ jump to hunk
-----Original Message----- From: Jayatheerthan, Jay <redacted> Sent: Tuesday, October 5, 2021 1:26 PM To: Naga Harish K, S V <redacted>; jerinj@marvell.com Cc: dev@dpdk.org Subject: RE: [PATCH v5 4/5] eventdev/rx_adapter: implement per queue event bufferquoted
-----Original Message----- From: Naga Harish K, S V <redacted> Sent: Monday, October 4, 2021 11:11 AM To: jerinj@marvell.com; Jayatheerthan, Jay [off-list ref] Cc: dev@dpdk.org Subject: [PATCH v5 4/5] eventdev/rx_adapter: implement per queueeventquoted
buffer this patch implement the per queue event buffer with required validations. Signed-off-by: Naga Harish K S V <redacted> --- lib/eventdev/rte_event_eth_rx_adapter.c | 187 +++++++++++++++++------- 1 file changed, 138 insertions(+), 49 deletions(-)diff --git a/lib/eventdev/rte_event_eth_rx_adapter.cb/lib/eventdev/rte_event_eth_rx_adapter.c index 606db241b8..b61af0e75e 100644--- a/lib/eventdev/rte_event_eth_rx_adapter.c +++ b/lib/eventdev/rte_event_eth_rx_adapter.c@@ -102,10 +102,12 @@ struct rte_event_eth_rx_adapter { uint8_t rss_key_be[RSS_KEY_SIZE]; /* Event device identifier */ uint8_t eventdev_id; - /* Per ethernet device structure */ - struct eth_device_info *eth_devices; /* Event port identifier */ uint8_t event_port_id; + /* Flag indicating per rxq event buffer */ + bool use_queue_event_buf; + /* Per ethernet device structure */ + struct eth_device_info *eth_devices; /* Lock to serialize config updates with service function */ rte_spinlock_t rx_lock; /* Max mbufs processed in any service function invocation */ @@ -241,6 +243,7 @@ struct eth_rx_queue_info { uint32_t flow_id_mask; /* Set to ~0 if app provides flow idelse 0 */quoted
uint64_t event; struct eth_rx_vector_data vector_data; + struct rte_eth_event_enqueue_buffer *event_buf; }; static struct rte_event_eth_rx_adapter **event_eth_rx_adapter; @@ -767,10 +770,9 @@ rxa_enq_block_end_ts(structrte_event_eth_rx_adapterquoted
*rx_adapter, /* Enqueue buffered events to event device */ static inline uint16_t -rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter) +rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter, + struct rte_eth_event_enqueue_buffer *buf) { - struct rte_eth_event_enqueue_buffer *buf = - &rx_adapter->event_enqueue_buffer; struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats; uint16_t count = buf->last ? buf->last - buf->head : buf->count;@@ -888,15 +890,14 @@ rxa_buffer_mbufs(structrte_event_eth_rx_adapter *rx_adapter,quoted
uint16_t eth_dev_id, uint16_t rx_queue_id, struct rte_mbuf **mbufs, - uint16_t num) + uint16_t num, + struct rte_eth_event_enqueue_buffer *buf) { uint32_t i; struct eth_device_info *dev_info = &rx_adapter- eth_devices[eth_dev_id]; struct eth_rx_queue_info *eth_rx_queue_info = &dev_info- rx_queue[rx_queue_id]; - struct rte_eth_event_enqueue_buffer *buf = - &rx_adapter- event_enqueue_buffer; uint16_t new_tail = buf->tail; uint64_t event = eth_rx_queue_info->event; uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask; @@ -995,11quoted
+996,10 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, uint16_t queue_id, uint32_t rx_count, uint32_t max_rx, - int *rxq_empty) + int *rxq_empty, + struct rte_eth_event_enqueue_buffer *buf) { struct rte_mbuf *mbufs[BATCH_SIZE]; - struct rte_eth_event_enqueue_buffer *buf = - &rx_adapter- event_enqueue_buffer; struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats; uint16_t n;@@ -1012,7 +1012,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter*rx_adapter,quoted
*/ while (rxa_pkt_buf_available(buf)) { if (buf->count >= BATCH_SIZE) - rxa_flush_event_buffer(rx_adapter); + rxa_flush_event_buffer(rx_adapter, buf); stats->rx_poll_count++; n = rte_eth_rx_burst(port_id, queue_id, mbufs,BATCH_SIZE); @@quoted
-1021,14 +1021,14 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter*rx_adapter,quoted
*rxq_empty = 1; break; } - rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n); + rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n,buf);quoted
nb_rx += n; if (rx_count + nb_rx > max_rx) break; } if (buf->count > 0) - rxa_flush_event_buffer(rx_adapter); + rxa_flush_event_buffer(rx_adapter, buf); return nb_rx; }@@ -1169,7 +1169,7 @@ rxa_intr_ring_dequeue(structrte_event_eth_rx_adapter *rx_adapter)quoted
ring_lock = &rx_adapter->intr_ring_lock; if (buf->count >= BATCH_SIZE) - rxa_flush_event_buffer(rx_adapter); + rxa_flush_event_buffer(rx_adapter, buf); while (rxa_pkt_buf_available(buf)) { struct eth_device_info *dev_info;@@ -1221,7 +1221,7 @@ rxa_intr_ring_dequeue(structrte_event_eth_rx_adapter *rx_adapter)quoted
continue; n = rxa_eth_rx(rx_adapter, port, i, nb_rx, rx_adapter->max_nb_rx, - &rxq_empty); + &rxq_empty, buf); nb_rx += n; enq_buffer_full = !rxq_empty && n == 0;@@ -1242,7 +1242,7 @@quoted
rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter) } else { n = rxa_eth_rx(rx_adapter, port, queue, nb_rx, rx_adapter->max_nb_rx, - &rxq_empty); + &rxq_empty, buf); rx_adapter->qd_valid = !rxq_empty; nb_rx += n; if (nb_rx > rx_adapter->max_nb_rx) @@ -1273,13+1273,12 @@quoted
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) { uint32_t num_queue; uint32_t nb_rx = 0; - struct rte_eth_event_enqueue_buffer *buf; + struct rte_eth_event_enqueue_buffer *buf = NULL; uint32_t wrr_pos; uint32_t max_nb_rx; wrr_pos = rx_adapter->wrr_pos; max_nb_rx = rx_adapter->max_nb_rx; - buf = &rx_adapter->event_enqueue_buffer; /* Iterate through a WRR sequence */ for (num_queue = 0; num_queue < rx_adapter->wrr_len;num_queue++) {quoted
@@ -1287,24 +1286,36 @@ rxa_poll(struct rte_event_eth_rx_adapter*rx_adapter)quoted
uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid; uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id; + if (rx_adapter->use_queue_event_buf) { + struct eth_device_info *dev_info = + &rx_adapter->eth_devices[d]; + buf = dev_info->rx_queue[qid].event_buf; + } else + buf = &rx_adapter->event_enqueue_buffer; + /* Don't do a batch dequeue from the rx queue if there isn't * enough space in the enqueue buffer. */ if (buf->count >= BATCH_SIZE) - rxa_flush_event_buffer(rx_adapter); + rxa_flush_event_buffer(rx_adapter, buf); if (!rxa_pkt_buf_available(buf)) { - rx_adapter->wrr_pos = wrr_pos; - return nb_rx; + if (rx_adapter->use_queue_event_buf) + goto poll_next_entry; + else { + rx_adapter->wrr_pos = wrr_pos; + return nb_rx; + } } nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx, - NULL); + NULL, buf); if (nb_rx > max_nb_rx) { rx_adapter->wrr_pos = (wrr_pos + 1) % rx_adapter->wrr_len; break; } +poll_next_entry: if (++wrr_pos == rx_adapter->wrr_len) wrr_pos = 0; }@@ -1315,12 +1326,18 @@ static void rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg) { struct rte_event_eth_rx_adapter *rx_adapter = arg; - struct rte_eth_event_enqueue_buffer *buf = - &rx_adapter->event_enqueue_buffer; + struct rte_eth_event_enqueue_buffer *buf = NULL; struct rte_event *ev; + if (rx_adapter->use_queue_event_buf) { + struct eth_device_info *dev_info = + &rx_adapter->eth_devices[vec->port]; + buf = dev_info->rx_queue[vec->queue].event_buf; + } else + buf = &rx_adapter->event_enqueue_buffer; +The above code to get the buffer can be made an inline function since it is needed in more than one place.
Added new inline function to get event buffer pointer in v6 patch set.
quoted
if (buf->count) - rxa_flush_event_buffer(rx_adapter); + rxa_flush_event_buffer(rx_adapter, buf); if (vec->vector_ev->nb_elem == 0) return;@@ -1947,9 +1964,16 @@ rxa_sw_del(struct rte_event_eth_rx_adapter*rx_adapter,quoted
rx_adapter->num_rx_intr -= intrq; dev_info->nb_rx_intr -= intrq; dev_info->nb_shared_intr -= intrq && sintrq; + if (rx_adapter->use_queue_event_buf) { + struct rte_eth_event_enqueue_buffer *event_buf = + dev_info->rx_queue[rx_queue_id].event_buf; + rte_free(event_buf->events); + rte_free(event_buf); + dev_info->rx_queue[rx_queue_id].event_buf = NULL; + } } -static void +static int rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter, struct eth_device_info *dev_info, int32_t rx_queue_id,@@ -1961,15 +1985,21 @@ rxa_add_queue(structrte_event_eth_rx_adapter *rx_adapter,quoted
int intrq; int sintrq; struct rte_event *qi_ev; + struct rte_eth_event_enqueue_buffer *new_rx_buf = NULL; + uint16_t eth_dev_id = dev_info->dev->data->port_id; + int ret; if (rx_queue_id == -1) { uint16_t nb_rx_queues; uint16_t i; nb_rx_queues = dev_info->dev->data->nb_rx_queues; - for (i = 0; i < nb_rx_queues; i++) - rxa_add_queue(rx_adapter, dev_info, i, conf); - return; + for (i = 0; i < nb_rx_queues; i++) { + ret = rxa_add_queue(rx_adapter, dev_info, i, conf); + if (ret) + return ret; + } + return 0; } pollq = rxa_polled_queue(dev_info, rx_queue_id); @@ -2032,6+2062,37quoted
@@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter, dev_info->next_q_idx = 0; } } + + if (!rx_adapter->use_queue_event_buf) + return 0; + + new_rx_buf = rte_zmalloc_socket("rx_buffer_meta", + sizeof(*new_rx_buf), 0, + rte_eth_dev_socket_id(eth_dev_id)); + if (new_rx_buf == NULL) { + RTE_EDEV_LOG_ERR("Failed to allocate event buffer metafor "quoted
+ "dev_id: %d queue_id: %d", + eth_dev_id, rx_queue_id); + return -ENOMEM; + } + + new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size,BATCH_SIZE);quoted
+ new_rx_buf->events_size += (2 * BATCH_SIZE); + new_rx_buf->events = rte_zmalloc_socket("rx_buffer", + sizeof(struct rte_event) * + new_rx_buf->events_size, 0, + rte_eth_dev_socket_id(eth_dev_id)); + if (new_rx_buf->events == NULL) { + rte_free(new_rx_buf); + RTE_EDEV_LOG_ERR("Failed to allocate event buffer for " + "dev_id: %d queue_id: %d", + eth_dev_id, rx_queue_id); + return -ENOMEM; + } + + queue_info->event_buf = new_rx_buf; + + return 0; } static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, @@ -2060,6 +2121,16 @@ static int rxa_sw_add(structrte_event_eth_rx_adapter *rx_adapter,quoted
temp_conf.servicing_weight = 1; } queue_conf = &temp_conf; + + if (queue_conf->servicing_weight == 0 && + rx_adapter->use_queue_event_buf) { + + RTE_EDEV_LOG_ERR("Use of queue level eventbuffer "quoted
+ "not supported for interrupt queues"quoted
+ "dev_id: %d queue_id: %d", + eth_dev_id, rx_queue_id); + return -EINVAL; + } } nb_rx_queues = dev_info->dev->data->nb_rx_queues;@@ -2139,7 +2210,9 @@ static int rxa_sw_add(structrte_event_eth_rx_adapter *rx_adapter, - rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf); + ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id,queue_conf);quoted
+ if (ret) + goto err_free_rxqueue; rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr); rte_free(rx_adapter->eth_rx_poll);@@ -2160,7 +2233,7 @@ static int rxa_sw_add(structrte_event_eth_rx_adapter *rx_adapter,quoted
rte_free(rx_poll); rte_free(rx_wrr); - return 0; + return ret; } static int@@ -2286,20 +2359,26 @@ rxa_create(uint8_t id, uint8_t dev_id, rx_adapter->eth_devices[i].dev = &rte_eth_devices[i]; /* Rx adapter event buffer allocation */ - buf = &rx_adapter->event_enqueue_buffer; - buf->events_size = RTE_ALIGN(rxa_params->event_buf_size,BATCH_SIZE);quoted
- - events = rte_zmalloc_socket(rx_adapter->mem_name, - buf->events_size * sizeof(*events), - 0, socket_id); - if (events == NULL) { - RTE_EDEV_LOG_ERR("Failed to allocate mem for eventbuffer\n");quoted
- rte_free(rx_adapter->eth_devices); - rte_free(rx_adapter); - return -ENOMEM; - } + rx_adapter->use_queue_event_buf = rxa_params- use_queue_event_buf; + + if (!rx_adapter->use_queue_event_buf) { + buf = &rx_adapter->event_enqueue_buffer; + buf->events_size = RTE_ALIGN(rxa_params- event_buf_size, + BATCH_SIZE); + + events = rte_zmalloc_socket(rx_adapter->mem_name, + buf->events_size *sizeof(*events),quoted
+ 0, socket_id); + if (events == NULL) { + RTE_EDEV_LOG_ERR("Failed to allocate memory " + "for adapter event buffer"); + rte_free(rx_adapter->eth_devices); + rte_free(rx_adapter); + return -ENOMEM; + } - rx_adapter->event_enqueue_buffer.events = events; + rx_adapter->event_enqueue_buffer.events = events; + } event_eth_rx_adapter[id] = rx_adapter;@@ -2327,6 +2406,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_tid,quoted
uint8_t dev_id, /* use default values for adapter params */ rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE; + rxa_params.use_queue_event_buf = false; return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg); }@@quoted
-2347,9 +2427,9 @@rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,quoted
if (rxa_params == NULL) { rxa_params = &temp_params; rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE; - } - - if (rxa_params->event_buf_size == 0) + rxa_params->use_queue_event_buf = false; + } else if ((!rxa_params->use_queue_event_buf && + rxa_params->event_buf_size == 0)) return -EINVAL; pc = rte_malloc(NULL, sizeof(*pc), 0); @@ -2418,7 +2498,8 @@ rte_event_eth_rx_adapter_free(uint8_t id) if (rx_adapter->default_cb_arg) rte_free(rx_adapter->conf_arg); rte_free(rx_adapter->eth_devices); - rte_free(rx_adapter->event_enqueue_buffer.events); + if (!rx_adapter->use_queue_event_buf) + rte_free(rx_adapter->event_enqueue_buffer.events); rte_free(rx_adapter); event_eth_rx_adapter[id] = NULL;@@ -2522,6 +2603,14 @@ rte_event_eth_rx_adapter_queue_add(uint8_tid,quoted
return -EINVAL; } + if ((rx_adapter->use_queue_event_buf && + queue_conf->event_buf_size == 0) || + (!rx_adapter->use_queue_event_buf && + queue_conf->event_buf_size != 0)) { + RTE_EDEV_LOG_ERR("Invalid Event buffer size for thequeue");quoted
+ return -EINVAL; + } +Another error case is configuring both - rx_adapter->use_queue_event_buf = true and queue_conf->event_buf_size != 0.
This is valid case.
quoted
dev_info = &rx_adapter->eth_devices[eth_dev_id]; if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) { -- 2.25.1