Re: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event vector support
From: Pavan Nikhilesh Bhagavatula <hidden>
Date: 2021-03-26 09:00:26
quoted
From: Pavan Nikhilesh Bhagavatula <redacted> Sent: Thursday, March 25, 2021 6:44 PM To: Jayatheerthan, Jay <redacted>; Jerin JacobKollanukkaran [off-list ref]; Carrillo, Erik Gquoted
[off-list ref]; Gujjar, Abhinandan S[off-list ref]; McDaniel, Timothyquoted
[off-list ref]; hemant.agrawal@nxp.com; VanHaaren, Harry [off-list ref]; mattias.ronnblomquoted
[off-list ref]; Ma, Liang J[off-list ref]quoted
Cc: dev@dpdk.org Subject: RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapterevent vector supportquoted
quoted
-----Original Message----- From: Jayatheerthan, Jay <redacted> Sent: Thursday, March 25, 2021 4:07 PM To: Pavan Nikhilesh Bhagavatula <redacted>;Jerinquoted
quoted
Jacob Kollanukkaran [off-list ref]; Carrillo, Erik G [off-list ref]; Gujjar, Abhinandan S [off-list ref]; McDaniel, Timothy [off-list ref]; hemant.agrawal@nxp.com; Van Haaren, Harry [off-list ref]; mattias.ronnblom [off-list ref]; Ma, Liang J [off-list ref] Cc: dev@dpdk.org Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rxadapterquoted
quoted
event vector support External Email ----------------------------------------------------------------------quoted
-----Original Message----- From: pbhagavatula@marvell.com <redacted> Sent: Wednesday, March 24, 2021 10:35 AM To: jerinj@marvell.com; Jayatheerthan, Jay[off-list ref]; Carrillo, Erik G [off-list ref]; Gujjar,quoted
Abhinandan S [off-list ref]; McDaniel, Timothy[off-list ref]; hemant.agrawal@nxp.com; Vanquoted
Haaren, Harry [off-list ref]; mattias.ronnblom[off-list ref]; Ma, Liang Jquoted
[off-list ref] Cc: dev@dpdk.org; Pavan Nikhilesh <redacted> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adaptereventquoted
quoted
vector supportquoted
From: Pavan Nikhilesh <redacted> Add event vector support for event eth Rx adapter, theimplementationquoted
creates vector flows based on port and queue identifier of thereceivedquoted
mbufs. Signed-off-by: Pavan Nikhilesh <redacted> --- lib/librte_eventdev/eventdev_pmd.h | 7 +- .../rte_event_eth_rx_adapter.c | 257 ++++++++++++++++--quoted
quoted
quoted
lib/librte_eventdev/rte_eventdev.c | 6 +- 3 files changed, 250 insertions(+), 20 deletions(-)diff --git a/lib/librte_eventdev/eventdev_pmd.hb/lib/librte_eventdev/eventdev_pmd.hquoted
index 9297f1433..0f724ac85 100644--- a/lib/librte_eventdev/eventdev_pmd.h +++ b/lib/librte_eventdev/eventdev_pmd.h@@ -69,9 +69,10 @@ extern "C" { } \ } while (0) -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \ -((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \quoted
-(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))quoted
+#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP\quoted
+ ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |\quoted
+ (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |\quoted
+ (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)) #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATAquoted
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.cb/lib/librte_eventdev/rte_event_eth_rx_adapter.cquoted
index ac8ba5bf0..c71990078 100644--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c@@ -26,6 +26,10 @@ #define BATCH_SIZE 32 #define BLOCK_CNT_THRESHOLD 10 #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE) +#define MAX_VECTOR_SIZE 1024 +#define MIN_VECTOR_SIZE 4 +#define MAX_VECTOR_NS 1E9 +#define MIN_VECTOR_NS 1E5 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32@@ -59,6 +63,20 @@ struct eth_rx_poll_entry { uint16_t eth_rx_qid; }; +struct eth_rx_vector_data { + TAILQ_ENTRY(eth_rx_vector_data) next; + uint16_t port; + uint16_t queue; + uint16_t max_vector_count; + uint64_t event; + uint64_t ts; + uint64_t vector_timeout_ticks; + struct rte_mempool *vector_pool; + struct rte_event_vector *vector_ev; +} __rte_cache_aligned; + +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data); + /* Instance per adapter */ struct rte_eth_event_enqueue_buffer { /* Count of events in this buffer */@@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter { uint32_t wrr_pos; /* Event burst buffer */ struct rte_eth_event_enqueue_buffer event_enqueue_buffer; + /* Vector enable flag */ + uint8_t ena_vector; + /* Timestamp of previous vector expiry list traversal */ + uint64_t prev_expiry_ts; + /* Minimum ticks to wait before traversing expiry list */ + uint64_t vector_tmo_ticks; + /* vector list */ + struct eth_rx_vector_data_list vector_list; /* Per adapter stats */ struct rte_event_eth_rx_adapter_stats stats; /* Block count, counts up to BLOCK_CNT_THRESHOLD */@@ -198,9 +224,11 @@ struct eth_device_info { struct eth_rx_queue_info { int queue_enabled; /* True if added */ int intr_enabled; + uint8_t ena_vector; uint16_t wt; /* Polling weight */ uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else0 */quoted
uint64_t event; + struct eth_rx_vector_data vector_data; }; static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;@@ -722,6 +750,9 @@ rxa_flush_event_buffer(structrte_event_eth_rx_adapter *rx_adapter)quoted
&rx_adapter->event_enqueue_buffer; struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter- stats; + if (!buf->count) + return 0; + uint16_t n = rte_event_enqueue_new_burst(rx_adapter- eventdev_id, rx_adapter->event_port_id, buf->events,@@ -742,6 +773,72 @@ rxa_flush_event_buffer(structrte_event_eth_rx_adapter *rx_adapter)quoted
return n; } +static inline uint16_t +rxa_create_event_vector(struct rte_event_eth_rx_adapter*rx_adapter,quoted
+ struct eth_rx_queue_info *queue_info, + struct rte_eth_event_enqueue_buffer *buf, + struct rte_mbuf **mbufs, uint16_t num) +{ + struct rte_event *ev = &buf->events[buf->count]; + struct eth_rx_vector_data *vec; + uint16_t filled, space, sz; + + filled = 0; + vec = &queue_info->vector_data; + while (num) { + if (vec->vector_ev == NULL) { + if (rte_mempool_get(vec->vector_pool, + (void **)&vec->vector_ev) <0) {quoted
+ rte_pktmbuf_free_bulk(mbufs, num); + return 0; + } + vec->vector_ev->nb_elem = 0; + vec->vector_ev->port = vec->port; + vec->vector_ev->queue = vec->queue; + vec->vector_ev->attr_valid = true; + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,vec, next);quoted
+ } else if (vec->vector_ev->nb_elem == vec- max_vector_count) {Is there a case where nb_elem > max_vector_count as weaccumulatequoted
quoted
sz to it ?I don't think so, that would overflow the vector event.quoted
quoted
+ /* Event ready. */ + ev->event = vec->event; + ev->vec = vec->vector_ev; + ev++; + filled++; + vec->vector_ev = NULL; + TAILQ_REMOVE(&rx_adapter->vector_list, vec,next);quoted
+ if (rte_mempool_get(vec->vector_pool, + (void **)&vec->vector_ev) <0) {quoted
+ rte_pktmbuf_free_bulk(mbufs, num); + return 0; + } + vec->vector_ev->nb_elem = 0; + vec->vector_ev->port = vec->port; + vec->vector_ev->queue = vec->queue; + vec->vector_ev->attr_valid = true; + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,vec, next);quoted
+ } + + space = vec->max_vector_count - vec->vector_ev- nb_elem; + sz = num > space ? space : num; + memcpy(vec->vector_ev->mbufs + vec->vector_ev- nb_elem, mbufs, + sizeof(void *) * sz); + vec->vector_ev->nb_elem += sz; + num -= sz; + mbufs += sz; + vec->ts = rte_rdtsc(); + } + + if (vec->vector_ev->nb_elem == vec->max_vector_count) {Same here.quoted
+ ev->event = vec->event; + ev->vec = vec->vector_ev; + ev++; + filled++; + vec->vector_ev = NULL; + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next); + } + + return filled; +}I am seeing more than one repeating code chunks in this function. Perhaps, you can give it a try to not repeat. We can drop if its performance affecting.I will try to move them to inline functions and test.quoted
quoted
+ static inline void rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,@@ -770,25 +867,30 @@ rxa_buffer_mbufs(structrte_event_eth_rx_adapter *rx_adapter,quoted
rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1); do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;The RSS related code is executed for vector case as well. Can this be moved inside ena_vector if condition ?RSS is used to generate the event flowid, in vector case the flow id Will be a combination of port and queue id. The idea is that flows having the same RSS LSB will end up in the same queue.I meant to say, rss_mask and do_rss are used only when ena_vector is false. Could be moved inside the appropriate condition ?
Ah! I see I will move them inside the conditional.
quoted
quoted
quoted
- for (i = 0; i < num; i++) { - m = mbufs[i]; - - rss = do_rss ? - rxa_do_softrss(m, rx_adapter->rss_key_be) : - m->hash.rss; - ev->event = event; - ev->flow_id = (rss & ~flow_id_mask) | - (ev->flow_id & flow_id_mask); - ev->mbuf = m; - ev++; + if (!eth_rx_queue_info->ena_vector) { + for (i = 0; i < num; i++) { + m = mbufs[i]; + + rss = do_rss ? rxa_do_softrss(m, rx_adapter- rss_key_be) + : m->hash.rss; + ev->event = event; + ev->flow_id = (rss & ~flow_id_mask) | + (ev->flow_id & flow_id_mask); + ev->mbuf = m; + ev++; + } + } else { + num = rxa_create_event_vector(rx_adapter,eth_rx_queue_info,quoted
+ buf, mbufs, num); } - if (dev_info->cb_fn) { + if (num && dev_info->cb_fn) { dropped = 0; nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id, - ETH_EVENT_BUFFER_SIZE, buf- count, ev, - num, dev_info->cb_arg,&dropped);quoted
+ ETH_EVENT_BUFFER_SIZE, buf- count, + &buf->events[buf->count],num,quoted
+ dev_info->cb_arg, &dropped);Before this patch, we pass ev which is &buf->events[buf->count] +numquoted
quoted
as fifth param when calling cb_fn. Now, we are passing &buf-quoted
events[buf->count] for non-vector case. Do you see this as anissue?quoted
quoted
The callback function takes in the array newly formed events i.e. weneedquoted
to pass the start of array and the count. the previous code had a bug where it passes the end of the event list.ok, that makes sense.quoted
quoted
Also, for vector case would it make sense to do pass &buf-events[buf-quoted
quoted
count] + num ?quoted
if (unlikely(nb_cb > num)) RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)events",quoted
nb_cb, num);@@ -1124,6 +1226,30 @@ rxa_poll(structrte_event_eth_rx_adapterquoted
quoted
*rx_adapter)quoted
return nb_rx; } +static void +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg) +{ + struct rte_event_eth_rx_adapter *rx_adapter = arg; + struct rte_eth_event_enqueue_buffer *buf = + &rx_adapter->event_enqueue_buffer; + struct rte_event *ev; + + if (buf->count) + rxa_flush_event_buffer(rx_adapter); + + if (vec->vector_ev->nb_elem == 0) + return; + ev = &buf->events[buf->count]; + + /* Event ready. */ + ev->event = vec->event; + ev->vec = vec->vector_ev; + buf->count++; + + vec->vector_ev = NULL; + vec->ts = 0; +} + static int rxa_service_func(void *args) {@@ -1137,6 +1263,24 @@ rxa_service_func(void *args) return 0; } + if (rx_adapter->ena_vector) { + if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >= + rx_adapter->vector_tmo_ticks) { + struct eth_rx_vector_data *vec; + + TAILQ_FOREACH(vec, &rx_adapter->vector_list,next) {quoted
+ uint64_t elapsed_time = rte_rdtsc() -vec->ts;quoted
+ + if (elapsed_time >= vec- vector_timeout_ticks) { + rxa_vector_expire(vec,rx_adapter);quoted
+ TAILQ_REMOVE(&rx_adapter- vector_list, + vec, next); + } + } + rx_adapter->prev_expiry_ts = rte_rdtsc(); + } + } + stats = &rx_adapter->stats; stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter); stats->rx_packets += rxa_poll(rx_adapter);@@ -1640,6 +1784,28 @@ rxa_update_queue(structrte_event_eth_rx_adapter *rx_adapter,quoted
} } +static void +rxa_set_vector_data(struct eth_rx_queue_info *queue_info,uint16_t vector_count,quoted
+ uint64_t vector_ns, struct rte_mempool *mp, int32_tqid,quoted
+ uint16_t port_id) +{ +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9) + struct eth_rx_vector_data *vector_data; + uint32_t flow_id; + + vector_data = &queue_info->vector_data; + vector_data->max_vector_count = vector_count; + vector_data->port = port_id; + vector_data->queue = qid; + vector_data->vector_pool = mp; + vector_data->vector_timeout_ticks = + NSEC2TICK(vector_ns, rte_get_timer_hz()); + vector_data->ts = 0; + flow_id = queue_info->event & 0xFFFFF; + flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) :flow_id; Maybe I am missing something here. Looking at the code it looks like qid and port_id may overlap. For e.g., if qid = 0x10 and port_id =0x11,quoted
quoted
flow_id would end up being 0x11. Is this the expectation? Also, itmayquoted
quoted
be useful to document flow_id format.The flow_id is 20 bit, I guess we could do 12bit queue_id and 8bit port as a flow.This sounds reasonable to me. It would be useful to have the flow_id format and how it is used for vectorization in Rx/Tx adapter documentation.
This is only applicable to the SW Rx adapter implementation a HW implementation might have its own way of implementing the flow aggregation. There is no documentation specific to SW Rx adapter, I will add it to the commit log.
quoted
quoted
Comparing this format with existing RSS hash based method, are we saying that all mbufs received in a rx burst are part of same flowwhenquoted
quoted
vectorization is used?Yes, the hard way to do this is to use a hash table and treating each mbuf having an unique flow.quoted
quoted
+ vector_data->event = (queue_info->event & ~0xFFFFF) |flow_id;quoted
+} + static void rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter, struct eth_device_info *dev_info,@@ -1741,6 +1907,44 @@ rxa_add_queue(structrte_event_eth_rx_adapter *rx_adapter,quoted
} } +static void +rxa_sw_event_vector_configure( + struct rte_event_eth_rx_adapter *rx_adapter, uint16_teth_dev_id,quoted
+ int rx_queue_id, + const struct rte_event_eth_rx_adapter_event_vector_config*config)quoted
+{ + struct eth_device_info *dev_info = &rx_adapter- eth_devices[eth_dev_id]; + struct eth_rx_queue_info *queue_info; + struct rte_event *qi_ev; + + if (rx_queue_id == -1) { + uint16_t nb_rx_queues; + uint16_t i; + + nb_rx_queues = dev_info->dev->data->nb_rx_queues; + for (i = 0; i < nb_rx_queues; i++) + rxa_sw_event_vector_configure(rx_adapter,eth_dev_id, i,quoted
+ config); + return; + } + + queue_info = &dev_info->rx_queue[rx_queue_id]; + qi_ev = (struct rte_event *)&queue_info->event; + queue_info->ena_vector = 1; + qi_ev->event_type =RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;quoted
+ rxa_set_vector_data(queue_info, config->vector_sz, + config->vector_timeout_ns, config- vector_mp, + rx_queue_id, dev_info->dev->data->port_id); + rx_adapter->ena_vector = 1; + rx_adapter->vector_tmo_ticks = + rx_adapter->vector_tmo_ticks ? + RTE_MIN(config->vector_timeout_ns << 1, + rx_adapter->vector_tmo_ticks) : + config->vector_timeout_ns << 1; + rx_adapter->prev_expiry_ts = 0; + TAILQ_INIT(&rx_adapter->vector_list); +} + static int rxa_sw_add(struct rte_event_eth_rx_adapter*rx_adapter,quoted
quoted
quoted
uint16_t eth_dev_id, int rx_queue_id,@@ -2081,6 +2285,15 @@rte_event_eth_rx_adapter_queue_add(uint8_t id,quoted
return -EINVAL; } + if ((cap &RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &"ed
+ (queue_conf->rx_queue_flags & + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) { + RTE_EDEV_LOG_ERR("Event vectorization is notsupported,"quoted
+ " eth port: %" PRIu16 " adapter id: %"PRIu8,quoted
+ eth_dev_id, id); + return -EINVAL; + } + if ((cap &RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &"ed
(rx_queue_id != -1)) { RTE_EDEV_LOG_ERR("Rx queues can only be connectedto single "quoted
@@ -2143,6 +2356,17 @@rte_event_eth_rx_adapter_queue_add(uint8_t id,quoted
return 0; } +static int +rxa_sw_vector_limits(structrte_event_eth_rx_adapter_vector_limits *limits)quoted
+{ + limits->max_sz = MAX_VECTOR_SIZE; + limits->min_sz = MIN_VECTOR_SIZE; + limits->max_timeout_ns = MAX_VECTOR_NS; + limits->min_timeout_ns = MIN_VECTOR_NS; + + return 0; +} + int rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_teth_dev_id,quoted
int32_t rx_queue_id)@@ -2333,7 +2557,8 @@rte_event_eth_rx_adapter_queue_event_vector_config(quoted
ret = dev->dev_ops- eth_rx_adapter_event_vector_config( dev, &rte_eth_devices[eth_dev_id],rx_queue_id, config);quoted
} else { - ret = -ENOTSUP; + rxa_sw_event_vector_configure(rx_adapter,eth_dev_id,quoted
+ rx_queue_id, config); } return ret;@@ -2371,7 +2596,7 @@rte_event_eth_rx_adapter_vector_limits_get(quoted
ret = dev->dev_ops- eth_rx_adapter_vector_limits_get( dev, &rte_eth_devices[eth_port_id], limits); } else { - ret = -ENOTSUP; + ret = rxa_sw_vector_limits(limits); } return ret;diff --git a/lib/librte_eventdev/rte_eventdev.cb/lib/librte_eventdev/rte_eventdev.cquoted
index f95edc075..254a31b1f 100644--- a/lib/librte_eventdev/rte_eventdev.c +++ b/lib/librte_eventdev/rte_eventdev.c@@ -122,7 +122,11 @@rte_event_eth_rx_adapter_caps_get(uint8_tquoted
quoted
dev_id, uint16_t eth_port_id,quoted
if (caps == NULL) return -EINVAL; - *caps = 0; + + if (dev->dev_ops->eth_rx_adapter_caps_get == NULL) + *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP; + else + *caps = 0;Any reason why we had to set default caps value? I am thinking if sw event device is used, it would set it anyways.There are multiple sw event devices which don't implement caps_get function, this changes solves that.quoted
quoted
return dev->dev_ops->eth_rx_adapter_caps_get ? (*dev->dev_ops- eth_rx_adapter_caps_get)(dev, -- 2.17.1