pping: Add global per-protocol counters for aggregated output

Add global per-protocol counters for the aggregated output. These
counters include all the packets the eBPF program processes, even
those for which it cannot parse an IP address (and which are therefore
not added to the per-subnet packet counts). Output the global counts
at the end of every aggregated report.

Example with standard output (line breaks are not part of the output):

15:47:28.544011000: non-IP=(pkts=6, bytes=252), TCP=(pkts=88316,
bytes=3094356024), ICMP=(pkts=3983, bytes=390110), 47=(pkts=80)

Example with JSON output:
{
  "timestamp": 1697635992487286800,
  "protocol_counters": {
    "non_IP": {
      "packets": 4,
      "bytes": 168
    },
    "TCP": {
      "packets": 344633,
      "bytes": 16609641822
    },
    "ICMP": {
      "packets": 3960,
      "bytes": 388016
    },
    "47": {
      "packets": 60
    }
  }
}

Some implementation details:
Internally keep packet and byte counters for non-IP, TCP, UDP, ICMP
and ICMPv6, i.e. the "common protocols". To catch any other non-common
IP-protocol, keep an array of packet counters for every possible
IP-protocol [0, 255]. In the output, provide names for the common
protocols (e.g. "TCP"), while only outputting the protocol number of
non-common protocols. To avoid excessive output, only output counters
that are non-zero. This way, output is minimized while still allowing
for detecting unexpected (or even illegal) protocol numbers.

Unlike the per-prefix stats, do not reset the global counters. Instead,
keep a copy of the previous counts and calculate the difference in
user space to report the change since the previous report. This
unsynchronized approach is simpler than the synchronized approach of
swapping between two instances of the map, as used by the per-prefix
stats, but may result in small inconsistencies (e.g. the packet count
and byte count may mismatch if the counters are fetched when an
eBPF program has updated the packet counter but not the byte counter).

Signed-off-by: Simon Sundberg <simon.sundberg@kau.se>
This commit is contained in:
Simon Sundberg
2023-10-17 20:36:03 +02:00
parent 9ebcc2a2f9
commit 7ebf7d6125
3 changed files with 325 additions and 30 deletions
+257 -28
View File
@@ -121,6 +121,12 @@ struct aggregation_maps {
int map_active_fd;
int map_v4_fd[2];
int map_v6_fd[2];
int map_globcnt_fd;
};
/*
 * State needed by the periodic aggregated report: the aggregation map fds
 * and the global counter totals seen at the previous report.
 */
struct aggregation_context {
// File descriptors for the aggregation maps (including the global counters)
struct aggregation_maps maps;
// Totals from the previous report; the next report prints current - prev
struct global_counters prev_counters;
};
// Store configuration values in struct to easily pass around
@@ -130,7 +136,7 @@ struct pping_config {
struct bpf_tc_opts tc_egress_opts;
struct map_cleanup_args clean_args;
struct aggregation_config agg_conf;
struct aggregation_maps agg_maps;
struct aggregation_context agg_ctx;
struct output_context *out_ctx;
char *object_path;
char *ingress_prog;
@@ -1136,6 +1142,233 @@ static void handle_missed_events(void *ctx, int cpu, __u64 lost_cnt)
fprintf(stderr, "Lost %llu events on CPU %d\n", lost_cnt, cpu);
}
/*
 * Write a single "name=val" counter to stream in the standard text format.
 *
 * Counters with value 0 are skipped entirely. *first tracks whether anything
 * has been printed in the current list yet: every counter except the first
 * is preceded by ", ", and *first is cleared once a counter is printed.
 */
static void print_counter_standard(FILE *stream, const char *name, __u64 val,
				   bool *first)
{
	if (!val)
		return;

	// Separator goes before every entry but the first one
	if (!*first)
		fprintf(stream, ", ");
	*first = false;

	fprintf(stream, "%s=%llu", name, val);
}
/*
 * Write a "tuple_name=(pkts=X, bytes=Y)" group to stream in the standard
 * text format.
 *
 * Nothing is printed if pkts is 0. A zero bytes value is omitted from inside
 * the parentheses. *first_tuple plays the same role between tuples as the
 * first-flag does between counters: tuples after the first are preceded by
 * ", ", and the flag is cleared once a tuple is printed.
 */
static void print_pktbytes_tuple_standard(FILE *stream, const char *tuple_name,
					  __u64 pkts, __u64 bytes,
					  bool *first_tuple)
{
	bool first_field = true;

	if (!pkts)
		return;

	if (!*first_tuple)
		fprintf(stream, ", ");
	*first_tuple = false;

	fprintf(stream, "%s=(", tuple_name);
	print_counter_standard(stream, "pkts", pkts, &first_field);
	print_counter_standard(stream, "bytes", bytes, &first_field);
	fprintf(stream, ")");
}
/*
 * Print one line with the global per-protocol counters in the standard text
 * format: a timestamp followed by one pkts/bytes tuple for each protocol
 * with a non-zero packet count.
 *
 * The named protocols (non-IP, TCP, UDP, ICMP, ICMPv6) are printed first,
 * followed by every other IP protocol that has been seen. Those only have
 * packet counters, so they are printed with a byte count of 0 (which the
 * tuple printer leaves out). If no counter is set, "pkts=0, bytes=0" is
 * printed instead so the line is never empty.
 */
static void
print_globalcounters_standard(FILE *stream, __u64 t_monotonic,
			      const struct global_counters *counters)
{
	bool nothing_printed = true;
	char proto_name[16];
	int ipproto;

	print_ns_datetime(stream, t_monotonic);
	fprintf(stream, ": ");

	print_pktbytes_tuple_standard(stream, "non-IP", counters->nonip_pkts,
				      counters->nonip_bytes, &nothing_printed);
	print_pktbytes_tuple_standard(stream, "TCP", counters->tcp_pkts,
				      counters->tcp_bytes, &nothing_printed);
	print_pktbytes_tuple_standard(stream, "UDP", counters->udp_pkts,
				      counters->udp_bytes, &nothing_printed);
	print_pktbytes_tuple_standard(stream, "ICMP", counters->icmp_pkts,
				      counters->icmp_bytes, &nothing_printed);
	print_pktbytes_tuple_standard(stream, "ICMPv6", counters->icmp6_pkts,
				      counters->icmp6_bytes, &nothing_printed);

	for (ipproto = 0; ipproto < N_IPPROTOS; ipproto++) {
		// Skip unseen protocols up front to avoid an unnecessary
		// ipproto_to_str() call
		if (counters->other_ipprotos[ipproto] == 0)
			continue;

		ipproto_to_str(proto_name, sizeof(proto_name), ipproto);
		print_pktbytes_tuple_standard(stream, proto_name,
					      counters->other_ipprotos[ipproto],
					      0, &nothing_printed);
	}

	// All global counters are zero
	if (nothing_printed)
		fprintf(stream, "pkts=0, bytes=0");

	fprintf(stream, "\n");
}
/*
 * Emit a "name": val field in the current JSON object, unless val is 0
 * (zero-valued counters are left out of the output).
 */
static void print_counter_json(json_writer_t *jctx, const char *name, __u64 val)
{
	if (val == 0)
		return;

	jsonw_u64_field(jctx, name, val);
}
/*
 * Emit a JSON object named tuple_name holding the "packets" and "bytes"
 * counters. Nothing is emitted when pkts is 0, and a zero bytes value is
 * left out of the object.
 */
static void print_pktbytes_tuple_json(json_writer_t *jctx,
				      const char *tuple_name, __u64 pkts,
				      __u64 bytes)
{
	// Only emit tuples that have counted at least one packet
	if (pkts != 0) {
		jsonw_name(jctx, tuple_name);
		jsonw_start_object(jctx);
		print_counter_json(jctx, "packets", pkts);
		print_counter_json(jctx, "bytes", bytes);
		jsonw_end_object(jctx);
	}
}
/*
 * Emit the global per-protocol counters as a JSON object with a "timestamp"
 * field (t_monotonic converted to realtime) and a "protocol_counters"
 * object containing one pkts/bytes tuple per protocol with a non-zero
 * packet count.
 *
 * Protocols other than the named ones only have packet counters, so they
 * are passed on with a byte count of 0.
 */
static void print_globalcounters_json(json_writer_t *jctx, __u64 t_monotonic,
				      const struct global_counters *counters)
{
	char proto_name[16];
	int ipproto;

	jsonw_start_object(jctx);
	jsonw_u64_field(jctx, "timestamp",
			convert_monotonic_to_realtime(t_monotonic));

	jsonw_name(jctx, "protocol_counters");
	jsonw_start_object(jctx);

	print_pktbytes_tuple_json(jctx, "non_IP", counters->nonip_pkts,
				  counters->nonip_bytes);
	print_pktbytes_tuple_json(jctx, "TCP", counters->tcp_pkts,
				  counters->tcp_bytes);
	print_pktbytes_tuple_json(jctx, "UDP", counters->udp_pkts,
				  counters->udp_bytes);
	print_pktbytes_tuple_json(jctx, "ICMP", counters->icmp_pkts,
				  counters->icmp_bytes);
	print_pktbytes_tuple_json(jctx, "ICMPv6", counters->icmp6_pkts,
				  counters->icmp6_bytes);

	for (ipproto = 0; ipproto < N_IPPROTOS; ipproto++) {
		// Skip unseen protocols before building their name string
		if (counters->other_ipprotos[ipproto] == 0)
			continue;

		ipproto_to_str(proto_name, sizeof(proto_name), ipproto);
		print_pktbytes_tuple_json(jctx, proto_name,
					  counters->other_ipprotos[ipproto], 0);
	}

	jsonw_end_object(jctx); // "protocol_counters"
	jsonw_end_object(jctx); // enclosing report object
}
/*
 * Print the global counters in the format selected by out_ctx: the standard
 * text format when out_ctx->format is PPING_OUTPUT_STANDARD, otherwise JSON
 * provided that a JSON writer (out_ctx->jctx) is set. If neither applies,
 * nothing is printed.
 */
static void print_globalcounters(struct output_context *out_ctx,
				 __u64 t_monotonic,
				 const struct global_counters *counters)
{
	if (out_ctx->format == PPING_OUTPUT_STANDARD) {
		print_globalcounters_standard(out_ctx->stream, t_monotonic,
					      counters);
		return;
	}

	if (out_ctx->jctx)
		print_globalcounters_json(out_ctx->jctx, t_monotonic, counters);
}
/*
 * Add every counter in from to the corresponding counter in to.
 */
static void update_globalcounters(struct global_counters *to,
				  const struct global_counters *from)
{
	int i;

	// Packet counters for the named protocols
	to->nonip_pkts += from->nonip_pkts;
	to->tcp_pkts += from->tcp_pkts;
	to->udp_pkts += from->udp_pkts;
	to->icmp_pkts += from->icmp_pkts;
	to->icmp6_pkts += from->icmp6_pkts;

	// Byte counters for the named protocols
	to->nonip_bytes += from->nonip_bytes;
	to->tcp_bytes += from->tcp_bytes;
	to->udp_bytes += from->udp_bytes;
	to->icmp_bytes += from->icmp_bytes;
	to->icmp6_bytes += from->icmp6_bytes;

	// Packet counters for all remaining IP protocols
	for (i = 0; i < N_IPPROTOS; i++)
		to->other_ipprotos[i] += from->other_ipprotos[i];
}
/*
 * Sum the n_cpus entries of the percpu array into merged. Any previous
 * content of merged is discarded (it is zeroed before summing).
 */
static void merge_percpu_globalcounters(struct global_counters *merged,
					const struct global_counters *percpu,
					int n_cpus)
{
	int i = 0;

	memset(merged, 0, sizeof(*merged));

	while (i < n_cpus) {
		update_globalcounters(merged, &percpu[i]);
		i++;
	}
}
/*
 * Store the field-by-field difference next - prev in diff.
 *
 * All counters are unsigned, so the subtraction wraps around if a counter
 * in next is smaller than the one in prev.
 */
static void diff_globalcounters(struct global_counters *diff,
				const struct global_counters *prev,
				const struct global_counters *next)
{
	int i;

	// Packet counters for the named protocols
	diff->nonip_pkts = next->nonip_pkts - prev->nonip_pkts;
	diff->tcp_pkts = next->tcp_pkts - prev->tcp_pkts;
	diff->udp_pkts = next->udp_pkts - prev->udp_pkts;
	diff->icmp_pkts = next->icmp_pkts - prev->icmp_pkts;
	diff->icmp6_pkts = next->icmp6_pkts - prev->icmp6_pkts;

	// Byte counters for the named protocols
	diff->nonip_bytes = next->nonip_bytes - prev->nonip_bytes;
	diff->tcp_bytes = next->tcp_bytes - prev->tcp_bytes;
	diff->udp_bytes = next->udp_bytes - prev->udp_bytes;
	diff->icmp_bytes = next->icmp_bytes - prev->icmp_bytes;
	diff->icmp6_bytes = next->icmp6_bytes - prev->icmp6_bytes;

	// Packet counters for all remaining IP protocols
	for (i = 0; i < N_IPPROTOS; i++)
		diff->other_ipprotos[i] =
			next->other_ipprotos[i] - prev->other_ipprotos[i];
}
/*
 * Fetch the per-CPU global counters from the BPF map, sum them over all
 * CPUs and print how much each counter has changed since the previous
 * report.
 *
 * The counters in the map are never reset. Instead, the summed totals are
 * saved in agg_ctx->prev_counters and subtracted from the totals fetched at
 * the next report.
 *
 * Returns 0 on success, or a negative error code if the number of CPUs
 * cannot be determined, memory allocation fails, or the map lookup fails
 * (in which case nothing is printed and prev_counters is left untouched).
 */
static int report_globalcounters(struct output_context *out_ctx,
				 struct aggregation_context *agg_ctx)
{
	int n_cpus = libbpf_num_possible_cpus();
	__u64 t = get_time_ns(CLOCK_MONOTONIC);
	struct global_counters tot_cnt, diff;
	struct global_counters *cpu_cnt;
	__u32 key = 0;
	int err;

	// libbpf_num_possible_cpus() signals failure with a negative error
	// code; passing that on to calloc() would hide the real error
	if (n_cpus < 0)
		return n_cpus;
	// calloc(0, ...) may return NULL without setting errno
	if (n_cpus == 0)
		return -EINVAL;

	// Lookups in a per-CPU map fill in one value per possible CPU
	cpu_cnt = calloc(n_cpus, sizeof(*cpu_cnt));
	if (!cpu_cnt)
		return -ENOMEM;

	err = bpf_map_lookup_elem(agg_ctx->maps.map_globcnt_fd, &key, cpu_cnt);
	if (err)
		goto exit;

	merge_percpu_globalcounters(&tot_cnt, cpu_cnt, n_cpus);
	diff_globalcounters(&diff, &agg_ctx->prev_counters, &tot_cnt);
	agg_ctx->prev_counters = tot_cnt;

	print_globalcounters(out_ctx, t, &diff);

exit:
	free(cpu_cnt);
	return err;
}
static __u64 sum_trafficcounts_pkts(const struct traffic_counters *counters)
{
return counters->tcp_ts_pkts + counters->tcp_nots_pkts +
@@ -1298,21 +1531,6 @@ exit:
fprintf(stream, "\n");
}
static void print_pktbytes_tuple_json(json_writer_t *jctx,
const char *tuple_name, __u64 pkts,
__u64 bytes)
{
// Don't include empty pkt/byte tuples
if (pkts == 0)
return;
jsonw_name(jctx, tuple_name);
jsonw_start_object(jctx);
jsonw_u64_field(jctx, "packets", pkts);
jsonw_u64_field(jctx, "bytes", bytes);
jsonw_end_object(jctx);
}
static void print_trafficcount_json(json_writer_t *jctx,
const struct traffic_counters *counters)
{
@@ -1535,25 +1753,31 @@ exit:
}
static int report_aggregated_stats(struct output_context *out_ctx,
struct aggregation_maps *maps,
struct aggregation_context *agg_ctx,
struct aggregation_config *agg_conf)
{
__u64 t = get_time_ns(CLOCK_MONOTONIC);
int err, map_idx;
map_idx = switch_agg_map(maps->map_active_fd);
map_idx = switch_agg_map(agg_ctx->maps.map_active_fd);
if (map_idx < 0)
return map_idx;
err = report_aggregated_stats_map(out_ctx, maps->map_v4_fd[map_idx],
err = report_aggregated_stats_map(out_ctx,
agg_ctx->maps.map_v4_fd[map_idx],
AF_INET, agg_conf->ipv4_prefix_len, t,
agg_conf);
if (err)
return err;
err = report_aggregated_stats_map(out_ctx, maps->map_v6_fd[map_idx],
err = report_aggregated_stats_map(out_ctx,
agg_ctx->maps.map_v6_fd[map_idx],
AF_INET6, agg_conf->ipv6_prefix_len,
t, agg_conf);
if (err)
return err;
err = report_globalcounters(out_ctx, agg_ctx);
return err;
}
@@ -1921,15 +2145,17 @@ int fetch_aggregation_map_fds(struct bpf_object *obj,
bpf_object__find_map_fd_by_name(obj, "map_v6_agg1");
maps->map_v6_fd[1] =
bpf_object__find_map_fd_by_name(obj, "map_v6_agg2");
maps->map_globcnt_fd =
bpf_object__find_map_fd_by_name(obj, "map_global_counters");
if (maps->map_active_fd < 0 || maps->map_v4_fd[0] < 0 ||
maps->map_v4_fd[1] < 0 || maps->map_v6_fd[0] < 0 ||
maps->map_v6_fd[1] < 0) {
maps->map_v6_fd[1] < 0 || maps->map_globcnt_fd < 0) {
fprintf(stderr,
"Unable to find aggregation maps (%d/%d/%d/%d/%d).\n",
"Unable to find aggregation maps (%d/%d/%d/%d/%d/%d).\n",
maps->map_active_fd, maps->map_v4_fd[0],
maps->map_v4_fd[1], maps->map_v6_fd[0],
maps->map_v6_fd[1]);
maps->map_v6_fd[1], maps->map_globcnt_fd);
return -ENOENT;
}
@@ -2006,14 +2232,17 @@ static int init_aggregation_timer(struct bpf_object *obj,
{
int err, fd;
err = fetch_aggregation_map_fds(obj, &config->agg_maps);
memset(&config->agg_ctx.prev_counters, 0,
sizeof(config->agg_ctx.prev_counters));
err = fetch_aggregation_map_fds(obj, &config->agg_ctx.maps);
if (err) {
fprintf(stderr, "Failed fetching aggregation maps: %s\n",
get_libbpf_strerror(err));
return err;
}
err = init_agg_backup_entries(&config->agg_maps,
err = init_agg_backup_entries(&config->agg_ctx.maps,
config->agg_conf.ipv4_prefix_len > 0,
config->agg_conf.ipv6_prefix_len > 0);
if (err) {
@@ -2037,7 +2266,7 @@ static int init_aggregation_timer(struct bpf_object *obj,
static int handle_aggregation_timer(int timer_fd,
struct output_context *out_ctx,
struct aggregation_maps *maps,
struct aggregation_context *agg_ctx,
struct aggregation_config *agg_conf)
{
__u64 timer_exps;
@@ -2055,7 +2284,7 @@ static int handle_aggregation_timer(int timer_fd,
timer_exps - 1);
}
err = report_aggregated_stats(out_ctx, maps, agg_conf);
err = report_aggregated_stats(out_ctx, agg_ctx, agg_conf);
if (err) {
fprintf(stderr, "Failed reporting aggregated RTTs: %s\n",
get_libbpf_strerror(err));
@@ -2162,7 +2391,7 @@ static int epoll_poll_events(int epfd, struct pping_config *config,
case PPING_EPEVENT_TYPE_AGGTIMER:
err = handle_aggregation_timer(
events[i].data.u64 & PPING_EPEVENT_MASK,
config->out_ctx, &config->agg_maps,
config->out_ctx, &config->agg_ctx,
&config->agg_conf);
break;
case PPING_EPEVENT_TYPE_SIGNAL:
+16
View File
@@ -30,6 +30,8 @@ typedef __u64 fixpoint64;
#define RTT_AGG_NR_BINS 250UL
#define RTT_AGG_BIN_WIDTH (4 * NS_PER_MS)
#define N_IPPROTOS 256
/* Special IPv4/IPv6 prefixes used for backup entries
* To avoid them colliding with and actual traffic (causing the traffic to end
* up in the backup entry), use prefixes from blocks reserved for documentation.
@@ -263,4 +265,18 @@ struct aggregated_stats {
__u32 rtt_bins[RTT_AGG_NR_BINS];
};
/*
 * Global per-protocol traffic counters, used as the value type of
 * map_global_counters.
 *
 * Non-IP traffic, TCP, UDP, ICMP and ICMPv6 each have a packet and a byte
 * counter. All other IP protocols only have a packet counter, stored in
 * other_ipprotos.
 */
struct global_counters {
__u64 nonip_pkts;
__u64 nonip_bytes;
__u64 tcp_pkts;
__u64 tcp_bytes;
__u64 udp_pkts;
__u64 udp_bytes;
__u64 icmp_pkts;
__u64 icmp_bytes;
__u64 icmp6_pkts;
__u64 icmp6_bytes;
// Packet counts indexed by IP protocol number (no byte counts are kept)
__u32 other_ipprotos[N_IPPROTOS];
};
#endif
+52 -2
View File
@@ -190,6 +190,13 @@ struct {
__uint(max_entries, 1);
} map_active_agg_instance SEC(".maps");
// Per-CPU array with a single entry (key 0) holding the global per-protocol
// counters. User space looks up this map by the name "map_global_counters".
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct global_counters);
__uint(max_entries, 1);
} map_global_counters SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
@@ -314,6 +321,44 @@ get_dualflow_key_from_packet(struct packet_info *p_info)
&p_info->reply_pid.flow;
}
/*
 * Count one packet of pkt_len bytes for protocol ipproto in the global
 * counters. Does nothing unless config.agg_rtts is set.
 *
 * An ipproto of 0 is used to mean "not an IP packet" (rather than IPv6
 * hop-by-hop). Protocols without dedicated counters only get their packet
 * count incremented; no byte count is kept for them.
 */
static void update_global_counters(__u8 ipproto, __u32 pkt_len)
{
	struct global_counters *cnt;
	__u32 idx = 0;

	if (!config.agg_rtts)
		return;

	cnt = bpf_map_lookup_elem(&map_global_counters, &idx);
	if (!cnt) // Should never happen
		return;

	switch (ipproto) {
	case IPPROTO_TCP:
		cnt->tcp_pkts++;
		cnt->tcp_bytes += pkt_len;
		break;
	case IPPROTO_UDP:
		cnt->udp_pkts++;
		cnt->udp_bytes += pkt_len;
		break;
	case IPPROTO_ICMP:
		cnt->icmp_pkts++;
		cnt->icmp_bytes += pkt_len;
		break;
	case IPPROTO_ICMPV6:
		cnt->icmp6_pkts++;
		cnt->icmp6_bytes += pkt_len;
		break;
	case 0: // Used to represent non-IP instead of IPv6 hop-by-hop
		cnt->nonip_pkts++;
		cnt->nonip_bytes += pkt_len;
		break;
	default:
		cnt->other_ipprotos[ipproto]++;
		break;
	}
}
/*
* Parses the TSval and TSecr values from the TCP options field. If sucessful
* the TSval and TSecr values will be stored at tsval and tsecr (in network
@@ -556,10 +601,10 @@ static int parse_packet_identifier(struct parsing_context *pctx,
p_info->pid.flow.ipv = AF_INET6;
proto = parse_ip6hdr(&pctx->nh, pctx->data_end, &iph_ptr.ip6h);
} else {
return -1;
goto err_not_ip;
}
if (proto < 0)
return -1;
goto err_not_ip;
// IP-header was parsed sucessfully, fill in IP address
p_info->pid.flow.proto = proto;
@@ -577,6 +622,7 @@ static int parse_packet_identifier(struct parsing_context *pctx,
p_info->ip_tos.ipv6_tos =
*(__be32 *)iph_ptr.ip6h & IPV6_FLOWINFO_MASK;
}
update_global_counters(proto, p_info->pkt_len);
// Parse identifer from suitable protocol
err = -1;
@@ -621,6 +667,10 @@ static int parse_packet_identifier(struct parsing_context *pctx,
}
return 0;
err_not_ip:
update_global_counters(0, p_info->pkt_len);
return -1;
}
/*