pping: time out idle aggregation entries

Adds an --aggregate-timeout option (in seconds, default 30, 0 disables).
Each aggregated_rtt_stats entry records when it was last updated, and user
space deletes prefixes that have been idle for longer than the timeout
after reporting them. The map-size defines move to pping.h so that user
space can size its buffer of keys to delete.

diff --git a/pping/pping.c b/pping/pping.c
index c9cf301..612b1e0 100644
--- a/pping/pping.c
+++ b/pping/pping.c
@@ -69,6 +69,7 @@ static const char *__doc__ =
 #define PPING_ABORT 5555
 
 #define ARG_AGG_REVERSE 256
+#define ARG_AGG_TIMEOUT 257
 
 enum pping_output_format {
 	PPING_OUTPUT_STANDARD,
@@ -104,6 +105,7 @@ struct map_cleanup_args {
 
 struct aggregation_config {
 	__u64 aggregation_interval;
+	__u64 timeout_interval;
 	__u64 n_bins;
 	__u64 bin_width;
 	__u8 ipv4_prefix_len;
@@ -166,6 +168,7 @@ static const struct option long_options[] = {
 	{ "aggregate-subnets-v4", required_argument, NULL, '4' }, // Set the subnet size for IPv4 when aggregating (default 24)
 	{ "aggregate-subnets-v6", required_argument, NULL, '6' }, // Set the subnet size for IPv6 when aggregating (default 48)
 	{ "aggregate-reverse", no_argument, NULL, ARG_AGG_REVERSE }, // Aggregate RTTs by dst IP of reply packet (instead of src like default)
+	{ "aggregate-timeout", required_argument, NULL, ARG_AGG_TIMEOUT }, // Interval for timing out subnet entries in seconds (default 30s, 0 disables)
 	{ 0, 0, NULL, 0 }
 };
 
@@ -415,6 +418,15 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
 		case ARG_AGG_REVERSE:
 			config->bpf_config.agg_by_dst = true;
 			break;
+		case ARG_AGG_TIMEOUT:
+			err = parse_bounded_long(&user_int, optarg, 0,
+						 7 * S_PER_DAY,
+						 "aggregate-timeout");
+			if (err)
+				return -EINVAL;
+			config->agg_conf.timeout_interval =
+				user_int * NS_PER_SECOND;
+			break;
 		case 'h':
 			printf("HELP:\n");
 			print_usage(argv);
@@ -1152,6 +1164,10 @@ merge_percpu_aggreated_rtts(struct aggregated_rtt_stats *percpu_stats,
 	memset(merged_stats, 0, sizeof(*merged_stats));
 
 	for (i = 0; i < n_cpus; i++) {
+		if (percpu_stats[i].last_updated > merged_stats->last_updated)
+			merged_stats->last_updated =
+				percpu_stats[i].last_updated;
+
 		if (aggregated_rtt_stats_empty(&percpu_stats[i]))
 			continue;
 
@@ -1166,6 +1182,14 @@ merge_percpu_aggreated_rtts(struct aggregated_rtt_stats *percpu_stats,
 	}
 }
 
+// Zero the RTT stats but keep last_updated, which the entry timeout relies on
+static void clear_aggregated_rtts(struct aggregated_rtt_stats *stats)
+{
+	__u64 last_updated = stats->last_updated;
+	memset(stats, 0, sizeof(*stats));
+	stats->last_updated = last_updated;
+}
+
 // Stolen from BPF selftests
 int kern_sync_rcu(void)
 {
@@ -1204,20 +1228,33 @@ static int switch_agg_map(int map_active_fd)
 static void report_aggregated_rtt_mapentry(
 	struct ipprefix_key *prefix, struct aggregated_rtt_stats *percpu_stats,
 	int n_cpus, int af, __u8 prefix_len, __u64 t_monotonic,
-	struct aggregation_config *agg_conf)
+	struct aggregation_config *agg_conf, bool *del_entry)
 {
 	struct aggregated_rtt_stats merged_stats;
+	int i;
 
 	merge_percpu_aggreated_rtts(percpu_stats, &merged_stats, n_cpus,
 				    agg_conf->n_bins);
 
+	// Flag the entry for deletion if it has not been updated within the
+	// timeout interval (a timeout of 0 disables deletion). There's no
+	// point in deleting /0 entries (can only be one of them). The
+	// last_updated < t_monotonic check keeps the subtraction from wrapping.
+	*del_entry = prefix_len > 0 && agg_conf->timeout_interval > 0 &&
+		     merged_stats.last_updated < t_monotonic &&
+		     t_monotonic - merged_stats.last_updated >
+			     agg_conf->timeout_interval;
+
 	// Only print and clear prefixes which have RTT samples
 	if (!aggregated_rtt_stats_empty(&merged_stats)) {
 		print_aggregated_rtts(t_monotonic, prefix, af, prefix_len,
 				      &merged_stats, agg_conf);
 
 		// Clear out the reported stats
-		memset(percpu_stats, 0, sizeof(*percpu_stats) * n_cpus);
+		if (!*del_entry) {
+			for (i = 0; i < n_cpus; i++)
+				clear_aggregated_rtts(&percpu_stats[i]);
+		}
 	}
 }
 
@@ -1226,19 +1263,21 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
 				     struct aggregation_config *agg_conf)
 {
 	struct aggregated_rtt_stats *values = NULL;
-	void *keys = NULL;
+	void *keys = NULL, *del_keys = NULL;
 	int n_cpus = libbpf_num_possible_cpus();
 	size_t keysize = af == AF_INET ? sizeof(__u32) : sizeof(__u64);
 	__u64 batch, total = 0;
-	__u32 count = AGG_BATCH_SIZE;
+	__u32 count = AGG_BATCH_SIZE, del_idx = 0;
 	bool remaining_entries = true;
 	int err = 0, i;
+	bool del_key;
 
 	DECLARE_LIBBPF_OPTS(bpf_map_batch_opts, batch_opts, .flags = BPF_EXIST);
 
 	values = calloc(n_cpus, sizeof(*values) * AGG_BATCH_SIZE);
 	keys = calloc(AGG_BATCH_SIZE, keysize);
-	if (!values || !keys) {
+	del_keys = calloc(MAP_AGGREGATION_SIZE, keysize);
+	if (!values || !keys || !del_keys) {
 		err = -ENOMEM;
 		goto exit;
 	}
@@ -1257,7 +1296,12 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
 			report_aggregated_rtt_mapentry(keys + i * keysize,
 						       values + i * n_cpus,
 						       n_cpus, af, prefix_len,
-						       t_monotonic, agg_conf);
+						       t_monotonic, agg_conf,
+						       &del_key);
+			// Never write past the MAP_AGGREGATION_SIZE keys del_keys can hold
+			if (del_key && del_idx < MAP_AGGREGATION_SIZE)
+				memcpy(del_keys + del_idx++ * keysize,
+				       keys + i * keysize, keysize);
 		}
 
 		// Update cleared stats
@@ -1270,9 +1314,15 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
 		count = AGG_BATCH_SIZE; // Ensure we always try to fetch full batch
 	}
 
+	// Delete keys outside of fetch/update loop to avoid messing with key order
+	if (del_idx > 0)
+		err = bpf_map_delete_batch(map_fd, del_keys, &del_idx,
+					   &batch_opts);
+
 exit:
 	free(values);
 	free(keys);
+	free(del_keys);
 	return err;
 }
 
@@ -1801,6 +1851,7 @@ int main(int argc, char *argv[])
 		.clean_args = { .cleanup_interval = 1 * NS_PER_SECOND,
 				.valid_thread = false },
 		.agg_conf = { .aggregation_interval = 1 * NS_PER_SECOND,
+			      .timeout_interval = 30 * NS_PER_SECOND,
 			      .ipv4_prefix_len = 24,
 			      .ipv6_prefix_len = 48,
 			      .n_bins = RTT_AGG_NR_BINS,
diff --git a/pping/pping.h b/pping/pping.h
index 28909a9..3d2d26f 100644
--- a/pping/pping.h
+++ b/pping/pping.h
@@ -11,6 +11,10 @@
 #define MS_PER_S 1000UL
 #define S_PER_DAY (24 * 3600UL)
 
+#define MAP_TIMESTAMP_SIZE 131072UL // 2^17, Maximum number of in-flight/unmatched timestamps we can keep track of
+#define MAP_FLOWSTATE_SIZE 131072UL // 2^17, Maximum number of concurrent flows that can be tracked
+#define MAP_AGGREGATION_SIZE 16384UL // 2^14, Maximum number of different IP-prefixes we can aggregate stats for
+
 typedef __u64 fixpoint64;
 #define FIXPOINT_SHIFT 16
 #define DOUBLE_TO_FIXPOINT(X) ((fixpoint64)((X) * (1UL << FIXPOINT_SHIFT)))
@@ -225,6 +229,7 @@ union pping_event {
 };
 
 struct aggregated_rtt_stats {
+	__u64 last_updated; // Timestamp of the last aggregated RTT sample, used to time out idle entries
 	__u64 min;
 	__u64 max;
 	__u32 bins[RTT_AGG_NR_BINS];
diff --git a/pping/pping_kern.c b/pping/pping_kern.c
index 1eff7c9..b72868f 100644
--- a/pping/pping_kern.c
+++ b/pping/pping_kern.c
@@ -44,10 +44,6 @@
 #define ICMP_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear any ICMP flows if they're inactive this long
 #define UNOPENED_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear out flows that have not seen a response after this long
 
-#define MAP_TIMESTAMP_SIZE 131072UL // 2^17, Maximum number of in-flight/unmatched timestamps we can keep track of
-#define MAP_FLOWSTATE_SIZE 131072UL // 2^17, Maximum number of concurrent flows that can be tracked
-#define MAP_AGGREGATION_SIZE 16384UL // 2^14, Maximum number of different IP-prefixes we can aggregate stats for
-
 #define MAX_MEMCMP_SIZE 128
 
 /*
@@ -1028,7 +1024,7 @@ lookup_or_create_aggregation_stats(struct in6_addr *ip, __u8 ipv)
 	return bpf_map_lookup_elem(agg_map, &key);
 }
 
-static void aggregate_rtt(__u64 rtt, struct in6_addr *ip, __u8 ipv)
+static void aggregate_rtt(__u64 rtt, __u64 t, struct in6_addr *ip, __u8 ipv)
 {
 	if (!config.agg_rtts)
 		return;
@@ -1040,6 +1036,8 @@ static void aggregate_rtt(__u64 rtt, struct in6_addr *ip, __u8 ipv)
 	if (!rtt_agg)
 		return;
 
+	rtt_agg->last_updated = t;
+
 	if (!rtt_agg->min || rtt < rtt_agg->min)
 		rtt_agg->min = rtt;
 	if (rtt > rtt_agg->max)
@@ -1123,7 +1121,7 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx,
 	f_state->srtt = calculate_srtt(f_state->srtt, rtt);
 
 	send_rtt_event(ctx, rtt, f_state, p_info);
-	aggregate_rtt(rtt,
+	aggregate_rtt(rtt, p_info->time,
 		      config.agg_by_dst ? &p_info->pid.flow.daddr.ip :
 					  &p_info->pid.flow.saddr.ip,
 		      p_info->pid.flow.ipv);