mirror of
https://github.com/rtbrick/bngblaster.git
synced 2024-05-06 15:54:57 +00:00
stream enhancements/fixes
+ fix stream-reset + fix stream loss calc + enhance stream delay calc + ...
This commit is contained in:
@@ -244,9 +244,7 @@ bbl_a10nsp_pppoes_handler(bbl_interface_s *interface,
|
||||
|
||||
bbl_stream *stream;
|
||||
void **search = NULL;
|
||||
|
||||
struct timespec delay;
|
||||
uint64_t delay_nsec;
|
||||
uint64_t loss;
|
||||
|
||||
switch(pppoes->protocol) {
|
||||
case PROTOCOL_LCP:
|
||||
@@ -270,46 +268,42 @@ bbl_a10nsp_pppoes_handler(bbl_interface_s *interface,
|
||||
search = dict_search(ctx->stream_flow_dict, &bbl->flow_id);
|
||||
if(search) {
|
||||
stream = *search;
|
||||
stream->packets_rx++;
|
||||
stream->rx_len = eth->length;
|
||||
stream->rx_priority = ipv4->tos;
|
||||
stream->rx_outer_vlan_pbit = eth->vlan_outer_priority;
|
||||
stream->rx_inner_vlan_pbit = eth->vlan_inner_priority;
|
||||
timespec_sub(&delay, ð->timestamp, &bbl->timestamp);
|
||||
delay_nsec = delay.tv_sec * 1e9 + delay.tv_nsec;
|
||||
if(delay_nsec > stream->max_delay_ns) {
|
||||
stream->max_delay_ns = delay_nsec;
|
||||
}
|
||||
if(stream->min_delay_ns) {
|
||||
if(delay_nsec < stream->min_delay_ns) {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
if(stream->rx_first_seq) {
|
||||
/* Stream already verified */
|
||||
if((stream->rx_last_seq +1) < bbl->flow_seq) {
|
||||
loss = bbl->flow_seq - (stream->rx_last_seq +1);
|
||||
stream->loss += loss;
|
||||
interface->stats.stream_loss += loss;
|
||||
LOG(LOSS, "LOSS flow: %lu seq: %lu last: %lu\n",
|
||||
bbl->flow_id, bbl->flow_seq, stream->rx_last_seq);
|
||||
}
|
||||
} else {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
}
|
||||
if(!stream->rx_first_seq) {
|
||||
/* Verify stream ... */
|
||||
stream->rx_len = eth->length;
|
||||
stream->rx_priority = ipv4->tos;
|
||||
stream->rx_outer_vlan_pbit = eth->vlan_outer_priority;
|
||||
stream->rx_inner_vlan_pbit = eth->vlan_inner_priority;
|
||||
stream->rx_first_seq = bbl->flow_seq;
|
||||
interface->ctx->stats.stream_traffic_flows_verified++;
|
||||
if(interface->ctx->config.traffic_stop_verified) {
|
||||
ctx->stats.stream_traffic_flows_verified++;
|
||||
if(ctx->config.traffic_stop_verified) {
|
||||
stream->stop = true;
|
||||
}
|
||||
} else {
|
||||
if(stream->rx_last_seq +1 != bbl->flow_seq) {
|
||||
stream->loss++;
|
||||
}
|
||||
}
|
||||
stream->packets_rx++;
|
||||
stream->rx_last_seq = bbl->flow_seq;
|
||||
bbl_stream_delay(stream, ð->timestamp, &bbl->timestamp);
|
||||
} else {
|
||||
if(bbl->flow_id == session->access_ipv4_tx_flow_id) {
|
||||
interface->stats.session_ipv4_rx++;
|
||||
session->stats.network_ipv4_rx++;
|
||||
if(!session->network_ipv4_rx_first_seq) {
|
||||
session->network_ipv4_rx_first_seq = bbl->flow_seq;
|
||||
interface->ctx->stats.session_traffic_flows_verified++;
|
||||
ctx->stats.session_traffic_flows_verified++;
|
||||
} else {
|
||||
if(session->network_ipv4_rx_last_seq+1 != bbl->flow_seq) {
|
||||
interface->stats.session_ipv4_loss++;
|
||||
session->stats.network_ipv4_loss++;
|
||||
if((session->network_ipv4_rx_last_seq +1) < bbl->flow_seq) {
|
||||
loss = bbl->flow_seq - (session->network_ipv4_rx_last_seq +1);
|
||||
session->stats.network_ipv4_loss += loss;
|
||||
interface->stats.session_ipv4_loss += loss;
|
||||
LOG(LOSS, "LOSS (ID: %u) flow: %lu seq: %lu last: %lu\n",
|
||||
session->session_id, bbl->flow_id, bbl->flow_seq, session->network_ipv4_rx_last_seq);
|
||||
}
|
||||
|
||||
@@ -986,21 +986,21 @@ bbl_ctrl_session_streams(int fd, bbl_ctx_s *ctx, uint32_t session_id, json_t* ar
|
||||
stream = stream->next;
|
||||
}
|
||||
root = json_pack("{ss si s{si si si si si si si si si sf sf so*}}",
|
||||
"status", "ok",
|
||||
"code", 200,
|
||||
"session-streams",
|
||||
"session-id", session->session_id,
|
||||
"rx-packets", session->stats.packets_rx,
|
||||
"tx-packets", session->stats.packets_tx,
|
||||
"rx-accounting-packets", session->stats.accounting_packets_rx,
|
||||
"tx-accounting-packets", session->stats.accounting_packets_tx,
|
||||
"rx-pps", session->stats.rate_packets_rx.avg,
|
||||
"tx-pps", session->stats.rate_packets_tx.avg,
|
||||
"rx-bps-l2", session->stats.rate_bytes_rx.avg * 8,
|
||||
"tx-bps-l2", session->stats.rate_bytes_tx.avg * 8,
|
||||
"rx-mbps-l2", (double)(session->stats.rate_bytes_rx.avg * 8) / 1000000.0,
|
||||
"tx-mbps-l2", (double)(session->stats.rate_bytes_tx.avg * 8) / 1000000.0,
|
||||
"streams", json_streams);
|
||||
"status", "ok",
|
||||
"code", 200,
|
||||
"session-streams",
|
||||
"session-id", session->session_id,
|
||||
"rx-packets", session->stats.packets_rx,
|
||||
"tx-packets", session->stats.packets_tx,
|
||||
"rx-accounting-packets", session->stats.accounting_packets_rx,
|
||||
"tx-accounting-packets", session->stats.accounting_packets_tx,
|
||||
"rx-pps", session->stats.rate_packets_rx.avg,
|
||||
"tx-pps", session->stats.rate_packets_tx.avg,
|
||||
"rx-bps-l2", session->stats.rate_bytes_rx.avg * 8,
|
||||
"tx-bps-l2", session->stats.rate_bytes_tx.avg * 8,
|
||||
"rx-mbps-l2", (double)(session->stats.rate_bytes_rx.avg * 8) / 1000000.0,
|
||||
"tx-mbps-l2", (double)(session->stats.rate_bytes_tx.avg * 8) / 1000000.0,
|
||||
"streams", json_streams);
|
||||
|
||||
if(root) {
|
||||
result = json_dumpfd(root, fd, 0);
|
||||
@@ -1073,11 +1073,10 @@ bbl_ctrl_stream_reset(int fd, bbl_ctx_s *ctx, uint32_t session_id __attribute__(
|
||||
stream->thread.thread->bytes_tx_last_sync = 0;
|
||||
}
|
||||
|
||||
stream->flow_seq = 0;
|
||||
stream->flow_seq = 1;
|
||||
stream->rx_first_seq = 0;
|
||||
stream->rx_last_seq = 0;
|
||||
stream->stop = false;
|
||||
|
||||
stream->packets_tx = 0;
|
||||
stream->packets_rx = 0;
|
||||
stream->packets_tx_last_sync = 0;
|
||||
|
||||
@@ -908,14 +908,8 @@ bbl_l2tp_data_rx(bbl_ethernet_header_t *eth, bbl_l2tp_t *l2tp, bbl_interface_s *
|
||||
|
||||
bbl_stream *stream;
|
||||
void **search = NULL;
|
||||
|
||||
struct timespec delay;
|
||||
uint64_t delay_nsec;
|
||||
uint64_t loss;
|
||||
|
||||
UNUSED(ctx);
|
||||
UNUSED(eth);
|
||||
|
||||
if(l2tp_session->state != BBL_L2TP_SESSION_ESTABLISHED) {
|
||||
return;
|
||||
}
|
||||
@@ -1037,35 +1031,30 @@ bbl_l2tp_data_rx(bbl_ethernet_header_t *eth, bbl_l2tp_t *l2tp, bbl_interface_s *
|
||||
search = dict_search(ctx->stream_flow_dict, &bbl->flow_id);
|
||||
if(search) {
|
||||
stream = *search;
|
||||
stream->packets_rx++;
|
||||
stream->rx_len = eth->length;
|
||||
stream->rx_priority = ipv4->tos;
|
||||
stream->rx_outer_vlan_pbit = eth->vlan_outer_priority;
|
||||
stream->rx_inner_vlan_pbit = eth->vlan_inner_priority;
|
||||
timespec_sub(&delay, ð->timestamp, &bbl->timestamp);
|
||||
delay_nsec = delay.tv_sec * 1e9 + delay.tv_nsec;
|
||||
if(delay_nsec > stream->max_delay_ns) {
|
||||
stream->max_delay_ns = delay_nsec;
|
||||
}
|
||||
if(stream->min_delay_ns) {
|
||||
if(delay_nsec < stream->min_delay_ns) {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
if(stream->rx_first_seq) {
|
||||
/* Stream already verified */
|
||||
if((stream->rx_last_seq +1) < bbl->flow_seq) {
|
||||
loss = bbl->flow_seq - (stream->rx_last_seq +1);
|
||||
stream->loss += loss;
|
||||
interface->stats.stream_loss += loss;
|
||||
LOG(LOSS, "LOSS flow: %lu seq: %lu last: %lu\n",
|
||||
bbl->flow_id, bbl->flow_seq, stream->rx_last_seq);
|
||||
}
|
||||
} else {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
}
|
||||
if(!stream->rx_first_seq) {
|
||||
/* Verify stream ... */
|
||||
stream->rx_len = eth->length;
|
||||
stream->rx_priority = ipv4->tos;
|
||||
stream->rx_outer_vlan_pbit = eth->vlan_outer_priority;
|
||||
stream->rx_inner_vlan_pbit = eth->vlan_inner_priority;
|
||||
stream->rx_first_seq = bbl->flow_seq;
|
||||
interface->ctx->stats.stream_traffic_flows_verified++;
|
||||
if(interface->ctx->config.traffic_stop_verified) {
|
||||
ctx->stats.stream_traffic_flows_verified++;
|
||||
if(ctx->config.traffic_stop_verified) {
|
||||
stream->stop = true;
|
||||
}
|
||||
} else {
|
||||
if(stream->rx_last_seq +1 != bbl->flow_seq) {
|
||||
stream->loss++;
|
||||
}
|
||||
}
|
||||
stream->packets_rx++;
|
||||
stream->rx_last_seq = bbl->flow_seq;
|
||||
bbl_stream_delay(stream, ð->timestamp, &bbl->timestamp);
|
||||
} else {
|
||||
if(l2tp_session->pppoe_session) {
|
||||
pppoe_session = l2tp_session->pppoe_session;
|
||||
@@ -1074,8 +1063,8 @@ bbl_l2tp_data_rx(bbl_ethernet_header_t *eth, bbl_l2tp_t *l2tp, bbl_interface_s *
|
||||
pppoe_session->stats.network_ipv4_rx++;
|
||||
if(!pppoe_session->network_ipv4_rx_first_seq) {
|
||||
pppoe_session->network_ipv4_rx_first_seq = bbl->flow_seq;
|
||||
interface->ctx->stats.session_traffic_flows_verified++;
|
||||
pppoe_session->session_traffic_flows_verified++;
|
||||
ctx->stats.session_traffic_flows_verified++;
|
||||
} else {
|
||||
if((pppoe_session->network_ipv4_rx_last_seq +1) < bbl->flow_seq) {
|
||||
loss = bbl->flow_seq - (pppoe_session->network_ipv4_rx_last_seq +1);
|
||||
|
||||
@@ -287,66 +287,68 @@ bbl_igmp_initial_join(timer_s *timer)
|
||||
static void
|
||||
bbl_rx_stream(bbl_interface_s *interface, bbl_ethernet_header_t *eth, bbl_bbl_t *bbl, uint8_t tos) {
|
||||
|
||||
bbl_ctx_s *ctx = interface->ctx;
|
||||
bbl_stream *stream;
|
||||
bbl_mpls_t *mpls;
|
||||
void **search = NULL;
|
||||
|
||||
struct timespec delay;
|
||||
uint64_t delay_nsec;
|
||||
|
||||
uint64_t loss;
|
||||
|
||||
bbl_mpls_t *mpls;
|
||||
|
||||
search = dict_search(interface->ctx->stream_flow_dict, &bbl->flow_id);
|
||||
interface->stats.stream_rx++;
|
||||
search = dict_search(ctx->stream_flow_dict, &bbl->flow_id);
|
||||
if(search) {
|
||||
stream = *search;
|
||||
interface->stats.stream_rx++;
|
||||
stream->packets_rx++;
|
||||
stream->rx_len = eth->length;
|
||||
stream->rx_priority = tos;
|
||||
stream->rx_outer_vlan_pbit = eth->vlan_outer_priority;
|
||||
stream->rx_inner_vlan_pbit = eth->vlan_inner_priority;
|
||||
mpls = eth->mpls;
|
||||
if(mpls) {
|
||||
stream->rx_mpls1 = true;
|
||||
stream->rx_mpls1_label = mpls->label;
|
||||
stream->rx_mpls1_exp = mpls->exp;
|
||||
stream->rx_mpls1_ttl = mpls->ttl;
|
||||
mpls = mpls->next;
|
||||
if(mpls) {
|
||||
stream->rx_mpls2 = true;
|
||||
stream->rx_mpls2_label = mpls->label;
|
||||
stream->rx_mpls2_exp = mpls->exp;
|
||||
stream->rx_mpls2_ttl = mpls->ttl;
|
||||
}
|
||||
}
|
||||
|
||||
timespec_sub(&delay, ð->timestamp, &bbl->timestamp);
|
||||
delay_nsec = delay.tv_sec * 1000000000 + delay.tv_nsec;
|
||||
if(delay_nsec > stream->max_delay_ns) {
|
||||
stream->max_delay_ns = delay_nsec;
|
||||
}
|
||||
if(stream->min_delay_ns) {
|
||||
if(delay_nsec < stream->min_delay_ns) {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
}
|
||||
} else {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
}
|
||||
if(!stream->rx_first_seq) {
|
||||
stream->rx_first_seq = bbl->flow_seq;
|
||||
interface->ctx->stats.stream_traffic_flows_verified++;
|
||||
if(interface->ctx->config.traffic_stop_verified) {
|
||||
stream->stop = true;
|
||||
}
|
||||
} else {
|
||||
if(stream->rx_first_seq) {
|
||||
/* Stream already verified */
|
||||
if((stream->rx_last_seq +1) < bbl->flow_seq) {
|
||||
loss = bbl->flow_seq - (stream->rx_last_seq +1);
|
||||
stream->loss += loss;
|
||||
interface->stats.stream_loss += loss;
|
||||
LOG(LOSS, "LOSS flow: %lu seq: %lu last: %lu\n",
|
||||
bbl->flow_id, bbl->flow_seq, stream->rx_last_seq);
|
||||
}
|
||||
} else {
|
||||
/* Verify stream ... */
|
||||
stream->rx_len = eth->length;
|
||||
stream->rx_priority = tos;
|
||||
stream->rx_outer_vlan_pbit = eth->vlan_outer_priority;
|
||||
stream->rx_inner_vlan_pbit = eth->vlan_inner_priority;
|
||||
mpls = eth->mpls;
|
||||
if(mpls) {
|
||||
stream->rx_mpls1 = true;
|
||||
stream->rx_mpls1_label = mpls->label;
|
||||
stream->rx_mpls1_exp = mpls->exp;
|
||||
stream->rx_mpls1_ttl = mpls->ttl;
|
||||
mpls = mpls->next;
|
||||
if(mpls) {
|
||||
stream->rx_mpls2 = true;
|
||||
stream->rx_mpls2_label = mpls->label;
|
||||
stream->rx_mpls2_exp = mpls->exp;
|
||||
stream->rx_mpls2_ttl = mpls->ttl;
|
||||
}
|
||||
}
|
||||
if(stream->config->rx_mpls1_label) {
|
||||
/* Check if expected outer label is received ... */
|
||||
if(stream->rx_mpls1_label != stream->config->rx_mpls1_label) {
|
||||
/* Wrong outer label received! */
|
||||
return;
|
||||
}
|
||||
if(stream->config->rx_mpls2_label) {
|
||||
/* Check if expected inner label is received ... */
|
||||
if(stream->rx_mpls2_label != stream->config->rx_mpls2_label) {
|
||||
/* Wrong inner label received! */
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
stream->rx_first_seq = bbl->flow_seq;
|
||||
ctx->stats.stream_traffic_flows_verified++;
|
||||
if(ctx->config.traffic_stop_verified) {
|
||||
stream->stop = true;
|
||||
}
|
||||
}
|
||||
stream->packets_rx++;
|
||||
stream->rx_last_seq = bbl->flow_seq;
|
||||
bbl_stream_delay(stream, ð->timestamp, &bbl->timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,24 @@ extern volatile bool g_teardown;
|
||||
extern bool g_init_phase;
|
||||
extern bool g_traffic;
|
||||
|
||||
void
|
||||
bbl_stream_delay(bbl_stream *stream, struct timespec *rx_timestamp, struct timespec *bbl_timestamp) {
|
||||
struct timespec delay;
|
||||
uint64_t delay_nsec;
|
||||
timespec_sub(&delay, rx_timestamp, bbl_timestamp);
|
||||
delay_nsec = delay.tv_sec * 1000000000 + delay.tv_nsec;
|
||||
if(delay_nsec > stream->max_delay_ns) {
|
||||
stream->max_delay_ns = delay_nsec;
|
||||
}
|
||||
if(stream->min_delay_ns) {
|
||||
if(delay_nsec < stream->min_delay_ns) {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
}
|
||||
} else {
|
||||
stream->min_delay_ns = delay_nsec;
|
||||
}
|
||||
}
|
||||
|
||||
static bool
|
||||
bbl_stream_can_send(bbl_stream *stream) {
|
||||
bbl_session_s *session = stream->session;
|
||||
@@ -1450,12 +1468,11 @@ bbl_stream_json(bbl_stream *stream)
|
||||
json_object_set(root, "rx-mpls1-expected", json_integer(stream->config->rx_mpls1_label));
|
||||
}
|
||||
if(stream->rx_mpls1) {
|
||||
|
||||
json_object_set(root, "rx-mpls1", json_integer(stream->rx_mpls1_label));
|
||||
json_object_set(root, "rx-mpls1-exp", json_integer(stream->rx_mpls1_exp));
|
||||
json_object_set(root, "rx-mpls1-ttl", json_integer(stream->rx_mpls1_ttl));
|
||||
}
|
||||
if(stream->config->rx_mpls1) {
|
||||
if(stream->config->rx_mpls2) {
|
||||
json_object_set(root, "rx-mpls2-expected", json_integer(stream->config->rx_mpls2_label));
|
||||
}
|
||||
if(stream->rx_mpls2) {
|
||||
|
||||
@@ -182,6 +182,9 @@ typedef struct bbl_stream_thread_
|
||||
void *next; /* Next stream thread */
|
||||
} bbl_stream_thread;
|
||||
|
||||
void
|
||||
bbl_stream_delay(bbl_stream *stream, struct timespec *rx_timestamp, struct timespec *bbl_timestamp);
|
||||
|
||||
bool
|
||||
bbl_stream_add(bbl_ctx_s *ctx, bbl_access_config_s *access_config, bbl_session_s *session);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user