pping: Reset aggregated RTTs after each report

Instead of keeping all RTTs since ePPing started, reset the aggregated
stats each time they are reported, so that each report only shows the
RTTs collected since the previous report.

To avoid concurrency issues caused by user space reading and resetting
the map while the BPF programs are updating it, use two aggregation
maps, one active and one inactive. Each time user space wants to report
the aggregated RTTs, it first switches which map the BPF programs
actively use, and then reads and resets the now inactive map.
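
As a rough illustration, here is a minimal single-process sketch of the
double-buffering idea in plain C (names and structure are illustrative
assumptions, not the ePPing code; the real implementation keeps the
buffers in per-CPU BPF hash maps and additionally waits for in-flight
BPF programs via an RCU sync, as seen in the diff below):

/* Writers add samples to whichever buffer is currently active; the
 * reporter flips the active index, then reads and resets the now
 * inactive buffer without racing against new updates. */
#include <stdio.h>
#include <string.h>

#define N_BINS 8

struct stats { unsigned int bins[N_BINS]; };

static struct stats agg[2]; /* two aggregation buffers */
static int active;          /* index of the buffer writers use */

static void record_sample(int bin)
{
	agg[active].bins[bin]++; /* "BPF program" side: update the active buffer */
}

static void report_and_reset(void)
{
	int prev = active;

	active = !active; /* switch which buffer new samples go into */
	/* (the real code additionally syncs RCU so that no BPF program is
	 * still updating the previously active maps) */

	for (int i = 0; i < N_BINS; i++)
		printf("%u ", agg[prev].bins[i]);
	printf("\n");

	memset(&agg[prev], 0, sizeof(agg[prev])); /* reset for the next period */
}

int main(void)
{
	record_sample(2);
	record_sample(2);
	record_sample(5);
	report_and_reset(); /* counts since start */
	record_sample(1);
	report_and_reset(); /* only counts since the last report */
	return 0;
}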

As the RTT stats are now periodically reset, change the
histogram (aggregated_rtt_stats.bins) to use __u32 instead of __u64
counters, as the risk of overflow is low (even if 1 million RTTs/s were
added to the same bin, it would take over an hour to overflow, and the
report frequency is likely higher than that).
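
For reference, a __u32 counter overflows after 2^32 ≈ 4.3 * 10^9
increments, so at 10^6 RTT samples/s into a single bin that is about
4295 seconds, or roughly 72 minutes, between overflows.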

Signed-off-by: Simon Sundberg <simon.sundberg@kau.se>
Author: Simon Sundberg <simon.sundberg@kau.se>
Date:   2023-07-04 19:53:26 +02:00
Parent: 3a7b15ab3e
Commit: 5ef4ffdd1b

3 changed files with 117 additions and 20 deletions

@@ -23,6 +23,8 @@ static const char *__doc__ =
#include <sys/signalfd.h>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <linux/unistd.h>
#include <linux/membarrier.h>
#include "json_writer.h"
#include "pping.h" //common structs for user-space and BPF parts
@@ -105,8 +107,9 @@ struct aggregation_config {
};
struct aggregation_maps {
-int map_v4_fd;
-int map_v6_fd;
+int map_active_fd;
+int map_v4_fd[2];
+int map_v6_fd[2];
};
// Store configuration values in struct to easily pass around
@@ -1068,9 +1071,9 @@ static void print_histogram(FILE *stream,
{
int i;
fprintf(stream, "[%llu", rtt_stats->bins[0]);
fprintf(stream, "[%u", rtt_stats->bins[0]);
for (i = 1; i < n_bins; i++)
fprintf(stream, ",%llu", rtt_stats->bins[i]);
fprintf(stream, ",%u", rtt_stats->bins[i]);
fprintf(stream, "]");
}
@@ -1121,6 +1124,41 @@ merge_percpu_aggreated_rtts(struct aggregated_rtt_stats *percpu_stats,
}
}
// Stolen from BPF selftests
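// The membarrier() syscall with MEMBARRIER_CMD_SHARED forces an RCU grace
// period, so any BPF program that started before this call (and may still be
// updating the previously active maps) has finished by the time it returns.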
int kern_sync_rcu(void)
{
return syscall(__NR_membarrier, MEMBARRIER_CMD_SHARED, 0, 0);
}
/* Changes which map the BPF progs use to aggregate the RTTs in.
* On success returns the map idx that the BPF progs used BEFORE the switch
* (and thus the map filled with data up until the switch, but no longer
* being actively used by the BPF progs).
* On failure returns a negative error code */
static int switch_agg_map(int map_active_fd)
{
__u32 prev_map, next_map, key = 0;
int err;
// Get current map being used by BPF progs
err = bpf_map_lookup_elem(map_active_fd, &key, &prev_map);
if (err)
return err;
// Swap map being used by BPF progs to agg RTTs in
next_map = prev_map == 1 ? 0 : 1;
err = bpf_map_update_elem(map_active_fd, &key, &next_map, BPF_EXIST);
if (err)
return err;
// Wait for current BPF programs to finish
// This should guarantee that after this call no BPF progs will attempt
// to update the now inactive maps
kern_sync_rcu();
return prev_map;
}
static void report_aggregated_rtt_mapentry(
struct ipprefix_key *prefix, struct aggregated_rtt_stats *percpu_stats,
int n_cpus, int af, __u8 prefix_len, __u64 t_monotonic,
@@ -1131,10 +1169,14 @@ static void report_aggregated_rtt_mapentry(
merge_percpu_aggreated_rtts(percpu_stats, &merged_stats, n_cpus,
agg_conf->n_bins);
-// Only print prefixes which have RTT samples
-if (!aggregated_rtt_stats_empty(&merged_stats))
+// Only print and clear prefixes which have RTT samples
+if (!aggregated_rtt_stats_empty(&merged_stats)) {
print_aggregated_rtts(stdout, t_monotonic, prefix, af,
prefix_len, &merged_stats, agg_conf);
// Clear out the reported stats
memset(percpu_stats, 0, sizeof(*percpu_stats) * n_cpus);
}
}
static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
@@ -1150,6 +1192,8 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
bool remaining_entries = true;
int err = 0, i;
DECLARE_LIBBPF_OPTS(bpf_map_batch_opts, batch_opts, .flags = BPF_EXIST);
values = calloc(n_cpus, sizeof(*values) * AGG_BATCH_SIZE);
keys = calloc(AGG_BATCH_SIZE, keysize);
if (!values || !keys) {
@@ -1174,6 +1218,12 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
t_monotonic, agg_conf);
}
// Write the cleared stats back to the map to reset the reported entries
err = bpf_map_update_batch(map_fd, keys, values, &count,
&batch_opts);
if (err)
goto exit;
total += count;
count = AGG_BATCH_SIZE; // Ensure we always try to fetch full batch
}
@@ -1188,14 +1238,18 @@ static int report_aggregated_rtts(struct aggregation_maps *maps,
struct aggregation_config *agg_conf)
{
__u64 t = get_time_ns(CLOCK_MONOTONIC);
-int err;
+int err, map_idx;
-err = report_aggregated_rtt_map(maps->map_v4_fd, AF_INET,
+map_idx = switch_agg_map(maps->map_active_fd);
+if (map_idx < 0)
+return map_idx;
+err = report_aggregated_rtt_map(maps->map_v4_fd[map_idx], AF_INET,
agg_conf->ipv4_prefix_len, t, agg_conf);
if (err)
return err;
-err = report_aggregated_rtt_map(maps->map_v6_fd, AF_INET6,
+err = report_aggregated_rtt_map(maps->map_v6_fd[map_idx], AF_INET6,
agg_conf->ipv6_prefix_len, t, agg_conf);
return err;
}
@@ -1466,12 +1520,25 @@ static int handle_pipefd(int pipe_rfd)
int fetch_aggregation_map_fds(struct bpf_object *obj,
struct aggregation_maps *maps)
{
-maps->map_v4_fd = bpf_object__find_map_fd_by_name(obj, "map_v4_agg");
-maps->map_v6_fd = bpf_object__find_map_fd_by_name(obj, "map_v6_agg");
+maps->map_active_fd =
+bpf_object__find_map_fd_by_name(obj, "map_active_agg_instance");
+maps->map_v4_fd[0] =
+bpf_object__find_map_fd_by_name(obj, "map_v4_agg1");
+maps->map_v4_fd[1] =
+bpf_object__find_map_fd_by_name(obj, "map_v4_agg2");
+maps->map_v6_fd[0] =
+bpf_object__find_map_fd_by_name(obj, "map_v6_agg1");
+maps->map_v6_fd[1] =
+bpf_object__find_map_fd_by_name(obj, "map_v6_agg2");
-if (maps->map_v4_fd < 0 || maps->map_v6_fd < 0) {
-fprintf(stderr, "Unable to find aggregation maps (%d/%d).\n",
-maps->map_v4_fd, maps->map_v6_fd);
+if (maps->map_active_fd < 0 || maps->map_v4_fd[0] < 0 ||
+maps->map_v4_fd[1] < 0 || maps->map_v6_fd[0] < 0 ||
+maps->map_v6_fd[1] < 0) {
+fprintf(stderr,
+"Unable to find aggregation maps (%d/%d/%d/%d/%d).\n",
+maps->map_active_fd, maps->map_v4_fd[0],
+maps->map_v4_fd[1], maps->map_v6_fd[0],
+maps->map_v6_fd[1]);
return -ENOENT;
}

@@ -227,7 +227,7 @@ union pping_event {
struct aggregated_rtt_stats {
__u64 min;
__u64 max;
-__u64 bins[RTT_AGG_NR_BINS];
+__u32 bins[RTT_AGG_NR_BINS];
};
#endif

@@ -163,7 +163,15 @@ struct {
__type(value, struct aggregated_rtt_stats);
__uint(max_entries, MAP_AGGREGATION_SIZE);
__uint(map_flags, BPF_F_NO_PREALLOC);
-} map_v4_agg SEC(".maps");
+} map_v4_agg1 SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, __u32);
__type(value, struct aggregated_rtt_stats);
__uint(max_entries, MAP_AGGREGATION_SIZE);
__uint(map_flags, BPF_F_NO_PREALLOC);
} map_v4_agg2 SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
@@ -171,7 +179,22 @@ struct {
__type(value, struct aggregated_rtt_stats);
__uint(max_entries, MAP_AGGREGATION_SIZE);
__uint(map_flags, BPF_F_NO_PREALLOC);
-} map_v6_agg SEC(".maps");
+} map_v6_agg1 SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, __u64);
__type(value, struct aggregated_rtt_stats);
__uint(max_entries, MAP_AGGREGATION_SIZE);
__uint(map_flags, BPF_F_NO_PREALLOC);
} map_v6_agg2 SEC(".maps");
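// Single-entry array acting as a global flag: it holds the index (0 or 1) of
// the aggregation map instance the BPF programs should currently update.
// User space flips it before reading and resetting the now inactive maps.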
struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
__type(key, __u32);
__type(value, __u32);
__uint(max_entries, 1);
} map_active_agg_instance SEC(".maps");
// Help functions
@@ -974,16 +997,23 @@ lookup_or_create_aggregation_stats(struct in6_addr *ip, __u8 ipv)
{
struct aggregated_rtt_stats *agg;
struct ipprefix_key key;
__u32 *map_choice;
__u32 zero = 0;
void *agg_map;
int err;
map_choice = bpf_map_lookup_elem(&map_active_agg_instance, &zero);
if (!map_choice)
return NULL;
if (ipv == AF_INET) {
create_ipprefix_key_v4(&key.v4, ip);
-agg_map = &map_v4_agg;
+agg_map = *map_choice == 0 ? (void *)&map_v4_agg1 :
+(void *)&map_v4_agg2;
} else {
create_ipprefix_key_v6(&key.v6, ip);
-agg_map = &map_v6_agg;
+agg_map = *map_choice == 0 ? (void *)&map_v6_agg1 :
+(void *)&map_v6_agg2;
}
agg = bpf_map_lookup_elem(agg_map, &key);