diff --git a/code/bngblaster/src/bbl.c b/code/bngblaster/src/bbl.c index 1df57c34..7e794689 100644 --- a/code/bngblaster/src/bbl.c +++ b/code/bngblaster/src/bbl.c @@ -632,10 +632,13 @@ main(int argc, char *argv[]) /* Cleanup resources. */ CLEANUP: bbl_interface_unlock_all(); - log_close(); if(g_ctx->ctrl_socket_path) { bbl_ctrl_socket_close(); } +#ifdef BNGBLASTER_DPDK + io_dpdk_close(); +#endif + log_close(); bbl_ctx_del(); return exit_status; } diff --git a/code/bngblaster/src/bbl_interface.h b/code/bngblaster/src/bbl_interface.h index 88beae7c..0acd45ea 100644 --- a/code/bngblaster/src/bbl_interface.h +++ b/code/bngblaster/src/bbl_interface.h @@ -18,6 +18,8 @@ typedef struct bbl_interface_ uint32_t state_transitions; /* interface state transitions */ uint32_t ifindex; /* interface index */ uint32_t pcap_index; /* interface index for packet captures */ + uint16_t portid; /* DPDK port identifier */ + bbl_link_config_s *config; diff --git a/code/bngblaster/src/io/io_def.h b/code/bngblaster/src/io/io_def.h index d9c0305a..95ef7600 100644 --- a/code/bngblaster/src/io/io_def.h +++ b/code/bngblaster/src/io/io_def.h @@ -49,6 +49,12 @@ typedef struct io_handle_ { struct tpacket_req req; struct sockaddr_ll addr; +#ifdef BNGBLASTER_DPDK + struct rte_eth_dev_tx_buffer *tx_buffer; + struct rte_mempool *mbuf_pool; + uint16_t queue; +#endif + uint8_t *ring; /* ring buffer */ unsigned int cursor; /* ring buffer cursor */ unsigned int queued; @@ -78,6 +84,7 @@ typedef struct io_handle_ { uint64_t io_errors; uint64_t no_buffer; uint64_t polled; + uint64_t dropped; } stats; struct io_handle_ *next; diff --git a/code/bngblaster/src/io/io_dpdk.c b/code/bngblaster/src/io/io_dpdk.c index 05ca05a2..5f7bbd43 100644 --- a/code/bngblaster/src/io/io_dpdk.c +++ b/code/bngblaster/src/io/io_dpdk.c @@ -23,11 +23,38 @@ #include #include #include +#include + +#define NUM_MBUFS 8191 +#define MBUF_CACHE_SIZE 250 +#define BURST_SIZE 128 + +static struct rte_eth_conf port_conf = { + .rxmode = { + .split_hdr_size = 0, + }, + .txmode = { + .mq_mode = RTE_ETH_MQ_TX_NONE, + }, +}; + +struct rte_mempool *mbuf_pool; + +static bool +io_dpdk_dev_info(uint16_t portid, struct rte_eth_dev_info *dev_info) +{ + int ret = rte_eth_dev_info_get(portid, dev_info); + if(ret != 0) { + LOG(ERROR, "DPDK: Error during getting device (port %u) info: %s\n", + portid, strerror(-ret)); + return false; + } + return true; +} bool io_dpdk_init() { - int ret; uint16_t portid; uint16_t dpdk_ports; @@ -43,21 +70,353 @@ io_dpdk_init() dpdk_args[0] = "bngblaster"; dpdk_args[1] = "-v"; + LOG_NOARG(DPDK, "DPDK: init the EAL\n"); rte_eal_init(2, argv); dpdk_ports = rte_eth_dev_count_avail(); - LOG(DEBUG, "DPDK: %u ports available\n", dpdk_ports); + LOG(DPDK, "DPDK: %u ports available\n", dpdk_ports); RTE_ETH_FOREACH_DEV(portid) { - ret = rte_eth_dev_info_get(portid, &dev_info); - if(ret != 0) { - LOG(DEBUG, "DPDK: Error during getting device (port %u) info: %s\n", - portid, strerror(-ret)); + if(!io_dpdk_dev_info(portid, &dev_info)) { return false; } - LOG(DEBUG, "DPDK: %s (port %u)\n", + LOG(DPDK, "DPDK: %s (port %u)\n", dev_info.device->name, portid); } + + mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", + NUM_MBUFS * dpdk_ports, MBUF_CACHE_SIZE, 0, + RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + if(!mbuf_pool) { + LOG(ERROR, "DPDK: failed to create mbuf pool\n"); + return false; + } + + return true; +} + +void +io_dpdk_close() +{ + if(g_ctx->dpdk) { + LOG_NOARG(DPDK, "DPDK: clean up the EAL\n"); + rte_eal_cleanup(); + } +} + +/** + * This job is for DPDK RX in main thread! + */ +void +io_dpdk_rx_job(timer_s *timer) +{ + io_handle_s *io = timer->data; + bbl_interface_s *interface = io->interface; + + bbl_ethernet_header_s *eth; + + struct rte_mbuf *pkts_burst[BURST_SIZE]; + struct rte_mbuf *packet; + uint16_t nb_rx; + uint16_t i; + + protocol_error_t decode_result; + bool pcap = false; + + assert(io->mode == IO_MODE_DPDK); + assert(io->direction == IO_INGRESS); + assert(io->thread == NULL); + + /* Get RX timestamp */ + clock_gettime(CLOCK_MONOTONIC, &io->timestamp); + nb_rx = rte_eth_rx_burst(interface->portid, io->queue, pkts_burst, BURST_SIZE); + for(i = 0; i < nb_rx; i++) { + packet = pkts_burst[i]; + io->buf = rte_pktmbuf_mtod(packet, uint8_t *); + io->buf_len = packet->pkt_len; + io->stats.packets++; + io->stats.bytes += io->buf_len; + decode_result = decode_ethernet(io->buf, io->buf_len, g_ctx->sp, SCRATCHPAD_LEN, ð); + if(decode_result == PROTOCOL_SUCCESS) { + /* Copy RX timestamp */ + eth->timestamp.tv_sec = io->timestamp.tv_sec; + eth->timestamp.tv_nsec = io->timestamp.tv_nsec; + bbl_rx_handler(interface, eth); + } else if(decode_result == UNKNOWN_PROTOCOL) { + io->stats.unknown++; + } else { + io->stats.protocol_errors++; + } + /* Dump the packet into pcap file */ + if(g_ctx->pcap.write_buf && (!eth->bbl || g_ctx->pcap.include_streams)) { + pcap = true; + pcapng_push_packet_header(&io->timestamp, io->buf, io->buf_len, + interface->pcap_index, PCAPNG_EPB_FLAGS_INBOUND); + } + } + if(pcap) { + pcapng_fflush(); + } +} + +/* + * This job is for DPDK TX in main thread! + */ +void +io_dpdk_tx_job(timer_s *timer) +{ + io_handle_s *io = timer->data; + + assert(io->mode == IO_MODE_DPDK); + assert(io->direction == IO_EGRESS); + assert(io->thread == NULL); + + io_update_stream_token_bucket(io); +} + +void +io_dpdk_thread_rx_run_fn(io_thread_s *thread) +{ + io_handle_s *io = thread->io; + bbl_interface_s *interface = io->interface; + + struct rte_mbuf *pkts_burst[BURST_SIZE]; + struct rte_mbuf *packet; + + uint16_t portid = interface->portid; + uint16_t nb_rx; + uint16_t i; + + assert(io->mode == IO_MODE_DPDK); + assert(io->direction == IO_INGRESS); + assert(io->thread); + + struct timespec sleep, rem; + sleep.tv_sec = 0; + sleep.tv_nsec = 0; + + while(thread->active) { + nb_rx = rte_eth_rx_burst(portid, io->queue, pkts_burst, BURST_SIZE); + if(nb_rx == 0) { + sleep.tv_nsec = 1000; /* 0.001ms */ + nanosleep(&sleep, &rem); + continue; + } + /* Get RX timestamp */ + clock_gettime(CLOCK_MONOTONIC, &io->timestamp); + for(i = 0; i < nb_rx; i++) { + packet = pkts_burst[i]; + io->buf = rte_pktmbuf_mtod(packet, uint8_t *); + io->buf_len = packet->pkt_len; + /* Process packet */ + io_thread_rx_handler(thread, io); + } + } +} + +/** + * This job is for DPDK TX in worker thread! + */ +void +io_dpdk_thread_tx_job(timer_s *timer) +{ + io_thread_s *thread = timer->data; + io_handle_s *io = thread->io; + + assert(io->mode == IO_MODE_DPDK); + assert(io->direction == IO_EGRESS); + assert(io->thread); + + io_update_stream_token_bucket(io); +} + +bool +io_dpdk_interface_init(bbl_interface_s *interface) +{ + bbl_link_config_s *config = interface->config; + + int ret; + bool found = false; + + uint16_t portid; + uint16_t queue; + uint16_t id; + uint16_t nb_rx_queue = 1; + uint16_t nb_tx_queue = 1; + struct rte_eth_dev_info dev_info; + struct rte_eth_conf local_port_conf = port_conf; + struct rte_eth_rxconf rx_conf; + struct rte_eth_txconf tx_conf; + struct rte_ether_addr mac; + io_handle_s *io; + + RTE_ETH_FOREACH_DEV(portid) { + if(io_dpdk_dev_info(portid, &dev_info)) { + if(strcmp(dev_info.device->name, interface->name) == 0) { + found = true; + interface->portid = portid; + break; + } + } + } + if(!found) { + LOG(ERROR, "DPDK: interface %s not found\n", interface->name); + return false; + } + + /* Get MAC address */ + if(*(uint32_t*)config->mac) { + memcpy(interface->mac, config->mac, ETH_ADDR_LEN); + } else { + if(rte_eth_macaddr_get(portid, &mac) < 0) { + LOG(ERROR, "DPDK: failed to get MAC from interface %s\n", interface->name); + return false; + } + memcpy(interface->mac, mac.addr_bytes, ETH_ADDR_LEN); + } + + /* Configure interface */ + if(config->tx_threads) { + nb_tx_queue = config->tx_threads; + } + if(config->rx_threads) { + nb_rx_queue = config->rx_threads; + } + if(dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE) { + local_port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE; + } + ret = rte_eth_dev_configure(portid, nb_rx_queue, nb_tx_queue, &local_port_conf); + if(ret < 0) { + LOG(ERROR, "DPDK: failed to configure interface %s (error %d)\n", + interface->name, ret); + return false; + } + ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &config->io_slots_rx, &config->io_slots_tx); + if(ret < 0) { + LOG(ERROR, "DPDK: failed to adjust number of descriptors for interface %s (error %d)\n", + interface->name, ret); + return false; + } + + id = nb_rx_queue; + for(queue = 0; queue < nb_rx_queue; queue++) { + io = calloc(1, sizeof(io_handle_s)); + if(!io) return false; + io->id = --id; + io->mode = config->io_mode; + io->direction = IO_INGRESS; + io->next = interface->io.rx; + interface->io.rx = io; + io->interface = interface; + CIRCLEQ_INIT(&io->stream_tx_qhead); + if(config->rx_threads) { + if(!io_thread_init(io)) { + return false; + } + io->thread->run_fn = io_dpdk_thread_rx_run_fn; + } else { + timer_add_periodic(&g_ctx->timer_root, &interface->io.rx_job, "RX", 0, + config->rx_interval, io, &io_dpdk_rx_job); + } + io->queue = queue; +#if 0 + io->mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", + NUM_MBUFS, MBUF_CACHE_SIZE, 0, + RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + if(!io->mbuf_pool) { + LOG(ERROR, "DPDK: failed to create mbuf pool for interface %s queue %u (error %d)\n", + interface->name, queue, ret); + return false; + } +#endif + rx_conf = dev_info.default_rxconf; + rx_conf.offloads = local_port_conf.rxmode.offloads; + ret = rte_eth_rx_queue_setup(portid, queue, config->io_slots_rx, + rte_eth_dev_socket_id(portid), + &rx_conf, mbuf_pool); + if(ret < 0) { + LOG(ERROR, "DPDK: failed to setup RX queue %u for interface %s (error %d)\n", + queue, interface->name, ret); + return false; + } + } + + id = nb_tx_queue; + for(queue = 0; queue < nb_tx_queue; queue++) { + io = calloc(1, sizeof(io_handle_s)); + if(!io) return false; + io->id = --id; + io->mode = config->io_mode; + io->direction = IO_EGRESS; + io->next = interface->io.tx; + interface->io.tx = io; + io->interface = interface; + io->buf = malloc(IO_BUFFER_LEN); + CIRCLEQ_INIT(&io->stream_tx_qhead); + if(config->tx_threads) { + if(!io_thread_init(io)) { + return false; + } + timer_add_periodic(&io->thread->timer.root, &io->thread->timer.io, "TX (threaded)", 0, + config->tx_interval, io->thread, &io_dpdk_thread_tx_job); + io->thread->timer.io->reset = false; + } else { + timer_add_periodic(&g_ctx->timer_root, &interface->io.tx_job, "TX", 0, + config->tx_interval, io, &io_dpdk_tx_job); + interface->io.tx_job->reset = false; + } + + tx_conf = dev_info.default_txconf; + tx_conf.offloads = local_port_conf.txmode.offloads; + ret = rte_eth_tx_queue_setup(portid, queue, config->io_slots_tx, + rte_eth_dev_socket_id(portid), + &tx_conf); + if(ret < 0) { + LOG(ERROR, "DPDK: failed to setup TX queue %u for interface %s (error %d)\n", + queue, interface->name, ret); + return false; + } + + /* Initialize TX buffers */ + io->tx_buffer = rte_zmalloc_socket("tx_buffer", + RTE_ETH_TX_BUFFER_SIZE(BURST_SIZE), 0, + rte_eth_dev_socket_id(portid)); + if (!io->tx_buffer) { + LOG(ERROR, "DPDK: failed to allocate TX buffer for interface %s queue %u (error %d)\n", + interface->name, queue, ret); + return false; + } + rte_eth_tx_buffer_init(io->tx_buffer, BURST_SIZE); + ret = rte_eth_tx_buffer_set_err_callback(io->tx_buffer, + rte_eth_tx_buffer_count_callback, &io->stats.dropped); + if(ret < 0) { + LOG(ERROR, "DPDK: failed to set TX error callback for interface %s queue %u (error %d)\n", + interface->name, queue, ret); + return false; + } + } + + ret = rte_eth_dev_set_ptypes(portid, RTE_PTYPE_UNKNOWN, NULL, 0); + if (ret < 0) { + LOG(ERROR, "DPDK: failed to disable ptype parsing for interface %s (error %d)\n", + interface->name, ret); + return false; + } + + ret = rte_eth_dev_start(portid); + if (ret < 0) { + LOG(ERROR, "DPDK: failed to start interface %s (error %d)\n", + interface->name, ret); + return false; + } + + ret = rte_eth_promiscuous_enable(portid); + if (ret < 0) { + LOG(ERROR, "DPDK: failed to enable promiscuous mode for interface %s (error %d)\n", + interface->name, ret); + return false; + } + return true; } diff --git a/code/bngblaster/src/io/io_dpdk.h b/code/bngblaster/src/io/io_dpdk.h index 3aa511d1..b2326bff 100644 --- a/code/bngblaster/src/io/io_dpdk.h +++ b/code/bngblaster/src/io/io_dpdk.h @@ -16,4 +16,10 @@ typedef struct io_dpdk_ctx_ { bool io_dpdk_init(); +void +io_dpdk_close(); + +bool +io_dpdk_interface_init(bbl_interface_s *interface); + #endif \ No newline at end of file diff --git a/code/bngblaster/src/io/io_interface.c b/code/bngblaster/src/io/io_interface.c index 22dc0509..f4af3770 100644 --- a/code/bngblaster/src/io/io_interface.c +++ b/code/bngblaster/src/io/io_interface.c @@ -169,6 +169,14 @@ io_interface_init(bbl_interface_s *interface) { bbl_link_config_s *config = interface->config; +#ifdef BNGBLASTER_DPDK + if(config->io_mode == IO_MODE_DPDK) { + if(!io_dpdk_interface_init(interface)) { + return false; + } + } +#endif + if(config->io_mode != IO_MODE_DPDK) { if(!set_kernel_info(interface)) { return false; @@ -176,19 +184,15 @@ io_interface_init(bbl_interface_s *interface) if(!set_promisc(interface)) { return false; } - } - if(*(uint32_t*)config->mac) { - memcpy(interface->mac, config->mac, ETH_ADDR_LEN); - } - - /* RX */ - if(!io_interface_init_rx(interface)) { - return false; - } - - /* TX */ - if(!io_interface_init_tx(interface)) { - return false; + if(*(uint32_t*)config->mac) { + memcpy(interface->mac, config->mac, ETH_ADDR_LEN); + } + if(!io_interface_init_rx(interface)) { + return false; + } + if(!io_interface_init_tx(interface)) { + return false; + } } return true; } \ No newline at end of file