AF_XDP-interaction: Move refill of fill-queue to after RX processing

The root cause of the slowdown on the first packet with timestamps
is primarily related to refilling the fill-queue, which happened before
process_packet.

This is not the only cause, as the first packet still sees a slowdown of
around 4 usec, while it was around 9 usec before.

In commit 5df5332f23 ("AF_XDP-interaction: config AF_XDP frame_headroom")
the fill-queue size was increased to be larger.  This caused the
first call to handle_receive_packets() to refill too many frames
into the fill-queue.

This patch splits the refill out into the function restock_receive_fill_queue(),
which is now called *after* the process_packet step, but before
releasing RX packets via xsk_ring_cons__release(&xsk->rx).

Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
This commit is contained in:
Jesper Dangaard Brouer
2021-11-04 13:49:20 +01:00
parent 6016a13008
commit 556d6fd47b
2 changed files with 36 additions and 14 deletions

View File

@@ -110,6 +110,11 @@ int xdp_sock_prog(struct xdp_md *ctx)
if (!pkt_count)
return XDP_ABORTED;
__u64 cnt = (*pkt_count)++;
// if (cnt == 0) {
// if (bpf_ktime_get_ns() == 42)
// return XDP_ABORTED;
// cnt++;
// }
/* Notice how two different xdp_hints meta-data are used */
if ((cnt % 2) == 0) {

View File

@@ -164,7 +164,8 @@ static struct xsk_umem_info *configure_xsk_umem(void *buffer, uint64_t size)
* allocated memory is used that only runs out in OOM situations
* that should be rare.
*/
.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
// .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, /* Fix later */
.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.frame_size = FRAME_SIZE,
/* Notice XSK_UMEM__DEFAULT_FRAME_HEADROOM is zero */
@@ -526,8 +527,8 @@ static bool process_packet(struct xsk_socket_info *xsk,
}
print_meta_info_via_btf(pkt);
if (debug)
printf("XXX addr:0x%lX pkt_ptr:0x%p\n", addr, pkt);
//if (debug)
// printf("XXX addr:0x%lX pkt_ptr:0x%p\n", addr, pkt);
if (debug_pkt)
print_pkt_info(pkt, len);
@@ -593,15 +594,14 @@ static bool process_packet(struct xsk_socket_info *xsk,
return false;
}
static void handle_receive_packets(struct xsk_socket_info *xsk)
void restock_receive_fill_queue(struct xsk_socket_info *xsk)
{
unsigned int rcvd, stock_frames, i;
uint32_t idx_rx = 0, idx_fq = 0;
unsigned int i, stock_frames;
uint32_t idx_fq = 0;
int ret;
rcvd = xsk_ring_cons__peek(&xsk->rx, RX_BATCH_SIZE, &idx_rx);
if (!rcvd)
return;
int free_frames = xsk_umem_free_frames(xsk);
__u64 start = gettime();
/* Stuff the ring with as much frames as possible */
stock_frames = xsk_prod_nb_free(&xsk->umem->fq,
@@ -613,9 +613,11 @@ static void handle_receive_packets(struct xsk_socket_info *xsk)
&idx_fq);
/* This should not happen, but just in case */
while (ret != stock_frames)
ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd,
&idx_fq);
if (ret != stock_frames) {
printf("XXX %s() should not happen (%d vs %d)\n", __func__,
stock_frames, ret);
stock_frames = ret;
}
for (i = 0; i < stock_frames; i++)
*xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) =
@@ -623,6 +625,20 @@ static void handle_receive_packets(struct xsk_socket_info *xsk)
xsk_ring_prod__submit(&xsk->umem->fq, stock_frames);
}
__u64 now = gettime();
if (debug && (stock_frames || free_frames))
printf("XXX stock_frame:%d free_frames:%d cost of xsk_prod_nb_free() %llu ns\n",
stock_frames, free_frames, now - start);
}
static void handle_receive_packets(struct xsk_socket_info *xsk)
{
unsigned int rcvd, i;
uint32_t idx_rx = 0;
rcvd = xsk_ring_cons__peek(&xsk->rx, RX_BATCH_SIZE, &idx_rx);
if (!rcvd)
return;
/* Process received packets */
for (i = 0; i < rcvd; i++) {
@@ -634,10 +650,11 @@ static void handle_receive_packets(struct xsk_socket_info *xsk)
xsk->stats.rx_bytes += len;
}
xsk_ring_cons__release(&xsk->rx, rcvd);
xsk->stats.rx_packets += rcvd;
restock_receive_fill_queue(xsk);
xsk_ring_cons__release(&xsk->rx, rcvd);
/* Do we need to wake up the kernel for transmission */
complete_tx(xsk);
}