diff --git a/pping/pping.c b/pping/pping.c index 897db5b..d04cd43 100644 --- a/pping/pping.c +++ b/pping/pping.c @@ -121,6 +121,12 @@ struct aggregation_maps { int map_active_fd; int map_v4_fd[2]; int map_v6_fd[2]; + int map_globcnt_fd; +}; + +struct aggregation_context { + struct aggregation_maps maps; + struct global_counters prev_counters; }; // Store configuration values in struct to easily pass around @@ -130,7 +136,7 @@ struct pping_config { struct bpf_tc_opts tc_egress_opts; struct map_cleanup_args clean_args; struct aggregation_config agg_conf; - struct aggregation_maps agg_maps; + struct aggregation_context agg_ctx; struct output_context *out_ctx; char *object_path; char *ingress_prog; @@ -1136,6 +1142,233 @@ static void handle_missed_events(void *ctx, int cpu, __u64 lost_cnt) fprintf(stderr, "Lost %llu events on CPU %d\n", lost_cnt, cpu); } +static void print_counter_standard(FILE *stream, const char *name, __u64 val, + bool *first) +{ + if (val == 0) + return; + + if (*first) + *first = false; + else + fprintf(stream, ", "); + + fprintf(stream, "%s=%llu", name, val); +} + +static void print_pktbytes_tuple_standard(FILE *stream, const char *tuple_name, + __u64 pkts, __u64 bytes, + bool *first_tuple) +{ + bool first = true; + + if (pkts == 0) + return; + + if (*first_tuple) + *first_tuple = false; + else + fprintf(stream, ", "); + + fprintf(stream, "%s=(", tuple_name); + print_counter_standard(stream, "pkts", pkts, &first); + print_counter_standard(stream, "bytes", bytes, &first); + fprintf(stream, ")"); +} + +static void +print_globalcounters_standard(FILE *stream, __u64 t_monotonic, + const struct global_counters *counters) +{ + char protostr[16]; + bool first = true; + int proto; + + print_ns_datetime(stream, t_monotonic); + fprintf(stream, ": "); + + print_pktbytes_tuple_standard(stream, "non-IP", counters->nonip_pkts, + counters->nonip_bytes, &first); + print_pktbytes_tuple_standard(stream, "TCP", counters->tcp_pkts, + counters->tcp_bytes, &first); + print_pktbytes_tuple_standard(stream, "UDP", counters->udp_pkts, + counters->udp_bytes, &first); + print_pktbytes_tuple_standard(stream, "ICMP", counters->icmp_pkts, + counters->icmp_bytes, &first); + print_pktbytes_tuple_standard(stream, "ICMPv6", counters->icmp6_pkts, + counters->icmp6_bytes, &first); + + for (proto = 0; proto < N_IPPROTOS; proto++) { + if (counters->other_ipprotos[proto] > 0) { // Check to avoid uncecessary ipproto_to_str + ipproto_to_str(protostr, sizeof(protostr), proto); + print_pktbytes_tuple_standard( + stream, protostr, + counters->other_ipprotos[proto], 0, &first); + } + } + + if (first) // Global counters are empty + fprintf(stream, "pkts=0, bytes=0"); + + fprintf(stream, "\n"); +} + +static void print_counter_json(json_writer_t *jctx, const char *name, __u64 val) +{ + if (val > 0) + jsonw_u64_field(jctx, name, val); +} + +static void print_pktbytes_tuple_json(json_writer_t *jctx, + const char *tuple_name, __u64 pkts, + __u64 bytes) +{ + // Don't include empty pkt/byte tuples + if (pkts == 0) + return; + + jsonw_name(jctx, tuple_name); + jsonw_start_object(jctx); + print_counter_json(jctx, "packets", pkts); + print_counter_json(jctx, "bytes", bytes); + jsonw_end_object(jctx); +} + +static void print_globalcounters_json(json_writer_t *jctx, __u64 t_monotonic, + const struct global_counters *counters) +{ + char protostr[16]; + int proto; + + jsonw_start_object(jctx); + jsonw_u64_field(jctx, "timestamp", + convert_monotonic_to_realtime(t_monotonic)); + + jsonw_name(jctx, "protocol_counters"); + jsonw_start_object(jctx); + + print_pktbytes_tuple_json(jctx, "non_IP", counters->nonip_pkts, + counters->nonip_bytes); + print_pktbytes_tuple_json(jctx, "TCP", counters->tcp_pkts, + counters->tcp_bytes); + print_pktbytes_tuple_json(jctx, "UDP", counters->udp_pkts, + counters->udp_bytes); + print_pktbytes_tuple_json(jctx, "ICMP", counters->icmp_pkts, + counters->icmp_bytes); + print_pktbytes_tuple_json(jctx, "ICMPv6", counters->icmp6_pkts, + counters->icmp6_bytes); + + for (proto = 0; proto < N_IPPROTOS; proto++) { + if (counters->other_ipprotos[proto] > 0) { + ipproto_to_str(protostr, sizeof(protostr), proto); + print_pktbytes_tuple_json( + jctx, protostr, counters->other_ipprotos[proto], + 0); + } + } + + jsonw_end_object(jctx); + + jsonw_end_object(jctx); +} + +static void print_globalcounters(struct output_context *out_ctx, + __u64 t_monotonic, + const struct global_counters *counters) +{ + if (out_ctx->format == PPING_OUTPUT_STANDARD) + print_globalcounters_standard(out_ctx->stream, t_monotonic, + counters); + else if (out_ctx->jctx) + print_globalcounters_json(out_ctx->jctx, t_monotonic, counters); +} + +static void update_globalcounters(struct global_counters *to, + const struct global_counters *from) +{ + int proto; + + to->nonip_pkts += from->nonip_pkts; + to->nonip_bytes += from->nonip_bytes; + to->tcp_pkts += from->tcp_pkts; + to->tcp_bytes += from->tcp_bytes; + to->udp_pkts += from->udp_pkts; + to->udp_bytes += from->udp_bytes; + to->icmp_pkts += from->icmp_pkts; + to->icmp_bytes += from->icmp_bytes; + to->icmp6_pkts += from->icmp6_pkts; + to->icmp6_bytes += from->icmp6_bytes; + + for (proto = 0; proto < N_IPPROTOS; proto++) { + to->other_ipprotos[proto] += from->other_ipprotos[proto]; + } +} + +static void merge_percpu_globalcounters(struct global_counters *merged, + const struct global_counters *percpu, + int n_cpus) +{ + int cpu; + + memset(merged, 0, sizeof(*merged)); + + for (cpu = 0; cpu < n_cpus; cpu++) { + update_globalcounters(merged, &percpu[cpu]); + } +} + +static void diff_globalcounters(struct global_counters *diff, + const struct global_counters *prev, + const struct global_counters *next) +{ + int proto; + + diff->nonip_pkts = next->nonip_pkts - prev->nonip_pkts; + diff->nonip_bytes = next->nonip_bytes - prev->nonip_bytes; + diff->tcp_pkts = next->tcp_pkts - prev->tcp_pkts; + diff->tcp_bytes = next->tcp_bytes - prev->tcp_bytes; + diff->udp_pkts = next->udp_pkts - prev->udp_pkts; + diff->udp_bytes = next->udp_bytes - prev->udp_bytes; + diff->icmp_pkts = next->icmp_pkts - prev->icmp_pkts; + diff->icmp_bytes = next->icmp_bytes - prev->icmp_bytes; + diff->icmp6_pkts = next->icmp6_pkts - prev->icmp6_pkts; + diff->icmp6_bytes = next->icmp6_bytes - prev->icmp6_bytes; + + for (proto = 0; proto < N_IPPROTOS; proto++) { + diff->other_ipprotos[proto] = next->other_ipprotos[proto] - + prev->other_ipprotos[proto]; + } +} + +static int report_globalcounters(struct output_context *out_ctx, + struct aggregation_context *agg_ctx) +{ + int n_cpus = libbpf_num_possible_cpus(); + __u64 t = get_time_ns(CLOCK_MONOTONIC); + struct global_counters tot_cnt, diff; + struct global_counters *cpu_cnt; + __u32 key = 0; + int err; + + cpu_cnt = calloc(n_cpus, sizeof(*cpu_cnt)); + if (!cpu_cnt) + return -errno; + + err = bpf_map_lookup_elem(agg_ctx->maps.map_globcnt_fd, &key, cpu_cnt); + if (err) + goto exit; + + merge_percpu_globalcounters(&tot_cnt, cpu_cnt, n_cpus); + diff_globalcounters(&diff, &agg_ctx->prev_counters, &tot_cnt); + agg_ctx->prev_counters = tot_cnt; + + print_globalcounters(out_ctx, t, &diff); + +exit: + free(cpu_cnt); + return err; +} + static __u64 sum_trafficcounts_pkts(const struct traffic_counters *counters) { return counters->tcp_ts_pkts + counters->tcp_nots_pkts + @@ -1298,21 +1531,6 @@ exit: fprintf(stream, "\n"); } -static void print_pktbytes_tuple_json(json_writer_t *jctx, - const char *tuple_name, __u64 pkts, - __u64 bytes) -{ - // Don't include empty pkt/byte tuples - if (pkts == 0) - return; - - jsonw_name(jctx, tuple_name); - jsonw_start_object(jctx); - jsonw_u64_field(jctx, "packets", pkts); - jsonw_u64_field(jctx, "bytes", bytes); - jsonw_end_object(jctx); -} - static void print_trafficcount_json(json_writer_t *jctx, const struct traffic_counters *counters) { @@ -1535,25 +1753,31 @@ exit: } static int report_aggregated_stats(struct output_context *out_ctx, - struct aggregation_maps *maps, + struct aggregation_context *agg_ctx, struct aggregation_config *agg_conf) { __u64 t = get_time_ns(CLOCK_MONOTONIC); int err, map_idx; - map_idx = switch_agg_map(maps->map_active_fd); + map_idx = switch_agg_map(agg_ctx->maps.map_active_fd); if (map_idx < 0) return map_idx; - err = report_aggregated_stats_map(out_ctx, maps->map_v4_fd[map_idx], + err = report_aggregated_stats_map(out_ctx, + agg_ctx->maps.map_v4_fd[map_idx], AF_INET, agg_conf->ipv4_prefix_len, t, agg_conf); if (err) return err; - err = report_aggregated_stats_map(out_ctx, maps->map_v6_fd[map_idx], + err = report_aggregated_stats_map(out_ctx, + agg_ctx->maps.map_v6_fd[map_idx], AF_INET6, agg_conf->ipv6_prefix_len, t, agg_conf); + if (err) + return err; + + err = report_globalcounters(out_ctx, agg_ctx); return err; } @@ -1921,15 +2145,17 @@ int fetch_aggregation_map_fds(struct bpf_object *obj, bpf_object__find_map_fd_by_name(obj, "map_v6_agg1"); maps->map_v6_fd[1] = bpf_object__find_map_fd_by_name(obj, "map_v6_agg2"); + maps->map_globcnt_fd = + bpf_object__find_map_fd_by_name(obj, "map_global_counters"); if (maps->map_active_fd < 0 || maps->map_v4_fd[0] < 0 || maps->map_v4_fd[1] < 0 || maps->map_v6_fd[0] < 0 || - maps->map_v6_fd[1] < 0) { + maps->map_v6_fd[1] < 0 || maps->map_globcnt_fd < 0) { fprintf(stderr, - "Unable to find aggregation maps (%d/%d/%d/%d/%d).\n", + "Unable to find aggregation maps (%d/%d/%d/%d/%d/%d).\n", maps->map_active_fd, maps->map_v4_fd[0], maps->map_v4_fd[1], maps->map_v6_fd[0], - maps->map_v6_fd[1]); + maps->map_v6_fd[1], maps->map_globcnt_fd); return -ENOENT; } @@ -2006,14 +2232,17 @@ static int init_aggregation_timer(struct bpf_object *obj, { int err, fd; - err = fetch_aggregation_map_fds(obj, &config->agg_maps); + memset(&config->agg_ctx.prev_counters, 0, + sizeof(config->agg_ctx.prev_counters)); + + err = fetch_aggregation_map_fds(obj, &config->agg_ctx.maps); if (err) { fprintf(stderr, "Failed fetching aggregation maps: %s\n", get_libbpf_strerror(err)); return err; } - err = init_agg_backup_entries(&config->agg_maps, + err = init_agg_backup_entries(&config->agg_ctx.maps, config->agg_conf.ipv4_prefix_len > 0, config->agg_conf.ipv6_prefix_len > 0); if (err) { @@ -2037,7 +2266,7 @@ static int init_aggregation_timer(struct bpf_object *obj, static int handle_aggregation_timer(int timer_fd, struct output_context *out_ctx, - struct aggregation_maps *maps, + struct aggregation_context *agg_ctx, struct aggregation_config *agg_conf) { __u64 timer_exps; @@ -2055,7 +2284,7 @@ static int handle_aggregation_timer(int timer_fd, timer_exps - 1); } - err = report_aggregated_stats(out_ctx, maps, agg_conf); + err = report_aggregated_stats(out_ctx, agg_ctx, agg_conf); if (err) { fprintf(stderr, "Failed reporting aggregated RTTs: %s\n", get_libbpf_strerror(err)); @@ -2162,7 +2391,7 @@ static int epoll_poll_events(int epfd, struct pping_config *config, case PPING_EPEVENT_TYPE_AGGTIMER: err = handle_aggregation_timer( events[i].data.u64 & PPING_EPEVENT_MASK, - config->out_ctx, &config->agg_maps, + config->out_ctx, &config->agg_ctx, &config->agg_conf); break; case PPING_EPEVENT_TYPE_SIGNAL: diff --git a/pping/pping.h b/pping/pping.h index e0579c6..f979826 100644 --- a/pping/pping.h +++ b/pping/pping.h @@ -30,6 +30,8 @@ typedef __u64 fixpoint64; #define RTT_AGG_NR_BINS 250UL #define RTT_AGG_BIN_WIDTH (4 * NS_PER_MS) +#define N_IPPROTOS 256 + /* Special IPv4/IPv6 prefixes used for backup entries * To avoid them colliding with and actual traffic (causing the traffic to end * up in the backup entry), use prefixes from blocks reserved for documentation. @@ -263,4 +265,18 @@ struct aggregated_stats { __u32 rtt_bins[RTT_AGG_NR_BINS]; }; +struct global_counters { + __u64 nonip_pkts; + __u64 nonip_bytes; + __u64 tcp_pkts; + __u64 tcp_bytes; + __u64 udp_pkts; + __u64 udp_bytes; + __u64 icmp_pkts; + __u64 icmp_bytes; + __u64 icmp6_pkts; + __u64 icmp6_bytes; + __u32 other_ipprotos[N_IPPROTOS]; +}; + #endif diff --git a/pping/pping_kern.c b/pping/pping_kern.c index 72bce55..1066868 100644 --- a/pping/pping_kern.c +++ b/pping/pping_kern.c @@ -190,6 +190,13 @@ struct { __uint(max_entries, 1); } map_active_agg_instance SEC(".maps"); +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __type(key, __u32); + __type(value, struct global_counters); + __uint(max_entries, 1); +} map_global_counters SEC(".maps"); + struct { __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); __type(key, __u32); @@ -314,6 +321,44 @@ get_dualflow_key_from_packet(struct packet_info *p_info) &p_info->reply_pid.flow; } +static void update_global_counters(__u8 ipproto, __u32 pkt_len) +{ + if (!config.agg_rtts) + return; + + struct global_counters *counters; + __u32 key = 0; + + counters = bpf_map_lookup_elem(&map_global_counters, &key); + if (!counters) // Should never happen + return; + + switch (ipproto) { + case 0: // Used to represent non-IP instead of IPv6 hop-by-hop + counters->nonip_pkts++; + counters->nonip_bytes += pkt_len; + break; + case IPPROTO_TCP: + counters->tcp_pkts++; + counters->tcp_bytes += pkt_len; + break; + case IPPROTO_UDP: + counters->udp_pkts++; + counters->udp_bytes += pkt_len; + break; + case IPPROTO_ICMP: + counters->icmp_pkts++; + counters->icmp_bytes += pkt_len; + break; + case IPPROTO_ICMPV6: + counters->icmp6_pkts++; + counters->icmp6_bytes += pkt_len; + break; + default: + counters->other_ipprotos[ipproto]++; + } +} + /* * Parses the TSval and TSecr values from the TCP options field. If sucessful * the TSval and TSecr values will be stored at tsval and tsecr (in network @@ -556,10 +601,10 @@ static int parse_packet_identifier(struct parsing_context *pctx, p_info->pid.flow.ipv = AF_INET6; proto = parse_ip6hdr(&pctx->nh, pctx->data_end, &iph_ptr.ip6h); } else { - return -1; + goto err_not_ip; } if (proto < 0) - return -1; + goto err_not_ip; // IP-header was parsed sucessfully, fill in IP address p_info->pid.flow.proto = proto; @@ -577,6 +622,7 @@ static int parse_packet_identifier(struct parsing_context *pctx, p_info->ip_tos.ipv6_tos = *(__be32 *)iph_ptr.ip6h & IPV6_FLOWINFO_MASK; } + update_global_counters(proto, p_info->pkt_len); // Parse identifer from suitable protocol err = -1; @@ -621,6 +667,10 @@ static int parse_packet_identifier(struct parsing_context *pctx, } return 0; + +err_not_ip: + update_global_counters(0, p_info->pkt_len); + return -1; } /*