From 556d6fd47b57f10d8dc93ff53126e2fa0903f512 Mon Sep 17 00:00:00 2001 From: Jesper Dangaard Brouer Date: Thu, 4 Nov 2021 13:49:20 +0100 Subject: [PATCH] AF_XDP-interaction: Move refill of fill-queue to after RX processing The root cause of the slowdown on the first packet with timestamps, as primary related to refilling the fill-queue, which happend before process_packet. Primary cause, as first packet still see a slowdown of around 4 usec while it was around 9 usec before. In commit 5df5332f235e ("AF_XDP-interaction: config AF_XDP frame_headroom") the fill-queue size was increase to be larger. This caused the first call to handle_receive_packets() to refill too many frames into the fill-queue. Patch split out refill into function restock_receive_fill_queue() which is now called *after* process_packet step, but before releasing RX packets via xsk_ring_cons__release(&xsk->rx). Signed-off-by: Jesper Dangaard Brouer --- AF_XDP-interaction/af_xdp_kern.c | 5 ++++ AF_XDP-interaction/af_xdp_user.c | 45 ++++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/AF_XDP-interaction/af_xdp_kern.c b/AF_XDP-interaction/af_xdp_kern.c index 3ac0f9f..57efb21 100644 --- a/AF_XDP-interaction/af_xdp_kern.c +++ b/AF_XDP-interaction/af_xdp_kern.c @@ -110,6 +110,11 @@ int xdp_sock_prog(struct xdp_md *ctx) if (!pkt_count) return XDP_ABORTED; __u64 cnt = (*pkt_count)++; +// if (cnt == 0) { +// if (bpf_ktime_get_ns() == 42) +// return XDP_ABORTED; +// cnt++; +// } /* Notice how two different xdp_hints meta-data are used */ if ((cnt % 2) == 0) { diff --git a/AF_XDP-interaction/af_xdp_user.c b/AF_XDP-interaction/af_xdp_user.c index 049957b..8517136 100644 --- a/AF_XDP-interaction/af_xdp_user.c +++ b/AF_XDP-interaction/af_xdp_user.c @@ -164,7 +164,8 @@ static struct xsk_umem_info *configure_xsk_umem(void *buffer, uint64_t size) * allocated memory is used that only runs out in OOM situations * that should be rare. */ - .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2, +// .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2, + .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, /* Fix later */ .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS, .frame_size = FRAME_SIZE, /* Notice XSK_UMEM__DEFAULT_FRAME_HEADROOM is zero */ @@ -526,8 +527,8 @@ static bool process_packet(struct xsk_socket_info *xsk, } print_meta_info_via_btf(pkt); - if (debug) - printf("XXX addr:0x%lX pkt_ptr:0x%p\n", addr, pkt); + //if (debug) + // printf("XXX addr:0x%lX pkt_ptr:0x%p\n", addr, pkt); if (debug_pkt) print_pkt_info(pkt, len); @@ -593,15 +594,14 @@ static bool process_packet(struct xsk_socket_info *xsk, return false; } -static void handle_receive_packets(struct xsk_socket_info *xsk) +void restock_receive_fill_queue(struct xsk_socket_info *xsk) { - unsigned int rcvd, stock_frames, i; - uint32_t idx_rx = 0, idx_fq = 0; + unsigned int i, stock_frames; + uint32_t idx_fq = 0; int ret; - rcvd = xsk_ring_cons__peek(&xsk->rx, RX_BATCH_SIZE, &idx_rx); - if (!rcvd) - return; + int free_frames = xsk_umem_free_frames(xsk); + __u64 start = gettime(); /* Stuff the ring with as much frames as possible */ stock_frames = xsk_prod_nb_free(&xsk->umem->fq, @@ -613,9 +613,11 @@ static void handle_receive_packets(struct xsk_socket_info *xsk) &idx_fq); /* This should not happen, but just in case */ - while (ret != stock_frames) - ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, - &idx_fq); + if (ret != stock_frames) { + printf("XXX %s() should not happen (%d vs %d)\n", __func__, + stock_frames, ret); + stock_frames = ret; + } for (i = 0; i < stock_frames; i++) *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) = @@ -623,6 +625,20 @@ static void handle_receive_packets(struct xsk_socket_info *xsk) xsk_ring_prod__submit(&xsk->umem->fq, stock_frames); } + __u64 now = gettime(); + if (debug && (stock_frames || free_frames)) + printf("XXX stock_frame:%d free_frames:%d cost of xsk_prod_nb_free() %llu ns\n", + stock_frames, free_frames, now - start); +} + +static void handle_receive_packets(struct xsk_socket_info *xsk) +{ + unsigned int rcvd, i; + uint32_t idx_rx = 0; + + rcvd = xsk_ring_cons__peek(&xsk->rx, RX_BATCH_SIZE, &idx_rx); + if (!rcvd) + return; /* Process received packets */ for (i = 0; i < rcvd; i++) { @@ -634,10 +650,11 @@ static void handle_receive_packets(struct xsk_socket_info *xsk) xsk->stats.rx_bytes += len; } - - xsk_ring_cons__release(&xsk->rx, rcvd); xsk->stats.rx_packets += rcvd; + restock_receive_fill_queue(xsk); + xsk_ring_cons__release(&xsk->rx, rcvd); + /* Do we need to wake up the kernel for transmission */ complete_tx(xsk); }