pping: Expire old aggregation prefixes

Keep track of when the last update was made to each IP-prefix in the
aggregation map, and delete entries that are older than
--aggregate-timeout (30 seconds by default). If the user specifies
zero (0), it is interpreted as never expiring any entry (which is
consistent with how --cleanup-interval operates).
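
As a rough sketch of that rule (illustrative only, not the exact patch
code; assume timeout_interval holds --aggregate-timeout converted to
nanoseconds):

    #include <stdbool.h>
    #include <linux/types.h>

    /* Illustrative helper: an entry expires once the time since its last
     * update exceeds the timeout; a timeout of 0 disables expiry. */
    static bool aggregation_entry_expired(__u64 last_updated, __u64 now,
                                          __u64 timeout_interval)
    {
            if (timeout_interval == 0) /* 0 == never expire */
                    return false;
            return now > last_updated &&
                   now - last_updated > timeout_interval;
    }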

Note that as the aggregation rotates between two maps (an active one
that the BPF programs write to, and an inactive one that user space can
operate on), the expiry logic may remove an aggregation prefix from one
of the maps even if that prefix has seen recent activity in the other
map.
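
A tiny standalone illustration of this caveat (hypothetical values and
names, not code from the patch): the expiry decision only consults the
timestamp stored in the map copy currently being scanned.

    #include <stdio.h>

    int main(void)
    {
            /* One prefix tracked in two rotating map copies; timestamps in
             * seconds for readability (the real code uses nanoseconds). */
            unsigned long long last_updated[2] = { 10, 55 }; /* map 0 stale, map 1 fresh */
            unsigned long long now = 60, timeout = 30;
            int scanned = 0; /* user space is currently reporting on map 0 */

            if (now - last_updated[scanned] > timeout)
                    printf("prefix expires from map %d although map %d saw an update at t=%llu\n",
                           scanned, 1 - scanned, last_updated[1 - scanned]);
            return 0;
    }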

Signed-off-by: Simon Sundberg <simon.sundberg@kau.se>
Author: Simon Sundberg
Date:   2023-07-04 19:54:54 +02:00
Commit: 0f6042bf0c (parent: ec92f5a91f)

3 changed files with 64 additions and 12 deletions


@@ -69,6 +69,7 @@ static const char *__doc__ =
 #define PPING_ABORT 5555
 #define ARG_AGG_REVERSE 256
+#define AGG_ARG_TIMEOUT 257
 enum pping_output_format {
         PPING_OUTPUT_STANDARD,
@@ -104,6 +105,7 @@ struct map_cleanup_args {
 struct aggregation_config {
         __u64 aggregation_interval;
+        __u64 timeout_interval;
         __u64 n_bins;
         __u64 bin_width;
         __u8 ipv4_prefix_len;
@@ -166,6 +168,7 @@ static const struct option long_options[] = {
{ "aggregate-subnets-v4", required_argument, NULL, '4' }, // Set the subnet size for IPv4 when aggregating (default 24)
{ "aggregate-subnets-v6", required_argument, NULL, '6' }, // Set the subnet size for IPv6 when aggregating (default 48)
{ "aggregate-reverse", no_argument, NULL, ARG_AGG_REVERSE }, // Aggregate RTTs by dst IP of reply packet (instead of src like default)
{ "aggregate-timeout", required_argument, NULL, AGG_ARG_TIMEOUT }, // Interval for timing out subnet entries in seconds (default 30s)
{ 0, 0, NULL, 0 }
};
@@ -415,6 +418,15 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
         case ARG_AGG_REVERSE:
                 config->bpf_config.agg_by_dst = true;
                 break;
+        case AGG_ARG_TIMEOUT:
+                err = parse_bounded_long(&user_int, optarg, 0,
+                                         7 * S_PER_DAY,
+                                         "aggregate-timeout");
+                if (err)
+                        return -EINVAL;
+                config->agg_conf.timeout_interval =
+                        user_int * NS_PER_SECOND;
+                break;
         case 'h':
                 printf("HELP:\n");
                 print_usage(argv);
@@ -1152,6 +1164,10 @@ merge_percpu_aggreated_rtts(struct aggregated_rtt_stats *percpu_stats,
         memset(merged_stats, 0, sizeof(*merged_stats));
         for (i = 0; i < n_cpus; i++) {
+                if (percpu_stats[i].last_updated > merged_stats->last_updated)
+                        merged_stats->last_updated =
+                                percpu_stats[i].last_updated;
                 if (aggregated_rtt_stats_empty(&percpu_stats[i]))
                         continue;
@@ -1166,6 +1182,13 @@ merge_percpu_aggreated_rtts(struct aggregated_rtt_stats *percpu_stats,
         }
 }
+static void clear_aggregated_rtts(struct aggregated_rtt_stats *stats)
+{
+        __u64 last_updated = stats->last_updated;
+        memset(stats, 0, sizeof(*stats));
+        stats->last_updated = last_updated;
+}
 // Stolen from BPF selftests
 int kern_sync_rcu(void)
 {
@@ -1204,20 +1227,33 @@ static int switch_agg_map(int map_active_fd)
 static void report_aggregated_rtt_mapentry(
         struct ipprefix_key *prefix, struct aggregated_rtt_stats *percpu_stats,
         int n_cpus, int af, __u8 prefix_len, __u64 t_monotonic,
-        struct aggregation_config *agg_conf)
+        struct aggregation_config *agg_conf, bool *del_entry)
 {
         struct aggregated_rtt_stats merged_stats;
+        int i;
         merge_percpu_aggreated_rtts(percpu_stats, &merged_stats, n_cpus,
                                     agg_conf->n_bins);
+        if (prefix_len > 0 && // There's no point in deleting /0 entries (can only be one of them)
+            agg_conf->timeout_interval > 0 &&
+            merged_stats.last_updated < t_monotonic &&
+            t_monotonic - merged_stats.last_updated >
+                    agg_conf->timeout_interval)
+                *del_entry = true;
+        else
+                *del_entry = false;
         // Only print and clear prefixes which have RTT samples
         if (!aggregated_rtt_stats_empty(&merged_stats)) {
                 print_aggregated_rtts(t_monotonic, prefix, af, prefix_len,
                                       &merged_stats, agg_conf);
                 // Clear out the reported stats
-                memset(percpu_stats, 0, sizeof(*percpu_stats) * n_cpus);
+                if (!*del_entry)
+                        for (i = 0; i < n_cpus; i++) {
+                                clear_aggregated_rtts(&percpu_stats[i]);
+                        }
         }
 }
@@ -1226,19 +1262,21 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
                                       struct aggregation_config *agg_conf)
 {
         struct aggregated_rtt_stats *values = NULL;
-        void *keys = NULL;
+        void *keys = NULL, *del_keys = NULL;
         int n_cpus = libbpf_num_possible_cpus();
         size_t keysize = af == AF_INET ? sizeof(__u32) : sizeof(__u64);
         __u64 batch, total = 0;
-        __u32 count = AGG_BATCH_SIZE;
+        __u32 count = AGG_BATCH_SIZE, del_idx = 0;
         bool remaining_entries = true;
         int err = 0, i;
+        bool del_key;
         DECLARE_LIBBPF_OPTS(bpf_map_batch_opts, batch_opts, .flags = BPF_EXIST);
         values = calloc(n_cpus, sizeof(*values) * AGG_BATCH_SIZE);
         keys = calloc(AGG_BATCH_SIZE, keysize);
-        if (!values || !keys) {
+        del_keys = calloc(MAP_AGGREGATION_SIZE, keysize);
+        if (!values || !keys || !del_keys) {
                 err = -ENOMEM;
                 goto exit;
         }
@@ -1257,7 +1295,11 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
                         report_aggregated_rtt_mapentry(keys + i * keysize,
                                                        values + i * n_cpus,
                                                        n_cpus, af, prefix_len,
-                                                       t_monotonic, agg_conf);
+                                                       t_monotonic, agg_conf,
+                                                       &del_key);
+                        if (del_key)
+                                memcpy(del_keys + del_idx++ * keysize,
+                                       keys + i * keysize, keysize);
                 }
                 // Update cleared stats
@@ -1270,9 +1312,15 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
                 count = AGG_BATCH_SIZE; // Ensure we always try to fetch full batch
         }
+        // Delete keys outside of fetch/update loop to avoid messing with key order
+        if (del_idx > 0)
+                err = bpf_map_delete_batch(map_fd, del_keys, &del_idx,
+                                           &batch_opts);
 exit:
         free(values);
         free(keys);
+        free(del_keys);
         return err;
 }
@@ -1801,6 +1849,7 @@ int main(int argc, char *argv[])
                 .clean_args = { .cleanup_interval = 1 * NS_PER_SECOND,
                                 .valid_thread = false },
                 .agg_conf = { .aggregation_interval = 1 * NS_PER_SECOND,
+                              .timeout_interval = 30 * NS_PER_SECOND,
                               .ipv4_prefix_len = 24,
                               .ipv6_prefix_len = 48,
                               .n_bins = RTT_AGG_NR_BINS,


@@ -11,6 +11,10 @@
 #define MS_PER_S 1000UL
 #define S_PER_DAY (24 * 3600UL)
+#define MAP_TIMESTAMP_SIZE 131072UL // 2^17, Maximum number of in-flight/unmatched timestamps we can keep track of
+#define MAP_FLOWSTATE_SIZE 131072UL // 2^17, Maximum number of concurrent flows that can be tracked
+#define MAP_AGGREGATION_SIZE 16384UL // 2^14, Maximum number of different IP-prefixes we can aggregate stats for
 typedef __u64 fixpoint64;
 #define FIXPOINT_SHIFT 16
 #define DOUBLE_TO_FIXPOINT(X) ((fixpoint64)((X) * (1UL << FIXPOINT_SHIFT)))
@@ -225,6 +229,7 @@ union pping_event {
 };
 struct aggregated_rtt_stats {
+        __u64 last_updated;
         __u64 min;
         __u64 max;
         __u32 bins[RTT_AGG_NR_BINS];


@@ -44,10 +44,6 @@
 #define ICMP_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear any ICMP flows if they're inactive this long
 #define UNOPENED_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear out flows that have not seen a response after this long
-#define MAP_TIMESTAMP_SIZE 131072UL // 2^17, Maximum number of in-flight/unmatched timestamps we can keep track of
-#define MAP_FLOWSTATE_SIZE 131072UL // 2^17, Maximum number of concurrent flows that can be tracked
-#define MAP_AGGREGATION_SIZE 16384UL // 2^14, Maximum number of different IP-prefixes we can aggregate stats for
 #define MAX_MEMCMP_SIZE 128
 /*
@@ -1028,7 +1024,7 @@ lookup_or_create_aggregation_stats(struct in6_addr *ip, __u8 ipv)
         return bpf_map_lookup_elem(agg_map, &key);
 }
-static void aggregate_rtt(__u64 rtt, struct in6_addr *ip, __u8 ipv)
+static void aggregate_rtt(__u64 rtt, __u64 t, struct in6_addr *ip, __u8 ipv)
 {
         if (!config.agg_rtts)
                 return;
@@ -1040,6 +1036,8 @@ static void aggregate_rtt(__u64 rtt, struct in6_addr *ip, __u8 ipv)
         if (!rtt_agg)
                 return;
+        rtt_agg->last_updated = t;
         if (!rtt_agg->min || rtt < rtt_agg->min)
                 rtt_agg->min = rtt;
         if (rtt > rtt_agg->max)
@@ -1123,7 +1121,7 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx,
         f_state->srtt = calculate_srtt(f_state->srtt, rtt);
         send_rtt_event(ctx, rtt, f_state, p_info);
-        aggregate_rtt(rtt,
+        aggregate_rtt(rtt, p_info->time,
                       config.agg_by_dst ? &p_info->pid.flow.daddr.ip :
                                           &p_info->pid.flow.saddr.ip,
                       p_info->pid.flow.ipv);