1
0
mirror of https://github.com/rtbrick/bngblaster.git synced 2024-05-06 15:54:57 +00:00

fix LAG member stream distribution

This commit is contained in:
Christian Giese
2024-04-08 16:15:17 +00:00
parent b2bd029434
commit 93c1f6cd69
6 changed files with 80 additions and 87 deletions

View File

@@ -126,9 +126,19 @@ static void
bbl_lag_select(bbl_lag_s *lag)
{
bbl_lag_member_s *member;
bbl_stream_s *stream = lag->stream_head;
io_handle_s *io;
uint8_t active_count = 0;
uint8_t key;
CIRCLEQ_FOREACH(member, &lag->lag_member_qhead, lag_member_qnode) {
io = member->interface->io.tx;
io->stream_pps = 0;
io->stream_count = 0;
io->stream_head = NULL;
io->stream_cur = NULL;
member->primary = false;
if(member->interface->state != INTERFACE_DISABLED) {
if(member->lacp_state == LACP_CURRENT &&
@@ -153,11 +163,27 @@ bbl_lag_select(bbl_lag_s *lag)
if(active_count &&
active_count >= lag->config->lacp_min_active_links) {
bbl_lag_update_state(lag, INTERFACE_UP);
/* Distribute streams */
while(stream) {
key = stream->flow_id % active_count;
io = lag->active_list[key]->interface->io.tx;
stream->io = io;
stream->io_next = io->stream_head;
io->stream_head = stream;
io->stream_cur = stream;
io->stream_pps += stream->config->pps;
io->stream_count++;
stream = stream->lag_next;
}
} else {
bbl_lag_update_state(lag, INTERFACE_DOWN);
while(stream) {
stream->io = NULL;
stream->io_next = NULL;
stream = stream->lag_next;
}
}
lag->active_count = active_count;
lag->select++;
}
void
@@ -188,6 +214,7 @@ bbl_lag_lacp_job(timer_s *timer)
member->partner_state = 0;
LOG(LAG, "LAG (%s) LACP defaulted on interface %s\n",
member->lag->interface->name, interface->name);
bbl_lag_select(member->lag);
}
}
}
@@ -256,7 +283,6 @@ bbl_lag_interface_add(bbl_interface_s *interface, bbl_link_config_s *link_config
member->lacp_state = LACP_DISABLED;
lag->interface->state = INTERFACE_UP;
lag->active_list[lag->active_count++] = member;
lag->select++;
if(CIRCLEQ_EMPTY(&lag->lag_member_qhead)) {
member->primary = true;
}
@@ -363,11 +389,12 @@ bbl_lag_json(bbl_lag_s *lag)
}
}
jobj_lag = json_pack("{si ss* ss* si si so*}",
jobj_lag = json_pack("{si ss* ss* si sI si so*}",
"id", lag->id,
"interface", lag->interface->name,
"state", interface_state_string(lag->interface->state),
"state-transitions", lag->interface->state_transitions,
"stream-count", lag->stream_count,
"members-active", lag->active_count,
"members", jobj_array);

View File

@@ -20,7 +20,8 @@ typedef struct bbl_lag_
uint8_t active_count;
bbl_lag_member_s *active_list[LAG_MEMBER_ACTIVE_MAX];
uint32_t select;
bbl_stream_s *stream_head;
uint32_t stream_count;
CIRCLEQ_ENTRY(bbl_lag_) lag_qnode;
CIRCLEQ_HEAD(lag_member_, bbl_lag_member_ ) lag_member_qhead; /* list of member interfaces */

View File

@@ -1385,57 +1385,6 @@ bbl_stream_update_udp(bbl_stream_s *stream)
}
}
static bool
bbl_stream_lag(bbl_stream_s *stream)
{
bbl_lag_s *lag = stream->tx_interface->lag;
bbl_stream_s *s;
io_handle_s *io, *io_new;
uint8_t key;
if(lag->active_count == 0) {
return false;
}
if(lag->select != stream->lag_select) {
/* Redistribute streams if LAG state has changed. */
stream->lag_select = lag->select;
key = stream->flow_id % lag->active_count;
io = stream->io;
io_new = lag->active_list[key]->interface->io.tx;
if(io_new != io) {
s = io->stream_head;
if(s == stream) {
/* Stream is head. */
io->stream_head = s->io_next;
} else {
/* Remove stream from IO stream list. */
while(s) {
if(s->io_next == stream) {
s->io_next = stream->io_next;
s = NULL;
} else {
s = s->io_next;
}
}
}
if(io->stream_count && (io->stream_pps >= stream->config->pps)) {
io->stream_pps -= stream->config->pps;
io->stream_count--;
}
stream->io = io_new;
stream->io_next = io_new->stream_head;
io_new->stream_head = stream;
io_new->stream_cur = stream;
io_new->stream_pps += stream->config->pps;
io_new->stream_count++;
return false;
}
}
return true;
}
static void
bbl_stream_setup(bbl_stream_s *stream)
{
@@ -1473,13 +1422,6 @@ bbl_stream_io_send(io_handle_s *io, bbl_stream_s *stream)
return STREAM_WAIT;
}
if(stream->lag) {
if(!bbl_stream_lag(stream)) {
stream->tokens = 0;
return WRONG_PROTOCOL_STATE;
}
}
if(!(g_traffic && stream->enabled && stream->tx_interface->state == INTERFACE_UP)) {
stream->tokens = 0;
return STREAM_WAIT;
@@ -1643,21 +1585,38 @@ bbl_stream_select_io_lag(bbl_stream_s *stream)
bbl_lag_s *lag = stream->tx_interface->lag;
bbl_lag_member_s *member;
io_handle_s *io;
io_handle_s *io_iter;
stream->lag = true;
stream->lag_next = lag->stream_head;
lag->stream_head = stream;
lag->stream_count++;
if(lag->config->lacp_enable) {
/* With LACP enabled, member interface will be selected
* if LAG state becomes operational state UP. */
return;
}
/* Without LACP enabled, select member interface with lowest PPS. */
member = CIRCLEQ_FIRST(&lag->lag_member_qhead);
if(member) {
io = member->interface->io.tx;
stream->io = io;
stream->io_next = io->stream_head;
io->stream_head = stream;
io->stream_cur = stream;
io->stream_pps += stream->config->pps;
io->stream_count++;
} else {
if(!member) {
LOG(ERROR, "Failed to add stream %s to LAG %s (no member interfaces)\n",
stream->config->name, lag->interface->name);
}
io = member->interface->io.tx;
CIRCLEQ_FOREACH(member, &lag->lag_member_qhead, lag_member_qnode) {
io_iter = member->interface->io.tx;
if(io_iter->stream_pps < io->stream_pps) {
io = io_iter;
}
}
stream->io = io;
stream->io_next = io->stream_head;
io->stream_head = stream;
io->stream_cur = stream;
io->stream_pps += stream->config->pps;
io->stream_count++;
}
static void
@@ -2496,7 +2455,9 @@ json_t *
bbl_stream_json(bbl_stream_s *stream, bool debug)
{
json_t *root = NULL;
io_handle_s *io = stream->io;
char *tx_interface = NULL;
const char *tx_interface_state = NULL;
char *rx_interface = NULL;
char *src_address = NULL;
char *dst_address = NULL;
@@ -2509,6 +2470,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug)
if(stream->tx_interface) {
tx_interface = stream->tx_interface->name;
tx_interface_state = interface_state_string(stream->tx_interface->state);
}
if(stream->rx_access_interface) {
rx_interface = stream->rx_access_interface->name;
@@ -2542,7 +2504,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug)
}
if(stream->type == BBL_TYPE_UNICAST) {
root = json_pack("{sI ss* ss ss ss sb sb sb ss sI ss sI ss ss* ss* sI sI si si si si si sI sI sI sI sI sI sI sI sI sI sI sI sI sf sf sf sI sI sI }",
root = json_pack("{sI ss* ss ss ss sb sb sb ss sI ss sI ss ss* ss* ss* sI sI si si si si si sI sI sI sI sI sI sI sI sI sI sI sI sI sf sf sf sI sI sI }",
"flow-id", stream->flow_id,
"name", stream->config->name,
"type", stream_type_string(stream),
@@ -2557,6 +2519,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug)
"destination-port", dst_port,
"protocol", stream->tcp ? "tcp" : "udp",
"tx-interface", tx_interface,
"tx-interface-state", tx_interface_state,
"rx-interface", rx_interface,
"rx-first-seq", stream->rx_first_seq,
"rx-last-seq", stream->rx_last_seq,
@@ -2615,12 +2578,12 @@ bbl_stream_json(bbl_stream_s *stream, bool debug)
if(stream->reverse) {
json_object_set(root, "reverse-flow-id", json_integer(stream->reverse->flow_id));
}
if(stream->lag && stream->io && stream->io->interface) {
json_object_set(root, "lag-member-interface", json_string(stream->io->interface->name));
json_object_set(root, "lag-member-interface-state", json_string(interface_state_string(stream->io->interface->state)));
if(stream->lag && io && io->interface) {
json_object_set(root, "lag-member-interface", json_string(io->interface->name));
json_object_set(root, "lag-member-interface-state", json_string(interface_state_string(io->interface->state)));
}
} else {
root = json_pack("{sI ss* ss ss ss sb sb ss* sI sI sI sI sf}",
root = json_pack("{sI ss* ss ss ss sb sb ss* ss* sI sI sI sI sf}",
"flow-id", stream->flow_id,
"name", stream->config->name,
"type", stream_type_string(stream),
@@ -2629,6 +2592,7 @@ bbl_stream_json(bbl_stream_s *stream, bool debug)
"enabled", stream->enabled,
"active", *(stream->endpoint) == ENDPOINT_ACTIVE ? true : false,
"tx-interface", tx_interface,
"tx-interface-state", tx_interface_state,
"tx-len", stream->tx_len,
"tx-packets", stream->tx_packets - stream->reset_packets_tx,
"tx-pps", stream->rate_packets_tx.avg,
@@ -2643,7 +2607,6 @@ bbl_stream_json(bbl_stream_s *stream, bool debug)
json_object_set(root, "debug-reset", json_boolean(stream->reset));
json_object_set(root, "debug-lag", json_boolean(stream->lag));
json_object_set(root, "debug-tx-pps-config", json_integer(stream->config->pps));
json_object_set(root, "debug-tx-interface-state", json_string(interface_state_string(stream->tx_interface->state)));
json_object_set(root, "debug-tx-packets-real", json_integer(stream->tx_packets));
json_object_set(root, "debug-tx-seq", json_integer(stream->flow_seq));
json_object_set(root, "debug-max-packets", json_integer(stream->max_packets));

View File

@@ -137,6 +137,7 @@ typedef struct bbl_stream_
bbl_stream_s *next; /* Next stream (global) */
bbl_stream_s *io_next; /* Next stream of same IO handle */
bbl_stream_s *group_next; /* Next stream of same group */
bbl_stream_s *lag_next; /* Next stream of same LAG group */
bbl_stream_s *session_next; /* Next stream of same session */
bbl_stream_s *reverse; /* Reverse stream direction */
@@ -160,7 +161,6 @@ typedef struct bbl_stream_
uint64_t tokens;
uint64_t tokens_burst;
uint64_t max_packets;
uint32_t lag_select;
__time_t tx_first_epoch;

View File

@@ -12,17 +12,18 @@ void
io_tocken_job(timer_s *timer)
{
io_bucket_s *io_bucket = timer->data;
struct timespec time_elapsed;
uint64_t tokens;
if(unlikely(!io_bucket->timestamp_start.tv_sec)) {
if(io_bucket->started) {
struct timespec time_elapsed;
uint64_t tokens;
timespec_sub(&time_elapsed, timer->timestamp, &io_bucket->timestamp_start);
tokens = io_bucket->tokens_per_sec * time_elapsed.tv_sec;
tokens += (io_bucket->tokens_per_sec * time_elapsed.tv_nsec) / SEC;
io_bucket->tokens = tokens;
} else {
io_bucket->started = true;
io_bucket->timestamp_start.tv_sec = timer->timestamp->tv_sec;
io_bucket->timestamp_start.tv_nsec = timer->timestamp->tv_nsec;
return;
}
timespec_sub(&time_elapsed, timer->timestamp, &io_bucket->timestamp_start);
tokens = io_bucket->tokens_per_sec * time_elapsed.tv_sec;
tokens += (io_bucket->tokens_per_sec * time_elapsed.tv_nsec) / SEC;
io_bucket->tokens = tokens;
}
static io_bucket_s *

View File

@@ -41,12 +41,13 @@ typedef enum {
} __attribute__ ((__packed__)) io_mode_t;
typedef struct io_bucket_ {
bool started;
double pps;
uint64_t tokens_per_sec;
uint64_t tokens;
struct io_bucket_ *next;
struct timer_ *timer;
struct timespec timestamp_start;
struct io_bucket_ *next;
} io_bucket_s;
typedef struct io_handle_ {