mirror of
https://github.com/xdp-project/bpf-examples.git
synced 2024-05-06 15:54:53 +00:00
pping: Add option to aggregate RTTs
Add an option -a or --aggregate to provide an aggregate report of RTT samples every X seconds. This is currently mutually exclusive with the normal per-RTT sample reports. The aggregated stats are never reset, and thus contain all RTTs since the start of tracing. The next commit will change this to reset the stats after every report, so that each report only contain the RTTs since the last report. The RTTs are aggregated and reported per IP-prefix, where the user can modify the size of the prefixes used for IPv4 and IPv6 using the --aggregate-subnet-v4/v6 flags. In this intital implementation for aggregating RTTs, the minimum and maximum RTT are tracked and all RTTs are added to a histogram. It uses a predetermined number of bins of equal width (set to 1000 bins, each 1 ms wide), see RTT_AGG_NR_BINS and RTT_AGG_BIN_WIDTH in pping.h. In the future this could be changed to use more sophisticated histograms that better capture a wide variety of RTTs. Implement the periodic reporting of RTTs by using a timerfd (configured to the user-provided interval) and add it to the main epoll-loop. To minimize overhead from the hash lookups, use separate maps for IPv4 and IPv6, so that for IPv4 traffic the hashmap key is only 4 bytes (instead of 16). Furthermore, limit the maximum IPv6 prefix size to 64 so that the IPv6 map can use a 8 byte key. This limits the maximum prefix size for IPv6 to /64. Signed-off-by: Simon Sundberg <simon.sundberg@kau.se>
This commit is contained in:
480
pping/pping.c
480
pping/pping.c
@ -21,11 +21,16 @@ static const char *__doc__ =
|
||||
#include <pthread.h>
|
||||
#include <xdp/libxdp.h>
|
||||
#include <sys/signalfd.h>
|
||||
#include <sys/timerfd.h>
|
||||
#include <sys/epoll.h>
|
||||
|
||||
#include "json_writer.h"
|
||||
#include "pping.h" //common structs for user-space and BPF parts
|
||||
|
||||
// Maximum string length for IP prefix (including /xx[x] and '\0')
|
||||
#define INET_PREFIXSTRLEN (INET_ADDRSTRLEN + 3)
|
||||
#define INET6_PREFIXSTRLEN (INET6_ADDRSTRLEN + 4)
|
||||
|
||||
#define PERF_BUFFER_PAGES 64 // Related to the perf-buffer size?
|
||||
|
||||
#define MAX_PATH_LEN 1024
|
||||
@ -46,9 +51,12 @@ static const char *__doc__ =
|
||||
#define PPING_EPEVENT_TYPE_PERFBUF (1ULL << 63)
|
||||
#define PPING_EPEVENT_TYPE_SIGNAL (1ULL << 62)
|
||||
#define PPING_EPEVENT_TYPE_PIPE (1ULL << 61)
|
||||
#define PPING_EPEVENT_TYPE_AGGTIMER (1ULL << 60)
|
||||
#define PPING_EPEVENT_MASK \
|
||||
(~(PPING_EPEVENT_TYPE_PERFBUF | PPING_EPEVENT_TYPE_SIGNAL | \
|
||||
PPING_EPEVENT_TYPE_PIPE))
|
||||
PPING_EPEVENT_TYPE_PIPE | PPING_EPEVENT_TYPE_AGGTIMER))
|
||||
|
||||
#define AGG_BATCH_SIZE 64 // Batch size for fetching aggregation maps (bpf_map_lookup_batch)
|
||||
|
||||
/* Value that can be returned by functions to indicate the program should abort
|
||||
* Should ideally not collide with any error codes (including libbpf ones), but
|
||||
@ -88,12 +96,27 @@ struct map_cleanup_args {
|
||||
bool valid_thread;
|
||||
};
|
||||
|
||||
struct aggregation_config {
|
||||
__u64 aggregation_interval;
|
||||
__u64 n_bins;
|
||||
__u64 bin_width;
|
||||
__u8 ipv4_prefix_len;
|
||||
__u8 ipv6_prefix_len;
|
||||
};
|
||||
|
||||
struct aggregation_maps {
|
||||
int map_v4_fd;
|
||||
int map_v6_fd;
|
||||
};
|
||||
|
||||
// Store configuration values in struct to easily pass around
|
||||
struct pping_config {
|
||||
struct bpf_config bpf_config;
|
||||
struct bpf_tc_opts tc_ingress_opts;
|
||||
struct bpf_tc_opts tc_egress_opts;
|
||||
struct map_cleanup_args clean_args;
|
||||
struct aggregation_config agg_conf;
|
||||
struct aggregation_maps agg_maps;
|
||||
char *object_path;
|
||||
char *ingress_prog;
|
||||
char *egress_prog;
|
||||
@ -117,20 +140,23 @@ static json_writer_t *json_ctx = NULL;
|
||||
static void (*print_event_func)(const union pping_event *) = NULL;
|
||||
|
||||
static const struct option long_options[] = {
|
||||
{ "help", no_argument, NULL, 'h' },
|
||||
{ "interface", required_argument, NULL, 'i' }, // Name of interface to run on
|
||||
{ "rate-limit", required_argument, NULL, 'r' }, // Sampling rate-limit in ms
|
||||
{ "rtt-rate", required_argument, NULL, 'R' }, // Sampling rate in terms of flow-RTT (ex 1 sample per RTT-interval)
|
||||
{ "rtt-type", required_argument, NULL, 't' }, // What type of RTT the RTT-rate should be applied to ("min" or "smoothed"), only relevant if rtt-rate is provided
|
||||
{ "force", no_argument, NULL, 'f' }, // Overwrite any existing XDP program on interface, remove qdisc on cleanup
|
||||
{ "cleanup-interval", required_argument, NULL, 'c' }, // Map cleaning interval in s, 0 to disable
|
||||
{ "format", required_argument, NULL, 'F' }, // Which format to output in (standard/json/ppviz)
|
||||
{ "ingress-hook", required_argument, NULL, 'I' }, // Use tc or XDP as ingress hook
|
||||
{ "xdp-mode", required_argument, NULL, 'x' }, // Which xdp-mode to use (unspecified, native or generic)
|
||||
{ "tcp", no_argument, NULL, 'T' }, // Calculate and report RTTs for TCP traffic (with TCP timestamps)
|
||||
{ "icmp", no_argument, NULL, 'C' }, // Calculate and report RTTs for ICMP echo-reply traffic
|
||||
{ "include-local", no_argument, NULL, 'l' }, // Also report "internal" RTTs
|
||||
{ "include-SYN", no_argument, NULL, 's' }, // Include SYN-packets in tracking (may fill up flow state with half-open connections)
|
||||
{ "help", no_argument, NULL, 'h' },
|
||||
{ "interface", required_argument, NULL, 'i' }, // Name of interface to run on
|
||||
{ "rate-limit", required_argument, NULL, 'r' }, // Sampling rate-limit in ms
|
||||
{ "rtt-rate", required_argument, NULL, 'R' }, // Sampling rate in terms of flow-RTT (ex 1 sample per RTT-interval)
|
||||
{ "rtt-type", required_argument, NULL, 't' }, // What type of RTT the RTT-rate should be applied to ("min" or "smoothed"), only relevant if rtt-rate is provided
|
||||
{ "force", no_argument, NULL, 'f' }, // Overwrite any existing XDP program on interface, remove qdisc on cleanup
|
||||
{ "cleanup-interval", required_argument, NULL, 'c' }, // Map cleaning interval in s, 0 to disable
|
||||
{ "format", required_argument, NULL, 'F' }, // Which format to output in (standard/json/ppviz)
|
||||
{ "ingress-hook", required_argument, NULL, 'I' }, // Use tc or XDP as ingress hook
|
||||
{ "xdp-mode", required_argument, NULL, 'x' }, // Which xdp-mode to use (unspecified, native or generic)
|
||||
{ "tcp", no_argument, NULL, 'T' }, // Calculate and report RTTs for TCP traffic (with TCP timestamps)
|
||||
{ "icmp", no_argument, NULL, 'C' }, // Calculate and report RTTs for ICMP echo-reply traffic
|
||||
{ "include-local", no_argument, NULL, 'l' }, // Also report "internal" RTTs
|
||||
{ "include-SYN", no_argument, NULL, 's' }, // Include SYN-packets in tracking (may fill up flow state with half-open connections)
|
||||
{ "aggregate", required_argument, NULL, 'a' }, // Aggregate RTTs every X seconds instead of reporting them individually
|
||||
{ "aggregate-subnets-v4", required_argument, NULL, '4' }, // Set the subnet size for IPv4 when aggregating (default 24)
|
||||
{ "aggregate-subnets-v6", required_argument, NULL, '6' }, // Set the subnet size for IPv6 when aggregating (default 48)
|
||||
{ 0, 0, NULL, 0 }
|
||||
};
|
||||
|
||||
@ -197,19 +223,49 @@ static int parse_bounded_double(double *res, const char *str, double low,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int parse_bounded_long(long long *res, const char *str, long long low,
|
||||
long long high, const char *name)
|
||||
{
|
||||
char *endptr;
|
||||
errno = 0;
|
||||
|
||||
*res = strtoll(str, &endptr, 10);
|
||||
if (endptr == str || strlen(str) != endptr - str) {
|
||||
fprintf(stderr, "%s %s is not a valid integer\n", name, str);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
if (errno == ERANGE) {
|
||||
fprintf(stderr, "%s %s overflowed\n", name, str);
|
||||
return -ERANGE;
|
||||
}
|
||||
|
||||
if (*res < low || *res > high) {
|
||||
fprintf(stderr, "%s must be in range [%lld, %lld]\n", name, low,
|
||||
high);
|
||||
return -ERANGE;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int parse_arguments(int argc, char *argv[], struct pping_config *config)
|
||||
{
|
||||
int err, opt;
|
||||
double user_float;
|
||||
long long user_int;
|
||||
|
||||
config->ifindex = 0;
|
||||
config->bpf_config.localfilt = true;
|
||||
config->force = false;
|
||||
|
||||
config->bpf_config.localfilt = true;
|
||||
config->bpf_config.track_tcp = false;
|
||||
config->bpf_config.track_icmp = false;
|
||||
config->bpf_config.skip_syn = true;
|
||||
config->bpf_config.push_individual_events = true;
|
||||
config->bpf_config.agg_rtts = false;
|
||||
|
||||
while ((opt = getopt_long(argc, argv, "hflTCsi:r:R:t:c:F:I:x:",
|
||||
while ((opt = getopt_long(argc, argv, "hflTCsi:r:R:t:c:F:I:x:a:4:6:",
|
||||
long_options, NULL)) != -1) {
|
||||
switch (opt) {
|
||||
case 'i':
|
||||
@ -319,6 +375,33 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
|
||||
return -EINVAL;
|
||||
}
|
||||
break;
|
||||
case 'a':
|
||||
/* Aggregated output currently disables individual RTT */
|
||||
config->bpf_config.push_individual_events = false;
|
||||
config->bpf_config.agg_rtts = true;
|
||||
|
||||
err = parse_bounded_long(&user_int, optarg, 1,
|
||||
7 * S_PER_DAY, "aggregate");
|
||||
if (err)
|
||||
return -EINVAL;
|
||||
|
||||
config->agg_conf.aggregation_interval =
|
||||
user_int * NS_PER_SECOND;
|
||||
break;
|
||||
case '4':
|
||||
err = parse_bounded_long(&user_int, optarg, 0, 32,
|
||||
"aggregate-subnets-v4");
|
||||
if (err)
|
||||
return -EINVAL;
|
||||
config->agg_conf.ipv4_prefix_len = user_int;
|
||||
break;
|
||||
case '6':
|
||||
err = parse_bounded_long(&user_int, optarg, 0, 64,
|
||||
"aggregate-subnets-v6");
|
||||
if (err)
|
||||
return -EINVAL;
|
||||
config->agg_conf.ipv6_prefix_len = user_int;
|
||||
break;
|
||||
case 'h':
|
||||
printf("HELP:\n");
|
||||
print_usage(argv);
|
||||
@ -335,6 +418,12 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
config->bpf_config.ipv4_prefix_mask =
|
||||
htonl(0xffffffffUL << (32 - config->agg_conf.ipv4_prefix_len));
|
||||
config->bpf_config.ipv6_prefix_mask =
|
||||
htobe64(0xffffffffffffffffUL
|
||||
<< (64 - config->agg_conf.ipv6_prefix_len));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -649,6 +738,35 @@ static __u64 convert_monotonic_to_realtime(__u64 monotonic_time)
|
||||
return monotonic_time + offset;
|
||||
}
|
||||
|
||||
// Stolen from xdp-tool/lib/util/util.c
|
||||
int try_snprintf(char *buf, size_t buf_len, const char *format, ...)
|
||||
{
|
||||
va_list args;
|
||||
int len;
|
||||
|
||||
va_start(args, format);
|
||||
len = vsnprintf(buf, buf_len, format, args);
|
||||
va_end(args);
|
||||
|
||||
if (len < 0)
|
||||
return -EINVAL;
|
||||
else if ((size_t)len >= buf_len)
|
||||
return -ENAMETOOLONG;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Is the passed ip an IPv4 address mapped into the IPv6 space as specified by
|
||||
* RFC 4291 sec 2.5.5.2?
|
||||
*/
|
||||
static bool is_ipv4_in_ipv6(const struct in6_addr *ip)
|
||||
{
|
||||
__u16 ipv4_prefix[] = { 0x0, 0x0, 0x0, 0x0, 0x0, 0xFFFF };
|
||||
|
||||
return memcmp(ipv4_prefix, ip, sizeof(ipv4_prefix)) == 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wrapper around inet_ntop designed to handle the "bug" that mapped IPv4
|
||||
* addresses are formated as IPv6 addresses for AF_INET6
|
||||
@ -656,14 +774,58 @@ static __u64 convert_monotonic_to_realtime(__u64 monotonic_time)
|
||||
static int format_ip_address(char *buf, size_t size, int af,
|
||||
const struct in6_addr *addr)
|
||||
{
|
||||
if (af == AF_UNSPEC)
|
||||
af = is_ipv4_in_ipv6(addr) ? AF_INET : AF_INET6;
|
||||
|
||||
if (af == AF_INET)
|
||||
return inet_ntop(af, &addr->s6_addr[12], buf, size) ? -errno :
|
||||
0;
|
||||
0;
|
||||
else if (af == AF_INET6)
|
||||
return inet_ntop(af, addr, buf, size) ? -errno : 0;
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
/* Formats IPv4 or IPv6 IP-prefix string from a struct ipprefix_key */
|
||||
static int format_ipprefix(char *buf, size_t size, int af,
|
||||
struct ipprefix_key *prefix, __u8 prefix_len)
|
||||
{
|
||||
union {
|
||||
struct in_addr ipv4;
|
||||
struct in6_addr ipv6;
|
||||
} ip;
|
||||
size_t iplen;
|
||||
int err;
|
||||
|
||||
if (af == AF_INET) {
|
||||
if (size < INET_PREFIXSTRLEN)
|
||||
return -ENOSPC;
|
||||
if (prefix_len > 32)
|
||||
return -EINVAL;
|
||||
|
||||
ip.ipv4.s_addr = prefix->v4;
|
||||
} else if (af == AF_INET6) {
|
||||
if (size < INET6_PREFIXSTRLEN)
|
||||
return -ENOSPC;
|
||||
if (prefix_len > 64)
|
||||
return -EINVAL;
|
||||
|
||||
memcpy(&ip.ipv6, &prefix->v6, sizeof(__u64));
|
||||
memset(&ip.ipv6.s6_addr32[2], 0, sizeof(__u64));
|
||||
} else {
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
err = inet_ntop(af, &ip, buf, size) ? 0 : -errno;
|
||||
if (err)
|
||||
return err;
|
||||
|
||||
iplen = strlen(buf);
|
||||
err = try_snprintf(buf + iplen, size - iplen, "/%u", prefix_len);
|
||||
buf[size - 1] = '\0';
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
static const char *proto_to_str(__u16 proto)
|
||||
{
|
||||
static char buf[8];
|
||||
@ -901,6 +1063,143 @@ static void handle_missed_events(void *ctx, int cpu, __u64 lost_cnt)
|
||||
fprintf(stderr, "Lost %llu events on CPU %d\n", lost_cnt, cpu);
|
||||
}
|
||||
|
||||
static void print_histogram(FILE *stream,
|
||||
struct aggregated_rtt_stats *rtt_stats, int n_bins)
|
||||
{
|
||||
int i;
|
||||
|
||||
fprintf(stream, "[%llu", rtt_stats->bins[0]);
|
||||
for (i = 1; i < n_bins; i++)
|
||||
fprintf(stream, ",%llu", rtt_stats->bins[i]);
|
||||
fprintf(stream, "]");
|
||||
}
|
||||
|
||||
static void print_aggregated_rtts(FILE *stream, __u64 t,
|
||||
struct ipprefix_key *prefix, int af,
|
||||
__u8 prefix_len,
|
||||
struct aggregated_rtt_stats *rtt_stats,
|
||||
struct aggregation_config *agg_conf)
|
||||
{
|
||||
char prefixstr[INET6_PREFIXSTRLEN] = { 0 };
|
||||
format_ipprefix(prefixstr, sizeof(prefixstr), af, prefix, prefix_len);
|
||||
|
||||
print_ns_datetime(stream, t);
|
||||
fprintf(stream,
|
||||
": %s -> min=%.6g ms, max=%.6g ms, histogram=", prefixstr,
|
||||
(double)rtt_stats->min / NS_PER_MS,
|
||||
(double)rtt_stats->max / NS_PER_MS);
|
||||
print_histogram(stream, rtt_stats, agg_conf->n_bins);
|
||||
fprintf(stream, "\n");
|
||||
}
|
||||
|
||||
static bool aggregated_rtt_stats_empty(struct aggregated_rtt_stats *stats)
|
||||
{
|
||||
return stats->max == 0;
|
||||
}
|
||||
|
||||
static void
|
||||
merge_percpu_aggreated_rtts(struct aggregated_rtt_stats *percpu_stats,
|
||||
struct aggregated_rtt_stats *merged_stats,
|
||||
int n_cpus, int n_bins)
|
||||
{
|
||||
int i, bin;
|
||||
|
||||
memset(merged_stats, 0, sizeof(*merged_stats));
|
||||
|
||||
for (i = 0; i < n_cpus; i++) {
|
||||
if (aggregated_rtt_stats_empty(&percpu_stats[i]))
|
||||
continue;
|
||||
|
||||
if (percpu_stats[i].max > merged_stats->max)
|
||||
merged_stats->max = percpu_stats[i].max;
|
||||
if (merged_stats->min == 0 ||
|
||||
percpu_stats[i].min < merged_stats->min)
|
||||
merged_stats->min = percpu_stats[i].min;
|
||||
|
||||
for (bin = 0; bin < n_bins; bin++)
|
||||
merged_stats->bins[bin] += percpu_stats[i].bins[bin];
|
||||
}
|
||||
}
|
||||
|
||||
static void report_aggregated_rtt_mapentry(
|
||||
struct ipprefix_key *prefix, struct aggregated_rtt_stats *percpu_stats,
|
||||
int n_cpus, int af, __u8 prefix_len, __u64 t_monotonic,
|
||||
struct aggregation_config *agg_conf)
|
||||
{
|
||||
struct aggregated_rtt_stats merged_stats;
|
||||
|
||||
merge_percpu_aggreated_rtts(percpu_stats, &merged_stats, n_cpus,
|
||||
agg_conf->n_bins);
|
||||
|
||||
// Only print prefixes which have RTT samples
|
||||
if (!aggregated_rtt_stats_empty(&merged_stats))
|
||||
print_aggregated_rtts(stdout, t_monotonic, prefix, af,
|
||||
prefix_len, &merged_stats, agg_conf);
|
||||
}
|
||||
|
||||
static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
|
||||
__u64 t_monotonic,
|
||||
struct aggregation_config *agg_conf)
|
||||
{
|
||||
struct aggregated_rtt_stats *values = NULL;
|
||||
void *keys = NULL;
|
||||
int n_cpus = libbpf_num_possible_cpus();
|
||||
size_t keysize = af == AF_INET ? sizeof(__u32) : sizeof(__u64);
|
||||
__u64 batch, total = 0;
|
||||
__u32 count = AGG_BATCH_SIZE;
|
||||
bool remaining_entries = true;
|
||||
int err = 0, i;
|
||||
|
||||
values = calloc(n_cpus, sizeof(*values) * AGG_BATCH_SIZE);
|
||||
keys = calloc(AGG_BATCH_SIZE, keysize);
|
||||
if (!values || !keys) {
|
||||
err = -ENOMEM;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
while (remaining_entries) {
|
||||
err = bpf_map_lookup_batch(map_fd, total ? &batch : NULL,
|
||||
&batch, keys, values, &count, NULL);
|
||||
if (err == -ENOENT) {
|
||||
remaining_entries = false;
|
||||
err = 0;
|
||||
} else if (err) {
|
||||
goto exit;
|
||||
}
|
||||
|
||||
for (i = 0; i < count; i++) {
|
||||
report_aggregated_rtt_mapentry(keys + i * keysize,
|
||||
values + i * n_cpus,
|
||||
n_cpus, af, prefix_len,
|
||||
t_monotonic, agg_conf);
|
||||
}
|
||||
|
||||
total += count;
|
||||
count = AGG_BATCH_SIZE; // Ensure we always try to fetch full batch
|
||||
}
|
||||
|
||||
exit:
|
||||
free(values);
|
||||
free(keys);
|
||||
return err;
|
||||
}
|
||||
|
||||
static int report_aggregated_rtts(struct aggregation_maps *maps,
|
||||
struct aggregation_config *agg_conf)
|
||||
{
|
||||
__u64 t = get_time_ns(CLOCK_MONOTONIC);
|
||||
int err;
|
||||
|
||||
err = report_aggregated_rtt_map(maps->map_v4_fd, AF_INET,
|
||||
agg_conf->ipv4_prefix_len, t, agg_conf);
|
||||
if (err)
|
||||
return err;
|
||||
|
||||
err = report_aggregated_rtt_map(maps->map_v6_fd, AF_INET6,
|
||||
agg_conf->ipv6_prefix_len, t, agg_conf);
|
||||
return err;
|
||||
}
|
||||
|
||||
/*
|
||||
* Sets only the necessary programs in the object file to autoload.
|
||||
*
|
||||
@ -1164,6 +1463,98 @@ static int handle_pipefd(int pipe_rfd)
|
||||
return PPING_ABORT;
|
||||
}
|
||||
|
||||
int fetch_aggregation_map_fds(struct bpf_object *obj,
|
||||
struct aggregation_maps *maps)
|
||||
{
|
||||
maps->map_v4_fd = bpf_object__find_map_fd_by_name(obj, "map_v4_agg");
|
||||
maps->map_v6_fd = bpf_object__find_map_fd_by_name(obj, "map_v6_agg");
|
||||
|
||||
if (maps->map_v4_fd < 0 || maps->map_v6_fd < 0) {
|
||||
fprintf(stderr, "Unable to find aggregation maps (%d/%d).\n",
|
||||
maps->map_v4_fd, maps->map_v6_fd);
|
||||
return -ENOENT;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int setup_timer(__u64 init_delay_ns, __u64 interval_ns)
|
||||
{
|
||||
struct itimerspec timercfg = {
|
||||
.it_value = { .tv_sec = init_delay_ns / NS_PER_SECOND,
|
||||
.tv_nsec = init_delay_ns % NS_PER_SECOND },
|
||||
.it_interval = { .tv_sec = interval_ns / NS_PER_SECOND,
|
||||
.tv_nsec = interval_ns % NS_PER_SECOND }
|
||||
};
|
||||
int fd, err;
|
||||
|
||||
fd = timerfd_create(CLOCK_MONOTONIC, 0);
|
||||
if (fd < 0) {
|
||||
return -errno;
|
||||
}
|
||||
|
||||
err = timerfd_settime(fd, 0, &timercfg, NULL);
|
||||
if (err) {
|
||||
err = -errno;
|
||||
close(fd);
|
||||
return err;
|
||||
}
|
||||
|
||||
return fd;
|
||||
}
|
||||
|
||||
static int init_aggregation_timer(struct bpf_object *obj,
|
||||
struct pping_config *config)
|
||||
{
|
||||
int err, fd;
|
||||
|
||||
err = fetch_aggregation_map_fds(obj, &config->agg_maps);
|
||||
if (err) {
|
||||
fprintf(stderr, "Failed fetching aggregation maps: %s\n",
|
||||
get_libbpf_strerror(err));
|
||||
return err;
|
||||
}
|
||||
|
||||
fd = setup_timer(config->agg_conf.aggregation_interval,
|
||||
config->agg_conf.aggregation_interval);
|
||||
if (fd < 0) {
|
||||
fprintf(stderr,
|
||||
"Failed creating timer for periodic aggregation: %s\n",
|
||||
get_libbpf_strerror(fd));
|
||||
return fd;
|
||||
}
|
||||
|
||||
return fd;
|
||||
}
|
||||
|
||||
static int handle_aggregation_timer(int timer_fd, struct aggregation_maps *maps,
|
||||
struct aggregation_config *agg_conf)
|
||||
{
|
||||
__u64 timer_exps;
|
||||
int ret, err;
|
||||
|
||||
ret = read(timer_fd, &timer_exps, sizeof(timer_exps));
|
||||
if (ret != sizeof(timer_exps)) {
|
||||
fprintf(stderr, "Failed reading timerfd\n");
|
||||
return -EBADFD;
|
||||
}
|
||||
|
||||
if (timer_exps > 1) {
|
||||
fprintf(stderr,
|
||||
"Warning - missed %llu aggregation timer expirations\n",
|
||||
timer_exps - 1);
|
||||
}
|
||||
|
||||
err = report_aggregated_rtts(maps, agg_conf);
|
||||
if (err) {
|
||||
fprintf(stderr, "Failed reporting aggregated RTTs: %s\n",
|
||||
get_libbpf_strerror(err));
|
||||
return err;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int epoll_add_event_type(int epfd, int fd, __u64 event_type, __u64 value)
|
||||
{
|
||||
struct epoll_event ev = {
|
||||
@ -1197,7 +1588,7 @@ static int epoll_add_perf_buffer(int epfd, struct perf_buffer *pb)
|
||||
}
|
||||
|
||||
static int epoll_add_events(int epfd, struct perf_buffer *pb, int sigfd,
|
||||
int pipe_rfd)
|
||||
int pipe_rfd, int aggfd)
|
||||
{
|
||||
int err;
|
||||
|
||||
@ -1225,6 +1616,17 @@ static int epoll_add_events(int epfd, struct perf_buffer *pb, int sigfd,
|
||||
return err;
|
||||
}
|
||||
|
||||
if (aggfd >= 0) {
|
||||
err = epoll_add_event_type(epfd, aggfd,
|
||||
PPING_EPEVENT_TYPE_AGGTIMER, aggfd);
|
||||
if (err) {
|
||||
fprintf(stderr,
|
||||
"Failed adding aggregation timerfd to epoll instance: %s\n",
|
||||
get_libbpf_strerror(err));
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1247,6 +1649,11 @@ static int epoll_poll_events(int epfd, struct pping_config *config,
|
||||
err = perf_buffer__consume_buffer(
|
||||
pb, events[i].data.u64 & PPING_EPEVENT_MASK);
|
||||
break;
|
||||
case PPING_EPEVENT_TYPE_AGGTIMER:
|
||||
err = handle_aggregation_timer(
|
||||
events[i].data.u64 & PPING_EPEVENT_MASK,
|
||||
&config->agg_maps, &config->agg_conf);
|
||||
break;
|
||||
case PPING_EPEVENT_TYPE_SIGNAL:
|
||||
err = handle_signalfd(events[i].data.u64 &
|
||||
PPING_EPEVENT_MASK);
|
||||
@ -1273,7 +1680,7 @@ int main(int argc, char *argv[])
|
||||
void *thread_err;
|
||||
struct bpf_object *obj = NULL;
|
||||
struct perf_buffer *pb = NULL;
|
||||
int epfd, sigfd;
|
||||
int epfd, sigfd, aggfd;
|
||||
|
||||
DECLARE_LIBBPF_OPTS(bpf_tc_opts, tc_ingress_opts);
|
||||
DECLARE_LIBBPF_OPTS(bpf_tc_opts, tc_egress_opts);
|
||||
@ -1284,6 +1691,11 @@ int main(int argc, char *argv[])
|
||||
.use_srtt = false },
|
||||
.clean_args = { .cleanup_interval = 1 * NS_PER_SECOND,
|
||||
.valid_thread = false },
|
||||
.agg_conf = { .aggregation_interval = 1 * NS_PER_SECOND,
|
||||
.ipv4_prefix_len = 24,
|
||||
.ipv6_prefix_len = 48,
|
||||
.n_bins = RTT_AGG_NR_BINS,
|
||||
.bin_width = RTT_AGG_BIN_WIDTH },
|
||||
.object_path = "pping_kern.o",
|
||||
.ingress_prog = PROG_INGRESS_TC,
|
||||
.egress_prog = PROG_EGRESS_TC,
|
||||
@ -1344,6 +1756,14 @@ int main(int argc, char *argv[])
|
||||
output_format_to_str(config.output_format),
|
||||
tracked_protocols_to_str(&config), config.ifname);
|
||||
|
||||
if (config.bpf_config.agg_rtts)
|
||||
fprintf(stderr,
|
||||
"Aggregating RTTs in histograms with %llu %.6g ms wide bins every %.9g seconds\n",
|
||||
config.agg_conf.n_bins,
|
||||
(double)config.agg_conf.bin_width / NS_PER_MS,
|
||||
(double)config.agg_conf.aggregation_interval /
|
||||
NS_PER_SECOND);
|
||||
|
||||
// Setup signalhandling (allow graceful shutdown on SIGINT/SIGTERM)
|
||||
sigfd = init_signalfd();
|
||||
if (sigfd < 0) {
|
||||
@ -1374,14 +1794,26 @@ int main(int argc, char *argv[])
|
||||
goto cleanup_mapcleaning;
|
||||
}
|
||||
|
||||
if (config.bpf_config.agg_rtts) {
|
||||
aggfd = init_aggregation_timer(obj, &config);
|
||||
if (aggfd < 0) {
|
||||
fprintf(stderr,
|
||||
"Failed setting up aggregation timerfd: %s\n",
|
||||
get_libbpf_strerror(aggfd));
|
||||
goto cleanup_perf_buffer;
|
||||
}
|
||||
} else {
|
||||
aggfd = -1;
|
||||
}
|
||||
|
||||
epfd = epoll_create1(EPOLL_CLOEXEC);
|
||||
if (epfd < 0) {
|
||||
fprintf(stderr, "Failed creating epoll instance: %s\n",
|
||||
get_libbpf_strerror(err));
|
||||
goto cleanup_perf_buffer;
|
||||
goto cleanup_aggfd;
|
||||
}
|
||||
|
||||
err = epoll_add_events(epfd, pb, sigfd, config.clean_args.pipe_rfd);
|
||||
err = epoll_add_events(epfd, pb, sigfd, config.clean_args.pipe_rfd, aggfd);
|
||||
if (err) {
|
||||
fprintf(stderr, "Failed adding events to epoll instace: %s\n",
|
||||
get_libbpf_strerror(err));
|
||||
@ -1410,6 +1842,10 @@ int main(int argc, char *argv[])
|
||||
cleanup_epfd:
|
||||
close(epfd);
|
||||
|
||||
cleanup_aggfd:
|
||||
if (aggfd >= 0)
|
||||
close(aggfd);
|
||||
|
||||
cleanup_perf_buffer:
|
||||
perf_buffer__free(pb);
|
||||
|
||||
|
@ -22,6 +22,9 @@ typedef __u64 fixpoint64;
|
||||
#define EVENT_TYPE_MAP_FULL 3
|
||||
#define EVENT_TYPE_MAP_CLEAN 4
|
||||
|
||||
#define RTT_AGG_NR_BINS 1000UL
|
||||
#define RTT_AGG_BIN_WIDTH (1 * NS_PER_MS) // 1 ms
|
||||
|
||||
enum __attribute__((__packed__)) flow_event_type {
|
||||
FLOW_EVENT_NONE,
|
||||
FLOW_EVENT_OPENING,
|
||||
@ -60,12 +63,23 @@ enum __attribute__((__packed__)) connection_state {
|
||||
struct bpf_config {
|
||||
__u64 rate_limit;
|
||||
fixpoint64 rtt_rate;
|
||||
__u64 ipv6_prefix_mask;
|
||||
__u32 ipv4_prefix_mask;
|
||||
bool use_srtt;
|
||||
bool track_tcp;
|
||||
bool track_icmp;
|
||||
bool localfilt;
|
||||
bool skip_syn;
|
||||
__u8 reserved[3];
|
||||
bool push_individual_events;
|
||||
bool agg_rtts;
|
||||
__u8 reserved;
|
||||
};
|
||||
|
||||
struct ipprefix_key {
|
||||
union {
|
||||
__u32 v4;
|
||||
__u64 v6;
|
||||
};
|
||||
};
|
||||
|
||||
/*
|
||||
@ -81,9 +95,9 @@ struct flow_address {
|
||||
|
||||
/*
|
||||
* Struct to hold a full network tuple
|
||||
* The ipv member is technically not necessary, but makes it easier to
|
||||
* The ipv member is technically not necessary, but makes it easier to
|
||||
* determine if saddr/daddr are IPv4 or IPv6 address (don't need to look at the
|
||||
* first 12 bytes of address). The proto memeber is not currently used, but
|
||||
* first 12 bytes of address). The proto memeber is not currently used, but
|
||||
* could be useful once pping is extended to work for other protocols than TCP.
|
||||
*/
|
||||
struct network_tuple {
|
||||
@ -210,4 +224,10 @@ union pping_event {
|
||||
struct map_clean_event map_clean_event;
|
||||
};
|
||||
|
||||
struct aggregated_rtt_stats {
|
||||
__u64 min;
|
||||
__u64 max;
|
||||
__u64 bins[RTT_AGG_NR_BINS];
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <linux/icmp.h>
|
||||
#include <linux/icmpv6.h>
|
||||
#include <stdbool.h>
|
||||
#include <errno.h>
|
||||
|
||||
// overwrite xdp/parsing_helpers.h value to avoid hitting verifier limit
|
||||
#ifdef IPV6_EXT_MAX_CHAIN
|
||||
@ -45,6 +46,7 @@
|
||||
|
||||
#define MAP_TIMESTAMP_SIZE 131072UL // 2^17, Maximum number of in-flight/unmatched timestamps we can keep track of
|
||||
#define MAP_FLOWSTATE_SIZE 131072UL // 2^17, Maximum number of concurrent flows that can be tracked
|
||||
#define MAP_AGGREGATION_SIZE 16384UL // 2^14, Maximum number of different IP-prefixes we can aggregate stats for
|
||||
|
||||
#define MAX_MEMCMP_SIZE 128
|
||||
|
||||
@ -128,6 +130,11 @@ char _license[] SEC("license") = "GPL";
|
||||
static volatile const struct bpf_config config = {};
|
||||
static volatile __u64 last_warn_time[2] = { 0 };
|
||||
|
||||
// Keep an empty aggregated_rtt_stats as a global variable to use as a template
|
||||
// when creating new entries. That way, it won't have to be allocated on stack
|
||||
// (where it won't fit anyways) and initialized each time during run time.
|
||||
static struct aggregated_rtt_stats empty_stats = { 0 };
|
||||
|
||||
|
||||
// Map definitions
|
||||
struct {
|
||||
@ -150,6 +157,22 @@ struct {
|
||||
__uint(value_size, sizeof(__u32));
|
||||
} events SEC(".maps");
|
||||
|
||||
struct {
|
||||
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
|
||||
__type(key, __u32);
|
||||
__type(value, struct aggregated_rtt_stats);
|
||||
__uint(max_entries, MAP_AGGREGATION_SIZE);
|
||||
__uint(map_flags, BPF_F_NO_PREALLOC);
|
||||
} map_v4_agg SEC(".maps");
|
||||
|
||||
struct {
|
||||
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
|
||||
__type(key, __u64);
|
||||
__type(value, struct aggregated_rtt_stats);
|
||||
__uint(max_entries, MAP_AGGREGATION_SIZE);
|
||||
__uint(map_flags, BPF_F_NO_PREALLOC);
|
||||
} map_v6_agg SEC(".maps");
|
||||
|
||||
|
||||
// Help functions
|
||||
|
||||
@ -640,6 +663,9 @@ static bool is_rate_limited(__u64 now, __u64 last_ts, __u64 rtt)
|
||||
static void send_flow_open_event(void *ctx, struct packet_info *p_info,
|
||||
struct flow_state *rev_flow)
|
||||
{
|
||||
if (!config.push_individual_events)
|
||||
return;
|
||||
|
||||
struct flow_event fe = {
|
||||
.event_type = EVENT_TYPE_FLOW,
|
||||
.flow_event_type = FLOW_EVENT_OPENING,
|
||||
@ -663,6 +689,9 @@ static void send_flow_open_event(void *ctx, struct packet_info *p_info,
|
||||
static void send_flow_event(void *ctx, struct packet_info *p_info,
|
||||
bool rev_flow)
|
||||
{
|
||||
if (!config.push_individual_events)
|
||||
return;
|
||||
|
||||
struct flow_event fe = {
|
||||
.event_type = EVENT_TYPE_FLOW,
|
||||
.flow_event_type = p_info->event_type,
|
||||
@ -707,8 +736,11 @@ static void send_map_full_event(void *ctx, struct packet_info *p_info,
|
||||
}
|
||||
|
||||
static void send_rtt_event(void *ctx, __u64 rtt, struct flow_state *f_state,
|
||||
struct packet_info *p_info)
|
||||
struct packet_info *p_info)
|
||||
{
|
||||
if (!config.push_individual_events)
|
||||
return;
|
||||
|
||||
struct rtt_event re = {
|
||||
.event_type = EVENT_TYPE_RTT,
|
||||
.timestamp = p_info->time,
|
||||
@ -926,6 +958,68 @@ static bool is_new_identifier(struct packet_id *pid, struct flow_state *f_state)
|
||||
return pid->identifier != f_state->last_id;
|
||||
}
|
||||
|
||||
static void create_ipprefix_key_v4(__u32 *prefix_key, struct in6_addr *ip)
|
||||
{
|
||||
*prefix_key = ip->s6_addr32[3] & config.ipv4_prefix_mask;
|
||||
}
|
||||
|
||||
static void create_ipprefix_key_v6(__u64 *prefix_key, struct in6_addr *ip)
|
||||
{
|
||||
*prefix_key = *(__u64 *)&ip->in6_u & config.ipv6_prefix_mask;
|
||||
// *prefix_key = *(__u64 *)ip & config.ipv6_prefix_mask; // gives verifier rejection "misaligned stack access off"
|
||||
}
|
||||
|
||||
static struct aggregated_rtt_stats *
|
||||
lookup_or_create_aggregation_stats(struct in6_addr *ip, __u8 ipv)
|
||||
{
|
||||
struct aggregated_rtt_stats *agg;
|
||||
struct ipprefix_key key;
|
||||
void *agg_map;
|
||||
int err;
|
||||
|
||||
if (ipv == AF_INET) {
|
||||
create_ipprefix_key_v4(&key.v4, ip);
|
||||
agg_map = &map_v4_agg;
|
||||
|
||||
} else {
|
||||
create_ipprefix_key_v6(&key.v6, ip);
|
||||
agg_map = &map_v6_agg;
|
||||
}
|
||||
|
||||
agg = bpf_map_lookup_elem(agg_map, &key);
|
||||
if (agg)
|
||||
return agg;
|
||||
|
||||
// No existing entry, try to create new one
|
||||
err = bpf_map_update_elem(agg_map, &key, &empty_stats, BPF_NOEXIST);
|
||||
if (err && err != -EEXIST)
|
||||
return NULL;
|
||||
|
||||
return bpf_map_lookup_elem(agg_map, &key);
|
||||
}
|
||||
|
||||
static void aggregate_rtt(__u64 rtt, struct in6_addr *ip, __u8 ipv)
|
||||
{
|
||||
if (!config.agg_rtts)
|
||||
return;
|
||||
|
||||
struct aggregated_rtt_stats *rtt_agg;
|
||||
int bin_idx;
|
||||
|
||||
rtt_agg = lookup_or_create_aggregation_stats(ip, ipv);
|
||||
if (!rtt_agg)
|
||||
return;
|
||||
|
||||
if (!rtt_agg->min || rtt < rtt_agg->min)
|
||||
rtt_agg->min = rtt;
|
||||
if (rtt > rtt_agg->max)
|
||||
rtt_agg->max = rtt;
|
||||
|
||||
bin_idx = rtt / RTT_AGG_BIN_WIDTH;
|
||||
bin_idx = bin_idx >= RTT_AGG_NR_BINS ? RTT_AGG_NR_BINS - 1 : bin_idx;
|
||||
rtt_agg->bins[bin_idx]++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Attempt to create a timestamp-entry for packet p_info for flow in f_state
|
||||
*/
|
||||
@ -999,6 +1093,7 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx,
|
||||
f_state->srtt = calculate_srtt(f_state->srtt, rtt);
|
||||
|
||||
send_rtt_event(ctx, rtt, f_state, p_info);
|
||||
aggregate_rtt(rtt, &p_info->pid.flow.saddr.ip, p_info->pid.flow.ipv);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1090,6 +1185,9 @@ static bool is_flow_old(struct network_tuple *flow, struct flow_state *f_state,
|
||||
static void send_flow_timeout_message(void *ctx, struct network_tuple *flow,
|
||||
__u64 time)
|
||||
{
|
||||
if (!config.push_individual_events)
|
||||
return;
|
||||
|
||||
struct flow_event fe = {
|
||||
.event_type = EVENT_TYPE_FLOW,
|
||||
.flow_event_type = FLOW_EVENT_CLOSING,
|
||||
|
Reference in New Issue
Block a user