diff --git a/pping/pping.c b/pping/pping.c
index 73fcc26..f917346 100644
--- a/pping/pping.c
+++ b/pping/pping.c
@@ -21,11 +21,16 @@ static const char *__doc__ =
 #include
 #include
 #include
+#include <sys/timerfd.h>
 #include
 #include "json_writer.h"
 #include "pping.h" //common structs for user-space and BPF parts
 
+// Maximum string length for IP prefix (including /xx[x] and '\0')
+#define INET_PREFIXSTRLEN (INET_ADDRSTRLEN + 3)
+#define INET6_PREFIXSTRLEN (INET6_ADDRSTRLEN + 4)
+
 #define PERF_BUFFER_PAGES 64 // Related to the perf-buffer size?
 
 #define MAX_PATH_LEN 1024
@@ -46,9 +51,12 @@ static const char *__doc__ =
 #define PPING_EPEVENT_TYPE_PERFBUF (1ULL << 63)
 #define PPING_EPEVENT_TYPE_SIGNAL (1ULL << 62)
 #define PPING_EPEVENT_TYPE_PIPE (1ULL << 61)
+#define PPING_EPEVENT_TYPE_AGGTIMER (1ULL << 60)
 #define PPING_EPEVENT_MASK                                             \
 	(~(PPING_EPEVENT_TYPE_PERFBUF | PPING_EPEVENT_TYPE_SIGNAL |    \
-	   PPING_EPEVENT_TYPE_PIPE))
+	   PPING_EPEVENT_TYPE_PIPE | PPING_EPEVENT_TYPE_AGGTIMER))
+
+#define AGG_BATCH_SIZE 64 // Batch size for fetching aggregation maps (bpf_map_lookup_batch)
 
 /* Value that can be returned by functions to indicate the program should abort
  * Should ideally not collide with any error codes (including libbpf ones), but
@@ -88,12 +96,27 @@ struct map_cleanup_args {
 	bool valid_thread;
 };
 
+struct aggregation_config {
+	__u64 aggregation_interval;
+	__u64 n_bins;
+	__u64 bin_width;
+	__u8 ipv4_prefix_len;
+	__u8 ipv6_prefix_len;
+};
+
+struct aggregation_maps {
+	int map_v4_fd;
+	int map_v6_fd;
+};
+
 // Store configuration values in struct to easily pass around
 struct pping_config {
 	struct bpf_config bpf_config;
 	struct bpf_tc_opts tc_ingress_opts;
 	struct bpf_tc_opts tc_egress_opts;
 	struct map_cleanup_args clean_args;
+	struct aggregation_config agg_conf;
+	struct aggregation_maps agg_maps;
 	char *object_path;
 	char *ingress_prog;
 	char *egress_prog;
@@ -117,20 +140,23 @@ static json_writer_t *json_ctx = NULL;
 static void (*print_event_func)(const union pping_event *) = NULL;
 
 static const struct option long_options[] = {
-	{ "help",             no_argument,       NULL, 'h' },
-	{ "interface",        required_argument, NULL, 'i' }, // Name of interface to run on
-	{ "rate-limit",       required_argument, NULL, 'r' }, // Sampling rate-limit in ms
-	{ "rtt-rate",         required_argument, NULL, 'R' }, // Sampling rate in terms of flow-RTT (ex 1 sample per RTT-interval)
-	{ "rtt-type",         required_argument, NULL, 't' }, // What type of RTT the RTT-rate should be applied to ("min" or "smoothed"), only relevant if rtt-rate is provided
-	{ "force",            no_argument,       NULL, 'f' }, // Overwrite any existing XDP program on interface, remove qdisc on cleanup
-	{ "cleanup-interval", required_argument, NULL, 'c' }, // Map cleaning interval in s, 0 to disable
-	{ "format",           required_argument, NULL, 'F' }, // Which format to output in (standard/json/ppviz)
-	{ "ingress-hook",     required_argument, NULL, 'I' }, // Use tc or XDP as ingress hook
-	{ "xdp-mode",         required_argument, NULL, 'x' }, // Which xdp-mode to use (unspecified, native or generic)
-	{ "tcp",              no_argument,       NULL, 'T' }, // Calculate and report RTTs for TCP traffic (with TCP timestamps)
-	{ "icmp",             no_argument,       NULL, 'C' }, // Calculate and report RTTs for ICMP echo-reply traffic
-	{ "include-local",    no_argument,       NULL, 'l' }, // Also report "internal" RTTs
-	{ "include-SYN",      no_argument,       NULL, 's' }, // Include SYN-packets in tracking (may fill up flow state with half-open connections)
+	{ "help",                 no_argument,       NULL, 'h' },
+	{ "interface",            required_argument, NULL, 'i' }, // Name of interface to run on
"rate-limit", required_argument, NULL, 'r' }, // Sampling rate-limit in ms + { "rtt-rate", required_argument, NULL, 'R' }, // Sampling rate in terms of flow-RTT (ex 1 sample per RTT-interval) + { "rtt-type", required_argument, NULL, 't' }, // What type of RTT the RTT-rate should be applied to ("min" or "smoothed"), only relevant if rtt-rate is provided + { "force", no_argument, NULL, 'f' }, // Overwrite any existing XDP program on interface, remove qdisc on cleanup + { "cleanup-interval", required_argument, NULL, 'c' }, // Map cleaning interval in s, 0 to disable + { "format", required_argument, NULL, 'F' }, // Which format to output in (standard/json/ppviz) + { "ingress-hook", required_argument, NULL, 'I' }, // Use tc or XDP as ingress hook + { "xdp-mode", required_argument, NULL, 'x' }, // Which xdp-mode to use (unspecified, native or generic) + { "tcp", no_argument, NULL, 'T' }, // Calculate and report RTTs for TCP traffic (with TCP timestamps) + { "icmp", no_argument, NULL, 'C' }, // Calculate and report RTTs for ICMP echo-reply traffic + { "include-local", no_argument, NULL, 'l' }, // Also report "internal" RTTs + { "include-SYN", no_argument, NULL, 's' }, // Include SYN-packets in tracking (may fill up flow state with half-open connections) + { "aggregate", required_argument, NULL, 'a' }, // Aggregate RTTs every X seconds instead of reporting them individually + { "aggregate-subnets-v4", required_argument, NULL, '4' }, // Set the subnet size for IPv4 when aggregating (default 24) + { "aggregate-subnets-v6", required_argument, NULL, '6' }, // Set the subnet size for IPv6 when aggregating (default 48) { 0, 0, NULL, 0 } }; @@ -197,19 +223,49 @@ static int parse_bounded_double(double *res, const char *str, double low, return 0; } +static int parse_bounded_long(long long *res, const char *str, long long low, + long long high, const char *name) +{ + char *endptr; + errno = 0; + + *res = strtoll(str, &endptr, 10); + if (endptr == str || strlen(str) != endptr - str) { + fprintf(stderr, "%s %s is not a valid integer\n", name, str); + return -EINVAL; + } + + if (errno == ERANGE) { + fprintf(stderr, "%s %s overflowed\n", name, str); + return -ERANGE; + } + + if (*res < low || *res > high) { + fprintf(stderr, "%s must be in range [%lld, %lld]\n", name, low, + high); + return -ERANGE; + } + + return 0; +} + static int parse_arguments(int argc, char *argv[], struct pping_config *config) { int err, opt; double user_float; + long long user_int; config->ifindex = 0; - config->bpf_config.localfilt = true; config->force = false; + + config->bpf_config.localfilt = true; config->bpf_config.track_tcp = false; config->bpf_config.track_icmp = false; config->bpf_config.skip_syn = true; + config->bpf_config.push_individual_events = true; + config->bpf_config.agg_rtts = false; - while ((opt = getopt_long(argc, argv, "hflTCsi:r:R:t:c:F:I:x:", + while ((opt = getopt_long(argc, argv, "hflTCsi:r:R:t:c:F:I:x:a:4:6:", long_options, NULL)) != -1) { switch (opt) { case 'i': @@ -319,6 +375,33 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) return -EINVAL; } break; + case 'a': + /* Aggregated output currently disables individual RTT */ + config->bpf_config.push_individual_events = false; + config->bpf_config.agg_rtts = true; + + err = parse_bounded_long(&user_int, optarg, 1, + 7 * S_PER_DAY, "aggregate"); + if (err) + return -EINVAL; + + config->agg_conf.aggregation_interval = + user_int * NS_PER_SECOND; + break; + case '4': + err = parse_bounded_long(&user_int, optarg, 0, 32, + 
"aggregate-subnets-v4"); + if (err) + return -EINVAL; + config->agg_conf.ipv4_prefix_len = user_int; + break; + case '6': + err = parse_bounded_long(&user_int, optarg, 0, 64, + "aggregate-subnets-v6"); + if (err) + return -EINVAL; + config->agg_conf.ipv6_prefix_len = user_int; + break; case 'h': printf("HELP:\n"); print_usage(argv); @@ -335,6 +418,12 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) return -EINVAL; } + config->bpf_config.ipv4_prefix_mask = + htonl(0xffffffffUL << (32 - config->agg_conf.ipv4_prefix_len)); + config->bpf_config.ipv6_prefix_mask = + htobe64(0xffffffffffffffffUL + << (64 - config->agg_conf.ipv6_prefix_len)); + return 0; } @@ -649,6 +738,35 @@ static __u64 convert_monotonic_to_realtime(__u64 monotonic_time) return monotonic_time + offset; } +// Stolen from xdp-tool/lib/util/util.c +int try_snprintf(char *buf, size_t buf_len, const char *format, ...) +{ + va_list args; + int len; + + va_start(args, format); + len = vsnprintf(buf, buf_len, format, args); + va_end(args); + + if (len < 0) + return -EINVAL; + else if ((size_t)len >= buf_len) + return -ENAMETOOLONG; + + return 0; +} + +/* + * Is the passed ip an IPv4 address mapped into the IPv6 space as specified by + * RFC 4291 sec 2.5.5.2? + */ +static bool is_ipv4_in_ipv6(const struct in6_addr *ip) +{ + __u16 ipv4_prefix[] = { 0x0, 0x0, 0x0, 0x0, 0x0, 0xFFFF }; + + return memcmp(ipv4_prefix, ip, sizeof(ipv4_prefix)) == 0; +} + /* * Wrapper around inet_ntop designed to handle the "bug" that mapped IPv4 * addresses are formated as IPv6 addresses for AF_INET6 @@ -656,14 +774,58 @@ static __u64 convert_monotonic_to_realtime(__u64 monotonic_time) static int format_ip_address(char *buf, size_t size, int af, const struct in6_addr *addr) { + if (af == AF_UNSPEC) + af = is_ipv4_in_ipv6(addr) ? AF_INET : AF_INET6; + if (af == AF_INET) return inet_ntop(af, &addr->s6_addr[12], buf, size) ? -errno : - 0; + 0; else if (af == AF_INET6) return inet_ntop(af, addr, buf, size) ? -errno : 0; return -EINVAL; } +/* Formats IPv4 or IPv6 IP-prefix string from a struct ipprefix_key */ +static int format_ipprefix(char *buf, size_t size, int af, + struct ipprefix_key *prefix, __u8 prefix_len) +{ + union { + struct in_addr ipv4; + struct in6_addr ipv6; + } ip; + size_t iplen; + int err; + + if (af == AF_INET) { + if (size < INET_PREFIXSTRLEN) + return -ENOSPC; + if (prefix_len > 32) + return -EINVAL; + + ip.ipv4.s_addr = prefix->v4; + } else if (af == AF_INET6) { + if (size < INET6_PREFIXSTRLEN) + return -ENOSPC; + if (prefix_len > 64) + return -EINVAL; + + memcpy(&ip.ipv6, &prefix->v6, sizeof(__u64)); + memset(&ip.ipv6.s6_addr32[2], 0, sizeof(__u64)); + } else { + return -EINVAL; + } + + err = inet_ntop(af, &ip, buf, size) ? 
@@ -901,6 +1063,143 @@ static void handle_missed_events(void *ctx, int cpu, __u64 lost_cnt)
 	fprintf(stderr, "Lost %llu events on CPU %d\n", lost_cnt, cpu);
 }
 
+static void print_histogram(FILE *stream,
+			    struct aggregated_rtt_stats *rtt_stats, int n_bins)
+{
+	int i;
+
+	fprintf(stream, "[%llu", rtt_stats->bins[0]);
+	for (i = 1; i < n_bins; i++)
+		fprintf(stream, ",%llu", rtt_stats->bins[i]);
+	fprintf(stream, "]");
+}
+
+static void print_aggregated_rtts(FILE *stream, __u64 t,
+				  struct ipprefix_key *prefix, int af,
+				  __u8 prefix_len,
+				  struct aggregated_rtt_stats *rtt_stats,
+				  struct aggregation_config *agg_conf)
+{
+	char prefixstr[INET6_PREFIXSTRLEN] = { 0 };
+	format_ipprefix(prefixstr, sizeof(prefixstr), af, prefix, prefix_len);
+
+	print_ns_datetime(stream, t);
+	fprintf(stream, ": %s -> min=%.6g ms, max=%.6g ms, histogram=",
+		prefixstr, (double)rtt_stats->min / NS_PER_MS,
+		(double)rtt_stats->max / NS_PER_MS);
+	print_histogram(stream, rtt_stats, agg_conf->n_bins);
+	fprintf(stream, "\n");
+}
+
+static bool aggregated_rtt_stats_empty(struct aggregated_rtt_stats *stats)
+{
+	return stats->max == 0;
+}
+
+static void
+merge_percpu_aggregated_rtts(struct aggregated_rtt_stats *percpu_stats,
+			     struct aggregated_rtt_stats *merged_stats,
+			     int n_cpus, int n_bins)
+{
+	int i, bin;
+
+	memset(merged_stats, 0, sizeof(*merged_stats));
+
+	for (i = 0; i < n_cpus; i++) {
+		if (aggregated_rtt_stats_empty(&percpu_stats[i]))
+			continue;
+
+		if (percpu_stats[i].max > merged_stats->max)
+			merged_stats->max = percpu_stats[i].max;
+		if (merged_stats->min == 0 ||
+		    percpu_stats[i].min < merged_stats->min)
+			merged_stats->min = percpu_stats[i].min;
+
+		for (bin = 0; bin < n_bins; bin++)
+			merged_stats->bins[bin] += percpu_stats[i].bins[bin];
+	}
+}
+
+static void report_aggregated_rtt_mapentry(
+	struct ipprefix_key *prefix, struct aggregated_rtt_stats *percpu_stats,
+	int n_cpus, int af, __u8 prefix_len, __u64 t_monotonic,
+	struct aggregation_config *agg_conf)
+{
+	struct aggregated_rtt_stats merged_stats;
+
+	merge_percpu_aggregated_rtts(percpu_stats, &merged_stats, n_cpus,
+				     agg_conf->n_bins);
+
+	// Only print prefixes which have RTT samples
+	if (!aggregated_rtt_stats_empty(&merged_stats))
+		print_aggregated_rtts(stdout, t_monotonic, prefix, af,
+				      prefix_len, &merged_stats, agg_conf);
+}
+
+static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
+				     __u64 t_monotonic,
+				     struct aggregation_config *agg_conf)
+{
+	struct aggregated_rtt_stats *values = NULL;
+	void *keys = NULL;
+	int n_cpus = libbpf_num_possible_cpus();
+	size_t keysize = af == AF_INET ? sizeof(__u32) : sizeof(__u64);
+	__u64 batch, total = 0;
+	__u32 count = AGG_BATCH_SIZE;
+	bool remaining_entries = true;
+	int err = 0, i;
+
+	values = calloc(n_cpus, sizeof(*values) * AGG_BATCH_SIZE);
+	keys = calloc(AGG_BATCH_SIZE, keysize);
+	if (!values || !keys) {
+		err = -ENOMEM;
+		goto exit;
+	}
+
+	while (remaining_entries) {
+		err = bpf_map_lookup_batch(map_fd, total ? &batch : NULL,
					   &batch, keys, values, &count, NULL);
+		if (err == -ENOENT) {
+			remaining_entries = false;
+			err = 0;
+		} else if (err) {
+			goto exit;
+		}
+
+		for (i = 0; i < count; i++) {
+			report_aggregated_rtt_mapentry(keys + i * keysize,
+						       values + i * n_cpus,
+						       n_cpus, af, prefix_len,
+						       t_monotonic, agg_conf);
+		}
+
+		total += count;
+		count = AGG_BATCH_SIZE; // Ensure we always try to fetch full batch
+	}
+
+exit:
+	free(values);
+	free(keys);
+	return err;
+}
+
+static int report_aggregated_rtts(struct aggregation_maps *maps,
+				  struct aggregation_config *agg_conf)
+{
+	__u64 t = get_time_ns(CLOCK_MONOTONIC);
+	int err;
+
+	err = report_aggregated_rtt_map(maps->map_v4_fd, AF_INET,
+					agg_conf->ipv4_prefix_len, t, agg_conf);
+	if (err)
+		return err;
+
+	err = report_aggregated_rtt_map(maps->map_v6_fd, AF_INET6,
+					agg_conf->ipv6_prefix_len, t, agg_conf);
+	return err;
+}
+
 /*
  * Sets only the necessary programs in the object file to autoload.
  *
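report_aggregated_rtt_map() above drains an entire map through bpf_map_lookup_batch(): NULL as the in-batch pointer starts from the beginning, the returned out-batch token is fed back in to resume, and -ENOENT signals that the final (possibly partial) batch has been read. A minimal editor-added sketch of that loop in isolation (not code from the patch; dump_map and its parameters are illustrative, and it assumes libbpf 1.0 error conventions, matching the -ENOENT check above):

#include <bpf/bpf.h>
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>

static int dump_map(int map_fd, void *keys, void *vals, __u32 batch_sz)
{
	__u64 in = 0, out = 0;
	__u32 count = batch_sz;
	bool first = true;
	int err;

	do {
		err = bpf_map_lookup_batch(map_fd, first ? NULL : &in, &out,
					   keys, vals, &count, NULL);
		if (err && err != -ENOENT)
			return err;		/* real failure */

		/* the first 'count' keys/vals are valid - process them here */
		printf("fetched %u entries\n", count);

		in = out;			/* resume where the kernel stopped */
		first = false;
		count = batch_sz;		/* request a full batch again */
	} while (err != -ENOENT);		/* -ENOENT: whole map has been read */

	return 0;
}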
&batch : NULL, + &batch, keys, values, &count, NULL); + if (err == -ENOENT) { + remaining_entries = false; + err = 0; + } else if (err) { + goto exit; + } + + for (i = 0; i < count; i++) { + report_aggregated_rtt_mapentry(keys + i * keysize, + values + i * n_cpus, + n_cpus, af, prefix_len, + t_monotonic, agg_conf); + } + + total += count; + count = AGG_BATCH_SIZE; // Ensure we always try to fetch full batch + } + +exit: + free(values); + free(keys); + return err; +} + +static int report_aggregated_rtts(struct aggregation_maps *maps, + struct aggregation_config *agg_conf) +{ + __u64 t = get_time_ns(CLOCK_MONOTONIC); + int err; + + err = report_aggregated_rtt_map(maps->map_v4_fd, AF_INET, + agg_conf->ipv4_prefix_len, t, agg_conf); + if (err) + return err; + + err = report_aggregated_rtt_map(maps->map_v6_fd, AF_INET6, + agg_conf->ipv6_prefix_len, t, agg_conf); + return err; +} + /* * Sets only the necessary programs in the object file to autoload. * @@ -1164,6 +1463,98 @@ static int handle_pipefd(int pipe_rfd) return PPING_ABORT; } +int fetch_aggregation_map_fds(struct bpf_object *obj, + struct aggregation_maps *maps) +{ + maps->map_v4_fd = bpf_object__find_map_fd_by_name(obj, "map_v4_agg"); + maps->map_v6_fd = bpf_object__find_map_fd_by_name(obj, "map_v6_agg"); + + if (maps->map_v4_fd < 0 || maps->map_v6_fd < 0) { + fprintf(stderr, "Unable to find aggregation maps (%d/%d).\n", + maps->map_v4_fd, maps->map_v6_fd); + return -ENOENT; + } + + return 0; +} + +static int setup_timer(__u64 init_delay_ns, __u64 interval_ns) +{ + struct itimerspec timercfg = { + .it_value = { .tv_sec = init_delay_ns / NS_PER_SECOND, + .tv_nsec = init_delay_ns % NS_PER_SECOND }, + .it_interval = { .tv_sec = interval_ns / NS_PER_SECOND, + .tv_nsec = interval_ns % NS_PER_SECOND } + }; + int fd, err; + + fd = timerfd_create(CLOCK_MONOTONIC, 0); + if (fd < 0) { + return -errno; + } + + err = timerfd_settime(fd, 0, &timercfg, NULL); + if (err) { + err = -errno; + close(fd); + return err; + } + + return fd; +} + +static int init_aggregation_timer(struct bpf_object *obj, + struct pping_config *config) +{ + int err, fd; + + err = fetch_aggregation_map_fds(obj, &config->agg_maps); + if (err) { + fprintf(stderr, "Failed fetching aggregation maps: %s\n", + get_libbpf_strerror(err)); + return err; + } + + fd = setup_timer(config->agg_conf.aggregation_interval, + config->agg_conf.aggregation_interval); + if (fd < 0) { + fprintf(stderr, + "Failed creating timer for periodic aggregation: %s\n", + get_libbpf_strerror(fd)); + return fd; + } + + return fd; +} + +static int handle_aggregation_timer(int timer_fd, struct aggregation_maps *maps, + struct aggregation_config *agg_conf) +{ + __u64 timer_exps; + int ret, err; + + ret = read(timer_fd, &timer_exps, sizeof(timer_exps)); + if (ret != sizeof(timer_exps)) { + fprintf(stderr, "Failed reading timerfd\n"); + return -EBADFD; + } + + if (timer_exps > 1) { + fprintf(stderr, + "Warning - missed %llu aggregation timer expirations\n", + timer_exps - 1); + } + + err = report_aggregated_rtts(maps, agg_conf); + if (err) { + fprintf(stderr, "Failed reporting aggregated RTTs: %s\n", + get_libbpf_strerror(err)); + return err; + } + + return 0; +} + static int epoll_add_event_type(int epfd, int fd, __u64 event_type, __u64 value) { struct epoll_event ev = { @@ -1197,7 +1588,7 @@ static int epoll_add_perf_buffer(int epfd, struct perf_buffer *pb) } static int epoll_add_events(int epfd, struct perf_buffer *pb, int sigfd, - int pipe_rfd) + int pipe_rfd, int aggfd) { int err; @@ -1225,6 +1616,17 @@ 
@@ -1225,6 +1616,17 @@ static int epoll_add_events(int epfd, struct perf_buffer *pb, int sigfd,
 		return err;
 	}
 
+	if (aggfd >= 0) {
+		err = epoll_add_event_type(epfd, aggfd,
+					   PPING_EPEVENT_TYPE_AGGTIMER, aggfd);
+		if (err) {
+			fprintf(stderr,
+				"Failed adding aggregation timerfd to epoll instance: %s\n",
+				get_libbpf_strerror(err));
+			return err;
+		}
+	}
+
 	return 0;
 }
 
@@ -1247,6 +1649,11 @@ static int epoll_poll_events(int epfd, struct pping_config *config,
 			err = perf_buffer__consume_buffer(
 				pb, events[i].data.u64 & PPING_EPEVENT_MASK);
 			break;
+		case PPING_EPEVENT_TYPE_AGGTIMER:
+			err = handle_aggregation_timer(
+				events[i].data.u64 & PPING_EPEVENT_MASK,
+				&config->agg_maps, &config->agg_conf);
+			break;
 		case PPING_EPEVENT_TYPE_SIGNAL:
 			err = handle_signalfd(events[i].data.u64 &
 					      PPING_EPEVENT_MASK);
@@ -1273,7 +1680,7 @@ int main(int argc, char *argv[])
 	void *thread_err;
 	struct bpf_object *obj = NULL;
 	struct perf_buffer *pb = NULL;
-	int epfd, sigfd;
+	int epfd, sigfd, aggfd;
 
 	DECLARE_LIBBPF_OPTS(bpf_tc_opts, tc_ingress_opts);
 	DECLARE_LIBBPF_OPTS(bpf_tc_opts, tc_egress_opts);
@@ -1284,6 +1691,11 @@ int main(int argc, char *argv[])
 				      .use_srtt = false },
 		.clean_args = { .cleanup_interval = 1 * NS_PER_SECOND,
 				.valid_thread = false },
+		.agg_conf = { .aggregation_interval = 1 * NS_PER_SECOND,
+			      .ipv4_prefix_len = 24,
+			      .ipv6_prefix_len = 48,
+			      .n_bins = RTT_AGG_NR_BINS,
+			      .bin_width = RTT_AGG_BIN_WIDTH },
 		.object_path = "pping_kern.o",
 		.ingress_prog = PROG_INGRESS_TC,
 		.egress_prog = PROG_EGRESS_TC,
@@ -1344,6 +1756,14 @@
 		output_format_to_str(config.output_format),
 		tracked_protocols_to_str(&config), config.ifname);
 
+	if (config.bpf_config.agg_rtts)
+		fprintf(stderr,
+			"Aggregating RTTs in histograms with %llu %.6g ms wide bins every %.9g seconds\n",
+			config.agg_conf.n_bins,
+			(double)config.agg_conf.bin_width / NS_PER_MS,
+			(double)config.agg_conf.aggregation_interval /
+				NS_PER_SECOND);
+
 	// Setup signalhandling (allow graceful shutdown on SIGINT/SIGTERM)
 	sigfd = init_signalfd();
 	if (sigfd < 0) {
@@ -1374,14 +1794,26 @@
 		goto cleanup_mapcleaning;
 	}
 
+	if (config.bpf_config.agg_rtts) {
+		aggfd = init_aggregation_timer(obj, &config);
+		if (aggfd < 0) {
+			fprintf(stderr,
+				"Failed setting up aggregation timerfd: %s\n",
+				get_libbpf_strerror(aggfd));
+			goto cleanup_perf_buffer;
+		}
+	} else {
+		aggfd = -1;
+	}
+
 	epfd = epoll_create1(EPOLL_CLOEXEC);
 	if (epfd < 0) {
 		fprintf(stderr, "Failed creating epoll instance: %s\n",
 			get_libbpf_strerror(err));
-		goto cleanup_perf_buffer;
+		goto cleanup_aggfd;
 	}
 
-	err = epoll_add_events(epfd, pb, sigfd, config.clean_args.pipe_rfd);
+	err = epoll_add_events(epfd, pb, sigfd, config.clean_args.pipe_rfd, aggfd);
 	if (err) {
 		fprintf(stderr, "Failed adding events to epoll instace: %s\n",
 			get_libbpf_strerror(err));
@@ -1410,6 +1842,10 @@
 cleanup_epfd:
 	close(epfd);
 
+cleanup_aggfd:
+	if (aggfd >= 0)
+		close(aggfd);
+
 cleanup_perf_buffer:
 	perf_buffer__free(pb);
 
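The timer plumbing added above (setup_timer(), init_aggregation_timer(), handle_aggregation_timer() and the aggfd wiring in main()) uses the timerfd API so that periodic aggregation fits into the same epoll loop as everything else: the timer is just another pollable fd, and read() returns the number of expirations since the last read, which is how missed intervals are detected. A minimal, editor-added standalone sketch of that pattern with a one-second period (not code from the patch):

#include <stdint.h>
#include <stdio.h>
#include <sys/timerfd.h>
#include <unistd.h>

int main(void)
{
	struct itimerspec cfg = {
		.it_value = { .tv_sec = 1 },	/* first expiration after 1 s */
		.it_interval = { .tv_sec = 1 },	/* then every 1 s */
	};
	uint64_t expirations;
	int fd = timerfd_create(CLOCK_MONOTONIC, 0);

	if (fd < 0 || timerfd_settime(fd, 0, &cfg, NULL) < 0)
		return 1;

	/* Blocking read here; with epoll you would instead wait for EPOLLIN.
	 * A value > 1 means some intervals were missed. */
	if (read(fd, &expirations, sizeof(expirations)) == (ssize_t)sizeof(expirations))
		printf("timer fired %llu time(s)\n",
		       (unsigned long long)expirations);

	close(fd);
	return 0;
}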
diff --git a/pping/pping.h b/pping/pping.h
index a2a5eef..cc345d3 100644
--- a/pping/pping.h
+++ b/pping/pping.h
@@ -22,6 +22,9 @@ typedef __u64 fixpoint64;
 #define EVENT_TYPE_MAP_FULL 3
 #define EVENT_TYPE_MAP_CLEAN 4
 
+#define RTT_AGG_NR_BINS 1000UL
+#define RTT_AGG_BIN_WIDTH (1 * NS_PER_MS) // 1 ms
+
 enum __attribute__((__packed__)) flow_event_type {
 	FLOW_EVENT_NONE,
 	FLOW_EVENT_OPENING,
@@ -60,12 +63,23 @@ enum __attribute__((__packed__)) connection_state {
 struct bpf_config {
 	__u64 rate_limit;
 	fixpoint64 rtt_rate;
+	__u64 ipv6_prefix_mask;
+	__u32 ipv4_prefix_mask;
 	bool use_srtt;
 	bool track_tcp;
 	bool track_icmp;
 	bool localfilt;
 	bool skip_syn;
-	__u8 reserved[3];
+	bool push_individual_events;
+	bool agg_rtts;
+	__u8 reserved;
+};
+
+struct ipprefix_key {
+	union {
+		__u32 v4;
+		__u64 v6;
+	};
 };
 
 /*
@@ -81,9 +95,9 @@ struct flow_address {
 /*
  * Struct to hold a full network tuple
- * The ipv member is technically not necessary, but makes it easier to 
+ * The ipv member is technically not necessary, but makes it easier to
  * determine if saddr/daddr are IPv4 or IPv6 address (don't need to look at the
- * first 12 bytes of address). The proto memeber is not currently used, but 
+ * first 12 bytes of address). The proto member is not currently used, but
  * could be useful once pping is extended to work for other protocols than TCP.
  */
 struct network_tuple {
@@ -210,4 +224,10 @@ union pping_event {
 	struct map_clean_event map_clean_event;
 };
 
+struct aggregated_rtt_stats {
+	__u64 min;
+	__u64 max;
+	__u64 bins[RTT_AGG_NR_BINS];
+};
+
 #endif
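struct aggregated_rtt_stats just above is fairly large: with the default RTT_AGG_NR_BINS of 1000, each entry is 2 x 8 bytes of min/max plus 1000 x 8 bytes of bins, i.e. 8016 bytes, and the per-CPU aggregation maps added in pping_kern.c below keep one copy per possible CPU for every populated prefix (BPF_F_NO_PREALLOC means unused entries cost nothing). A small editor-added check of that arithmetic (standalone C that mirrors the struct rather than including pping.h):

#include <stdint.h>
#include <stdio.h>

#define RTT_AGG_NR_BINS 1000UL

struct aggregated_rtt_stats {
	uint64_t min;
	uint64_t max;
	uint64_t bins[RTT_AGG_NR_BINS];
};

int main(void)
{
	/* 8 + 8 + 1000 * 8 = 8016 bytes per prefix, per CPU */
	printf("%zu bytes\n", sizeof(struct aggregated_rtt_stats));
	return 0;
}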
diff --git a/pping/pping_kern.c b/pping/pping_kern.c
index 9a28341..edd289b 100644
--- a/pping/pping_kern.c
+++ b/pping/pping_kern.c
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include
 
 // overwrite xdp/parsing_helpers.h value to avoid hitting verifier limit
 #ifdef IPV6_EXT_MAX_CHAIN
@@ -45,6 +46,7 @@
 
 #define MAP_TIMESTAMP_SIZE 131072UL // 2^17, Maximum number of in-flight/unmatched timestamps we can keep track of
 #define MAP_FLOWSTATE_SIZE 131072UL // 2^17, Maximum number of concurrent flows that can be tracked
+#define MAP_AGGREGATION_SIZE 16384UL // 2^14, Maximum number of different IP-prefixes we can aggregate stats for
 
 #define MAX_MEMCMP_SIZE 128
 
@@ -128,6 +130,11 @@ char _license[] SEC("license") = "GPL";
 static volatile const struct bpf_config config = {};
 static volatile __u64 last_warn_time[2] = { 0 };
 
+// Keep an empty aggregated_rtt_stats as a global variable to use as a template
+// when creating new entries. That way, it won't have to be allocated on stack
+// (where it won't fit anyway) and initialized each time during run time.
+static struct aggregated_rtt_stats empty_stats = { 0 };
+
 // Map definitions
 struct {
@@ -150,6 +157,22 @@ struct {
 	__uint(value_size, sizeof(__u32));
 } events SEC(".maps");
 
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
+	__type(key, __u32);
+	__type(value, struct aggregated_rtt_stats);
+	__uint(max_entries, MAP_AGGREGATION_SIZE);
+	__uint(map_flags, BPF_F_NO_PREALLOC);
+} map_v4_agg SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
+	__type(key, __u64);
+	__type(value, struct aggregated_rtt_stats);
+	__uint(max_entries, MAP_AGGREGATION_SIZE);
+	__uint(map_flags, BPF_F_NO_PREALLOC);
+} map_v6_agg SEC(".maps");
+
 // Help functions
 
@@ -640,6 +663,9 @@ static bool is_rate_limited(__u64 now, __u64 last_ts, __u64 rtt)
 static void send_flow_open_event(void *ctx, struct packet_info *p_info,
 				 struct flow_state *rev_flow)
 {
+	if (!config.push_individual_events)
+		return;
+
 	struct flow_event fe = {
 		.event_type = EVENT_TYPE_FLOW,
 		.flow_event_type = FLOW_EVENT_OPENING,
@@ -663,6 +689,9 @@ static void send_flow_open_event(void *ctx, struct packet_info *p_info,
 static void send_flow_event(void *ctx, struct packet_info *p_info,
 			    bool rev_flow)
 {
+	if (!config.push_individual_events)
+		return;
+
 	struct flow_event fe = {
 		.event_type = EVENT_TYPE_FLOW,
 		.flow_event_type = p_info->event_type,
@@ -707,8 +736,11 @@ static void send_map_full_event(void *ctx, struct packet_info *p_info,
 }
 
 static void send_rtt_event(void *ctx, __u64 rtt, struct flow_state *f_state,
-			   struct packet_info *p_info)
+			    struct packet_info *p_info)
 {
+	if (!config.push_individual_events)
+		return;
+
 	struct rtt_event re = {
 		.event_type = EVENT_TYPE_RTT,
 		.timestamp = p_info->time,
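aggregate_rtt(), added in the next hunk, indexes its histogram simply as rtt / RTT_AGG_BIN_WIDTH and clamps the result to the last bin. A worked, editor-added example with the default 1 ms bin width and 1000 bins (standalone C, not from the patch; NS_PER_MS is 10^6 by definition):

#include <stdint.h>
#include <stdio.h>

#define NS_PER_MS 1000000UL
#define RTT_AGG_BIN_WIDTH (1 * NS_PER_MS)
#define RTT_AGG_NR_BINS 1000UL

int main(void)
{
	uint64_t rtt = 3700000;			/* a 3.7 ms RTT, in nanoseconds */
	uint64_t bin = rtt / RTT_AGG_BIN_WIDTH;	/* -> 3, i.e. the [3 ms, 4 ms) bin */

	if (bin >= RTT_AGG_NR_BINS)		/* RTTs of 1 s or more share the last bin */
		bin = RTT_AGG_NR_BINS - 1;
	printf("bin %llu\n", (unsigned long long)bin);
	return 0;
}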
@@ -926,6 +958,68 @@ static bool is_new_identifier(struct packet_id *pid, struct flow_state *f_state)
 	return pid->identifier != f_state->last_id;
 }
 
+static void create_ipprefix_key_v4(__u32 *prefix_key, struct in6_addr *ip)
+{
+	*prefix_key = ip->s6_addr32[3] & config.ipv4_prefix_mask;
+}
+
+static void create_ipprefix_key_v6(__u64 *prefix_key, struct in6_addr *ip)
+{
+	*prefix_key = *(__u64 *)&ip->in6_u & config.ipv6_prefix_mask;
+	// *prefix_key = *(__u64 *)ip & config.ipv6_prefix_mask; // gives verifier rejection "misaligned stack access off"
+}
+
+static struct aggregated_rtt_stats *
+lookup_or_create_aggregation_stats(struct in6_addr *ip, __u8 ipv)
+{
+	struct aggregated_rtt_stats *agg;
+	struct ipprefix_key key;
+	void *agg_map;
+	int err;
+
+	if (ipv == AF_INET) {
+		create_ipprefix_key_v4(&key.v4, ip);
+		agg_map = &map_v4_agg;
+
+	} else {
+		create_ipprefix_key_v6(&key.v6, ip);
+		agg_map = &map_v6_agg;
+	}
+
+	agg = bpf_map_lookup_elem(agg_map, &key);
+	if (agg)
+		return agg;
+
+	// No existing entry, try to create new one
+	err = bpf_map_update_elem(agg_map, &key, &empty_stats, BPF_NOEXIST);
+	if (err && err != -EEXIST)
+		return NULL;
+
+	return bpf_map_lookup_elem(agg_map, &key);
+}
+
+static void aggregate_rtt(__u64 rtt, struct in6_addr *ip, __u8 ipv)
+{
+	if (!config.agg_rtts)
+		return;
+
+	struct aggregated_rtt_stats *rtt_agg;
+	int bin_idx;
+
+	rtt_agg = lookup_or_create_aggregation_stats(ip, ipv);
+	if (!rtt_agg)
+		return;
+
+	if (!rtt_agg->min || rtt < rtt_agg->min)
+		rtt_agg->min = rtt;
+	if (rtt > rtt_agg->max)
+		rtt_agg->max = rtt;
+
+	bin_idx = rtt / RTT_AGG_BIN_WIDTH;
+	bin_idx = bin_idx >= RTT_AGG_NR_BINS ? RTT_AGG_NR_BINS - 1 : bin_idx;
+	rtt_agg->bins[bin_idx]++;
+}
+
 /*
  * Attempt to create a timestamp-entry for packet p_info for flow in f_state
  */
@@ -999,6 +1093,7 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx,
 	f_state->srtt = calculate_srtt(f_state->srtt, rtt);
 
 	send_rtt_event(ctx, rtt, f_state, p_info);
+	aggregate_rtt(rtt, &p_info->pid.flow.saddr.ip, p_info->pid.flow.ipv);
 }
 
 /*
@@ -1090,6 +1185,9 @@ static bool is_flow_old(struct network_tuple *flow, struct flow_state *f_state,
 static void send_flow_timeout_message(void *ctx, struct network_tuple *flow,
 				      __u64 time)
 {
+	if (!config.push_individual_events)
+		return;
+
 	struct flow_event fe = {
 		.event_type = EVENT_TYPE_FLOW,
 		.flow_event_type = FLOW_EVENT_CLOSING,