From 93c1f6cd69f91fee58ff2e488d06d40529391b05 Mon Sep 17 00:00:00 2001 From: Christian Giese Date: Mon, 8 Apr 2024 16:15:17 +0000 Subject: [PATCH] fix LAG member stream distribution --- code/bngblaster/src/bbl_lag.c | 33 ++++++++- code/bngblaster/src/bbl_lag.h | 3 +- code/bngblaster/src/bbl_stream.c | 109 ++++++++++------------------- code/bngblaster/src/bbl_stream.h | 2 +- code/bngblaster/src/io/io_bucket.c | 17 ++--- code/bngblaster/src/io/io_def.h | 3 +- 6 files changed, 80 insertions(+), 87 deletions(-) diff --git a/code/bngblaster/src/bbl_lag.c b/code/bngblaster/src/bbl_lag.c index deb9a663..9e0b42ab 100644 --- a/code/bngblaster/src/bbl_lag.c +++ b/code/bngblaster/src/bbl_lag.c @@ -126,9 +126,19 @@ static void bbl_lag_select(bbl_lag_s *lag) { bbl_lag_member_s *member; + bbl_stream_s *stream = lag->stream_head; + io_handle_s *io; + uint8_t active_count = 0; + uint8_t key; CIRCLEQ_FOREACH(member, &lag->lag_member_qhead, lag_member_qnode) { + io = member->interface->io.tx; + io->stream_pps = 0; + io->stream_count = 0; + io->stream_head = NULL; + io->stream_cur = NULL; + member->primary = false; if(member->interface->state != INTERFACE_DISABLED) { if(member->lacp_state == LACP_CURRENT && @@ -153,11 +163,27 @@ bbl_lag_select(bbl_lag_s *lag) if(active_count && active_count >= lag->config->lacp_min_active_links) { bbl_lag_update_state(lag, INTERFACE_UP); + /* Distribute streams */ + while(stream) { + key = stream->flow_id % active_count; + io = lag->active_list[key]->interface->io.tx; + stream->io = io; + stream->io_next = io->stream_head; + io->stream_head = stream; + io->stream_cur = stream; + io->stream_pps += stream->config->pps; + io->stream_count++; + stream = stream->lag_next; + } } else { bbl_lag_update_state(lag, INTERFACE_DOWN); + while(stream) { + stream->io = NULL; + stream->io_next = NULL; + stream = stream->lag_next; + } } lag->active_count = active_count; - lag->select++; } void @@ -188,6 +214,7 @@ bbl_lag_lacp_job(timer_s *timer) member->partner_state = 0; LOG(LAG, "LAG (%s) LACP defaulted on interface %s\n", member->lag->interface->name, interface->name); + bbl_lag_select(member->lag); } } } @@ -256,7 +283,6 @@ bbl_lag_interface_add(bbl_interface_s *interface, bbl_link_config_s *link_config member->lacp_state = LACP_DISABLED; lag->interface->state = INTERFACE_UP; lag->active_list[lag->active_count++] = member; - lag->select++; if(CIRCLEQ_EMPTY(&lag->lag_member_qhead)) { member->primary = true; } @@ -363,11 +389,12 @@ bbl_lag_json(bbl_lag_s *lag) } } - jobj_lag = json_pack("{si ss* ss* si si so*}", + jobj_lag = json_pack("{si ss* ss* si sI si so*}", "id", lag->id, "interface", lag->interface->name, "state", interface_state_string(lag->interface->state), "state-transitions", lag->interface->state_transitions, + "stream-count", lag->stream_count, "members-active", lag->active_count, "members", jobj_array); diff --git a/code/bngblaster/src/bbl_lag.h b/code/bngblaster/src/bbl_lag.h index cf34acc1..ffe47684 100644 --- a/code/bngblaster/src/bbl_lag.h +++ b/code/bngblaster/src/bbl_lag.h @@ -20,7 +20,8 @@ typedef struct bbl_lag_ uint8_t active_count; bbl_lag_member_s *active_list[LAG_MEMBER_ACTIVE_MAX]; - uint32_t select; + bbl_stream_s *stream_head; + uint32_t stream_count; CIRCLEQ_ENTRY(bbl_lag_) lag_qnode; CIRCLEQ_HEAD(lag_member_, bbl_lag_member_ ) lag_member_qhead; /* list of member interfaces */ diff --git a/code/bngblaster/src/bbl_stream.c b/code/bngblaster/src/bbl_stream.c index 35812fe8..30b9c7c0 100644 --- a/code/bngblaster/src/bbl_stream.c +++ b/code/bngblaster/src/bbl_stream.c @@ -1385,57 +1385,6 @@ bbl_stream_update_udp(bbl_stream_s *stream) } } -static bool -bbl_stream_lag(bbl_stream_s *stream) -{ - bbl_lag_s *lag = stream->tx_interface->lag; - bbl_stream_s *s; - io_handle_s *io, *io_new; - - uint8_t key; - - if(lag->active_count == 0) { - return false; - } - - if(lag->select != stream->lag_select) { - /* Redistribute streams if LAG state has changed. */ - stream->lag_select = lag->select; - key = stream->flow_id % lag->active_count; - io = stream->io; - io_new = lag->active_list[key]->interface->io.tx; - if(io_new != io) { - s = io->stream_head; - if(s == stream) { - /* Stream is head. */ - io->stream_head = s->io_next; - } else { - /* Remove stream from IO stream list. */ - while(s) { - if(s->io_next == stream) { - s->io_next = stream->io_next; - s = NULL; - } else { - s = s->io_next; - } - } - } - if(io->stream_count && (io->stream_pps >= stream->config->pps)) { - io->stream_pps -= stream->config->pps; - io->stream_count--; - } - stream->io = io_new; - stream->io_next = io_new->stream_head; - io_new->stream_head = stream; - io_new->stream_cur = stream; - io_new->stream_pps += stream->config->pps; - io_new->stream_count++; - return false; - } - } - return true; -} - static void bbl_stream_setup(bbl_stream_s *stream) { @@ -1473,13 +1422,6 @@ bbl_stream_io_send(io_handle_s *io, bbl_stream_s *stream) return STREAM_WAIT; } - if(stream->lag) { - if(!bbl_stream_lag(stream)) { - stream->tokens = 0; - return WRONG_PROTOCOL_STATE; - } - } - if(!(g_traffic && stream->enabled && stream->tx_interface->state == INTERFACE_UP)) { stream->tokens = 0; return STREAM_WAIT; @@ -1643,21 +1585,38 @@ bbl_stream_select_io_lag(bbl_stream_s *stream) bbl_lag_s *lag = stream->tx_interface->lag; bbl_lag_member_s *member; io_handle_s *io; + io_handle_s *io_iter; stream->lag = true; + stream->lag_next = lag->stream_head; + lag->stream_head = stream; + lag->stream_count++; + + if(lag->config->lacp_enable) { + /* With LACP enabled, member interface will be selected + * if LAG state becomes operational state UP. */ + return; + } + + /* Without LACP enabled, select member interface with lowest PPS. */ member = CIRCLEQ_FIRST(&lag->lag_member_qhead); - if(member) { - io = member->interface->io.tx; - stream->io = io; - stream->io_next = io->stream_head; - io->stream_head = stream; - io->stream_cur = stream; - io->stream_pps += stream->config->pps; - io->stream_count++; - } else { + if(!member) { LOG(ERROR, "Failed to add stream %s to LAG %s (no member interfaces)\n", stream->config->name, lag->interface->name); } + io = member->interface->io.tx; + CIRCLEQ_FOREACH(member, &lag->lag_member_qhead, lag_member_qnode) { + io_iter = member->interface->io.tx; + if(io_iter->stream_pps < io->stream_pps) { + io = io_iter; + } + } + stream->io = io; + stream->io_next = io->stream_head; + io->stream_head = stream; + io->stream_cur = stream; + io->stream_pps += stream->config->pps; + io->stream_count++; } static void @@ -2496,7 +2455,9 @@ json_t * bbl_stream_json(bbl_stream_s *stream, bool debug) { json_t *root = NULL; + io_handle_s *io = stream->io; char *tx_interface = NULL; + const char *tx_interface_state = NULL; char *rx_interface = NULL; char *src_address = NULL; char *dst_address = NULL; @@ -2509,6 +2470,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug) if(stream->tx_interface) { tx_interface = stream->tx_interface->name; + tx_interface_state = interface_state_string(stream->tx_interface->state); } if(stream->rx_access_interface) { rx_interface = stream->rx_access_interface->name; @@ -2542,7 +2504,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug) } if(stream->type == BBL_TYPE_UNICAST) { - root = json_pack("{sI ss* ss ss ss sb sb sb ss sI ss sI ss ss* ss* sI sI si si si si si sI sI sI sI sI sI sI sI sI sI sI sI sI sf sf sf sI sI sI }", + root = json_pack("{sI ss* ss ss ss sb sb sb ss sI ss sI ss ss* ss* ss* sI sI si si si si si sI sI sI sI sI sI sI sI sI sI sI sI sI sf sf sf sI sI sI }", "flow-id", stream->flow_id, "name", stream->config->name, "type", stream_type_string(stream), @@ -2557,6 +2519,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug) "destination-port", dst_port, "protocol", stream->tcp ? "tcp" : "udp", "tx-interface", tx_interface, + "tx-interface-state", tx_interface_state, "rx-interface", rx_interface, "rx-first-seq", stream->rx_first_seq, "rx-last-seq", stream->rx_last_seq, @@ -2615,12 +2578,12 @@ bbl_stream_json(bbl_stream_s *stream, bool debug) if(stream->reverse) { json_object_set(root, "reverse-flow-id", json_integer(stream->reverse->flow_id)); } - if(stream->lag && stream->io && stream->io->interface) { - json_object_set(root, "lag-member-interface", json_string(stream->io->interface->name)); - json_object_set(root, "lag-member-interface-state", json_string(interface_state_string(stream->io->interface->state))); + if(stream->lag && io && io->interface) { + json_object_set(root, "lag-member-interface", json_string(io->interface->name)); + json_object_set(root, "lag-member-interface-state", json_string(interface_state_string(io->interface->state))); } } else { - root = json_pack("{sI ss* ss ss ss sb sb ss* sI sI sI sI sf}", + root = json_pack("{sI ss* ss ss ss sb sb ss* ss* sI sI sI sI sf}", "flow-id", stream->flow_id, "name", stream->config->name, "type", stream_type_string(stream), @@ -2629,6 +2592,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug) "enabled", stream->enabled, "active", *(stream->endpoint) == ENDPOINT_ACTIVE ? true : false, "tx-interface", tx_interface, + "tx-interface-state", tx_interface_state, "tx-len", stream->tx_len, "tx-packets", stream->tx_packets - stream->reset_packets_tx, "tx-pps", stream->rate_packets_tx.avg, @@ -2643,7 +2607,6 @@ bbl_stream_json(bbl_stream_s *stream, bool debug) json_object_set(root, "debug-reset", json_boolean(stream->reset)); json_object_set(root, "debug-lag", json_boolean(stream->lag)); json_object_set(root, "debug-tx-pps-config", json_integer(stream->config->pps)); - json_object_set(root, "debug-tx-interface-state", json_string(interface_state_string(stream->tx_interface->state))); json_object_set(root, "debug-tx-packets-real", json_integer(stream->tx_packets)); json_object_set(root, "debug-tx-seq", json_integer(stream->flow_seq)); json_object_set(root, "debug-max-packets", json_integer(stream->max_packets)); diff --git a/code/bngblaster/src/bbl_stream.h b/code/bngblaster/src/bbl_stream.h index 70310898..80d496d0 100644 --- a/code/bngblaster/src/bbl_stream.h +++ b/code/bngblaster/src/bbl_stream.h @@ -137,6 +137,7 @@ typedef struct bbl_stream_ bbl_stream_s *next; /* Next stream (global) */ bbl_stream_s *io_next; /* Next stream of same IO handle */ bbl_stream_s *group_next; /* Next stream of same group */ + bbl_stream_s *lag_next; /* Next stream of same LAG group */ bbl_stream_s *session_next; /* Next stream of same session */ bbl_stream_s *reverse; /* Reverse stream direction */ @@ -160,7 +161,6 @@ typedef struct bbl_stream_ uint64_t tokens; uint64_t tokens_burst; uint64_t max_packets; - uint32_t lag_select; __time_t tx_first_epoch; diff --git a/code/bngblaster/src/io/io_bucket.c b/code/bngblaster/src/io/io_bucket.c index cd4ba403..f0dd3f6c 100644 --- a/code/bngblaster/src/io/io_bucket.c +++ b/code/bngblaster/src/io/io_bucket.c @@ -12,17 +12,18 @@ void io_tocken_job(timer_s *timer) { io_bucket_s *io_bucket = timer->data; - struct timespec time_elapsed; - uint64_t tokens; - if(unlikely(!io_bucket->timestamp_start.tv_sec)) { + if(io_bucket->started) { + struct timespec time_elapsed; + uint64_t tokens; + timespec_sub(&time_elapsed, timer->timestamp, &io_bucket->timestamp_start); + tokens = io_bucket->tokens_per_sec * time_elapsed.tv_sec; + tokens += (io_bucket->tokens_per_sec * time_elapsed.tv_nsec) / SEC; + io_bucket->tokens = tokens; + } else { + io_bucket->started = true; io_bucket->timestamp_start.tv_sec = timer->timestamp->tv_sec; io_bucket->timestamp_start.tv_nsec = timer->timestamp->tv_nsec; - return; } - timespec_sub(&time_elapsed, timer->timestamp, &io_bucket->timestamp_start); - tokens = io_bucket->tokens_per_sec * time_elapsed.tv_sec; - tokens += (io_bucket->tokens_per_sec * time_elapsed.tv_nsec) / SEC; - io_bucket->tokens = tokens; } static io_bucket_s * diff --git a/code/bngblaster/src/io/io_def.h b/code/bngblaster/src/io/io_def.h index 63294154..8044bce8 100644 --- a/code/bngblaster/src/io/io_def.h +++ b/code/bngblaster/src/io/io_def.h @@ -41,12 +41,13 @@ typedef enum { } __attribute__ ((__packed__)) io_mode_t; typedef struct io_bucket_ { + bool started; double pps; uint64_t tokens_per_sec; uint64_t tokens; + struct io_bucket_ *next; struct timer_ *timer; struct timespec timestamp_start; - struct io_bucket_ *next; } io_bucket_s; typedef struct io_handle_ {