diff --git a/pping/pping.c b/pping/pping.c index 0dd59f7..1742c4f 100644 --- a/pping/pping.c +++ b/pping/pping.c @@ -103,6 +103,12 @@ struct map_cleanup_args { bool valid_thread; }; +struct output_context { + FILE *stream; + json_writer_t *jctx; + enum pping_output_format format; +}; + struct aggregation_config { __u64 aggregation_interval; __u64 timeout_interval; @@ -110,7 +116,6 @@ struct aggregation_config { __u64 bin_width; __u8 ipv4_prefix_len; __u8 ipv6_prefix_len; - enum pping_output_format format; }; struct aggregation_maps { @@ -124,6 +129,7 @@ struct pping_config { struct bpf_config bpf_config; struct bpf_tc_opts tc_ingress_opts; struct bpf_tc_opts tc_egress_opts; + struct output_context out_ctx; struct map_cleanup_args clean_args; struct aggregation_config agg_conf; struct aggregation_maps agg_maps; @@ -140,15 +146,11 @@ struct pping_config { int ingress_prog_id; int egress_prog_id; char ifname[IF_NAMESIZE]; - enum pping_output_format output_format; enum xdp_attach_mode xdp_mode; bool force; bool created_tc_hook; }; -static json_writer_t *json_ctx = NULL; -static void (*print_event_func)(const union pping_event *) = NULL; - static const struct option long_options[] = { { "help", no_argument, NULL, 'h' }, { "interface", required_argument, NULL, 'i' }, // Name of interface to run on @@ -338,11 +340,11 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) break; case 'F': if (strcmp(optarg, "standard") == 0) { - config->output_format = PPING_OUTPUT_STANDARD; + config->out_ctx.format = PPING_OUTPUT_STANDARD; } else if (strcmp(optarg, "json") == 0) { - config->output_format = PPING_OUTPUT_JSON; + config->out_ctx.format = PPING_OUTPUT_JSON; } else if (strcmp(optarg, "ppviz") == 0) { - config->output_format = PPING_OUTPUT_PPVIZ; + config->out_ctx.format = PPING_OUTPUT_PPVIZ; } else { fprintf(stderr, "format must be \"standard\", \"json\" or \"ppviz\"\n"); @@ -449,8 +451,6 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) htobe64(0xffffffffffffffffUL << (64 - config->agg_conf.ipv6_prefix_len)); - config->agg_conf.format = config->output_format; - return 0; } @@ -943,30 +943,28 @@ static void print_ns_datetime(FILE *stream, __u64 monotonic_ns) fprintf(stream, "%s.%09llu", timestr, ts % NS_PER_SECOND); } -static void print_event_standard(const union pping_event *e) +static void print_event_standard(FILE *stream, const union pping_event *e) { if (e->event_type == EVENT_TYPE_RTT) { - print_ns_datetime(stdout, e->rtt_event.timestamp); - printf(" %llu.%06llu ms %llu.%06llu ms %s ", - e->rtt_event.rtt / NS_PER_MS, - e->rtt_event.rtt % NS_PER_MS, - e->rtt_event.min_rtt / NS_PER_MS, - e->rtt_event.min_rtt % NS_PER_MS, - proto_to_str(e->rtt_event.flow.proto)); - print_flow_ppvizformat(stdout, &e->rtt_event.flow); - printf("\n"); + print_ns_datetime(stream, e->rtt_event.timestamp); + fprintf(stream, " %.6g ms %.6g ms %s ", + (double)e->rtt_event.rtt / NS_PER_MS, + (double)e->rtt_event.min_rtt / NS_PER_MS, + proto_to_str(e->rtt_event.flow.proto)); + print_flow_ppvizformat(stream, &e->rtt_event.flow); + fprintf(stream, "\n"); } else if (e->event_type == EVENT_TYPE_FLOW) { - print_ns_datetime(stdout, e->flow_event.timestamp); - printf(" %s ", proto_to_str(e->rtt_event.flow.proto)); - print_flow_ppvizformat(stdout, &e->flow_event.flow); - printf(" %s due to %s from %s\n", - flowevent_to_str(e->flow_event.flow_event_type), - eventreason_to_str(e->flow_event.reason), - eventsource_to_str(e->flow_event.source)); + print_ns_datetime(stream, e->flow_event.timestamp); + fprintf(stream, " %s ", proto_to_str(e->rtt_event.flow.proto)); + print_flow_ppvizformat(stream, &e->flow_event.flow); + fprintf(stream, " %s due to %s from %s\n", + flowevent_to_str(e->flow_event.flow_event_type), + eventreason_to_str(e->flow_event.reason), + eventsource_to_str(e->flow_event.source)); } } -static void print_event_ppviz(const union pping_event *e) +static void print_event_ppviz(FILE *stream, const union pping_event *e) { // ppviz format does not support flow events if (e->event_type != EVENT_TYPE_RTT) @@ -975,12 +973,11 @@ static void print_event_ppviz(const union pping_event *e) const struct rtt_event *re = &e->rtt_event; __u64 time = convert_monotonic_to_realtime(re->timestamp); - printf("%llu.%09llu %llu.%09llu %llu.%09llu ", time / NS_PER_SECOND, - time % NS_PER_SECOND, re->rtt / NS_PER_SECOND, - re->rtt % NS_PER_SECOND, re->min_rtt / NS_PER_SECOND, - re->min_rtt); - print_flow_ppvizformat(stdout, &re->flow); - printf("\n"); + fprintf(stream, "%.9g %.9g %.9g ", (double)time / NS_PER_SECOND, + (double)re->rtt / NS_PER_SECOND, + (double)re->min_rtt / NS_PER_SECOND); + print_flow_ppvizformat(stream, &re->flow); + fprintf(stream, "\n"); } static void print_common_fields_json(json_writer_t *ctx, @@ -1023,32 +1020,52 @@ static void print_flowevent_fields_json(json_writer_t *ctx, jsonw_string_field(ctx, "triggered_by", eventsource_to_str(fe->source)); } -static void print_event_json(const union pping_event *e) +static void print_event_json(json_writer_t *jctx, const union pping_event *e) { if (e->event_type != EVENT_TYPE_RTT && e->event_type != EVENT_TYPE_FLOW) return; - jsonw_start_object(json_ctx); - print_common_fields_json(json_ctx, e); + jsonw_start_object(jctx); + print_common_fields_json(jctx, e); if (e->event_type == EVENT_TYPE_RTT) - print_rttevent_fields_json(json_ctx, &e->rtt_event); + print_rttevent_fields_json(jctx, &e->rtt_event); else // flow-event - print_flowevent_fields_json(json_ctx, &e->flow_event); - jsonw_end_object(json_ctx); + print_flowevent_fields_json(jctx, &e->flow_event); + jsonw_end_object(jctx); } -static void warn_map_full(const struct map_full_event *e) +static void print_event(struct output_context *out_ctx, + const union pping_event *pe) { - print_ns_datetime(stderr, e->timestamp); - fprintf(stderr, " Warning: Unable to create %s entry for flow ", + if (!out_ctx->stream) + return; + + switch (out_ctx->format) { + case PPING_OUTPUT_STANDARD: + print_event_standard(out_ctx->stream, pe); + break; + case PPING_OUTPUT_JSON: + if (out_ctx->jctx) + print_event_json(out_ctx->jctx, pe); + break; + case PPING_OUTPUT_PPVIZ: + print_event_ppviz(out_ctx->stream, pe); + break; + } +} + +static void warn_map_full(FILE *stream, const struct map_full_event *e) +{ + print_ns_datetime(stream, e->timestamp); + fprintf(stream, " Warning: Unable to create %s entry for flow ", e->map == PPING_MAP_FLOWSTATE ? "flow" : "timestamp"); - print_flow_ppvizformat(stderr, &e->flow); - fprintf(stderr, "\n"); + print_flow_ppvizformat(stream, &e->flow); + fprintf(stream, "\n"); } -static void print_map_clean_info(const struct map_clean_event *e) +static void print_map_clean_info(FILE *stream, const struct map_clean_event *e) { - fprintf(stderr, + fprintf(stream, "%s: cycle: %u, entries: %u, time: %llu, timeout: %u, tot timeout: %llu, selfdel: %u, tot selfdel: %llu\n", e->map == PPING_MAP_PACKETTS ? "packet_ts" : "flow_state", e->clean_cycles, e->last_processed_entries, e->last_runtime, @@ -1058,6 +1075,7 @@ static void print_map_clean_info(const struct map_clean_event *e) static void handle_event(void *ctx, int cpu, void *data, __u32 data_size) { + struct output_context *out_ctx = ctx; const union pping_event *e = data; if (data_size < sizeof(e->event_type)) @@ -1065,14 +1083,14 @@ static void handle_event(void *ctx, int cpu, void *data, __u32 data_size) switch (e->event_type) { case EVENT_TYPE_MAP_FULL: - warn_map_full(&e->map_event); + warn_map_full(stderr, &e->map_event); break; case EVENT_TYPE_MAP_CLEAN: - print_map_clean_info(&e->map_clean_event); + print_map_clean_info(stderr, &e->map_clean_event); break; case EVENT_TYPE_RTT: case EVENT_TYPE_FLOW: - print_event_func(e); + print_event(out_ctx, e); break; default: fprintf(stderr, "Warning: Unknown event type %llu\n", @@ -1170,12 +1188,16 @@ static void print_aggmetadata_json(json_writer_t *ctx, jsonw_end_object(ctx); } -static void print_aggmetadata(struct aggregation_config *agg_conf) +static void print_aggmetadata(struct output_context *out_ctx, + struct aggregation_config *agg_conf) { - if (agg_conf->format == PPING_OUTPUT_STANDARD) - print_aggmetadata_standard(stdout, agg_conf); - else - print_aggmetadata_json(json_ctx, agg_conf); + if (!out_ctx->stream) + return; + + if (out_ctx->format == PPING_OUTPUT_STANDARD) + print_aggmetadata_standard(out_ctx->stream, agg_conf); + else if (out_ctx->jctx) + print_aggmetadata_json(out_ctx->jctx, agg_conf); } static void print_aggrtts_standard(FILE *stream, __u64 t, const char *prefixstr, @@ -1248,19 +1270,25 @@ exit: jsonw_end_object(ctx); } -static void print_aggregated_rtts(__u64 t, struct ipprefix_key *prefix, int af, +static void print_aggregated_rtts(struct output_context *out_ctx, __u64 t, + struct ipprefix_key *prefix, int af, __u8 prefix_len, struct aggregated_rtt_stats *rtt_stats, struct aggregation_config *agg_conf) { char prefixstr[INET6_PREFIXSTRLEN] = { 0 }; + + if (!out_ctx->stream) + return; + format_ipprefix(prefixstr, sizeof(prefixstr), af, prefix, prefix_len); - if (agg_conf->format == PPING_OUTPUT_STANDARD) - print_aggrtts_standard(stdout, t, prefixstr, rtt_stats, + if (out_ctx->format == PPING_OUTPUT_STANDARD) + print_aggrtts_standard(out_ctx->stream, t, prefixstr, rtt_stats, agg_conf); - else - print_aggrtts_json(json_ctx, t, prefixstr, rtt_stats, agg_conf); + else if (out_ctx->jctx) + print_aggrtts_json(out_ctx->jctx, t, prefixstr, rtt_stats, + agg_conf); } // Stolen from BPF selftests @@ -1299,9 +1327,10 @@ static int switch_agg_map(int map_active_fd) } static void report_aggregated_rtt_mapentry( - struct ipprefix_key *prefix, struct aggregated_rtt_stats *percpu_stats, - int n_cpus, int af, __u8 prefix_len, __u64 t_monotonic, - struct aggregation_config *agg_conf, bool *del_entry) + struct output_context *out_ctx, struct ipprefix_key *prefix, + struct aggregated_rtt_stats *percpu_stats, int n_cpus, int af, + __u8 prefix_len, __u64 t_monotonic, struct aggregation_config *agg_conf, + bool *del_entry) { struct aggregated_rtt_stats merged_stats; struct ipprefix_key backup_key = { 0 }; @@ -1328,8 +1357,8 @@ static void report_aggregated_rtt_mapentry( // Only print and clear prefixes which have RTT samples if (!aggregated_rtt_stats_empty(&merged_stats)) { - print_aggregated_rtts(t_monotonic, prefix, af, prefix_len, - &merged_stats, agg_conf); + print_aggregated_rtts(out_ctx, t_monotonic, prefix, af, + prefix_len, &merged_stats, agg_conf); // Clear out the reported stats if (!*del_entry) @@ -1339,8 +1368,8 @@ static void report_aggregated_rtt_mapentry( } } -static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len, - __u64 t_monotonic, +static int report_aggregated_rtt_map(struct output_context *out_ctx, int map_fd, + int af, __u8 prefix_len, __u64 t_monotonic, struct aggregation_config *agg_conf) { struct aggregated_rtt_stats *values = NULL; @@ -1374,11 +1403,11 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len, } for (i = 0; i < count; i++) { - report_aggregated_rtt_mapentry(keys + i * keysize, - values + i * n_cpus, - n_cpus, af, prefix_len, - t_monotonic, agg_conf, - &del_key); + report_aggregated_rtt_mapentry( + out_ctx, keys + i * keysize, + values + i * n_cpus, n_cpus, af, prefix_len, + t_monotonic, agg_conf, &del_key); + if (del_key) memcpy(del_keys + del_idx++ * keysize, keys + i * keysize, keysize); @@ -1406,7 +1435,9 @@ exit: return err; } -static int report_aggregated_rtts(struct aggregation_maps *maps, + +static int report_aggregated_rtts(struct output_context *out_ctx, + struct aggregation_maps *maps, struct aggregation_config *agg_conf) { __u64 t = get_time_ns(CLOCK_MONOTONIC); @@ -1416,13 +1447,15 @@ static int report_aggregated_rtts(struct aggregation_maps *maps, if (map_idx < 0) return map_idx; - err = report_aggregated_rtt_map(maps->map_v4_fd[map_idx], AF_INET, - agg_conf->ipv4_prefix_len, t, agg_conf); + err = report_aggregated_rtt_map(out_ctx, maps->map_v4_fd[map_idx], + AF_INET, agg_conf->ipv4_prefix_len, t, + agg_conf); if (err) return err; - err = report_aggregated_rtt_map(maps->map_v6_fd[map_idx], AF_INET6, - agg_conf->ipv6_prefix_len, t, agg_conf); + err = report_aggregated_rtt_map(out_ctx, maps->map_v6_fd[map_idx], + AF_INET6, agg_conf->ipv6_prefix_len, t, + agg_conf); return err; } @@ -1659,8 +1692,8 @@ static int init_perfbuffer(struct bpf_object *obj, struct pping_config *config, pb = perf_buffer__new( bpf_object__find_map_fd_by_name(obj, config->event_map), - PERF_BUFFER_PAGES, handle_event, handle_missed_events, NULL, - NULL); + PERF_BUFFER_PAGES, handle_event, handle_missed_events, + &config->out_ctx, NULL); err = libbpf_get_error(pb); if (err) { fprintf(stderr, "Failed to open perf buffer %s: %s\n", @@ -1816,7 +1849,9 @@ static int init_aggregation_timer(struct bpf_object *obj, return fd; } -static int handle_aggregation_timer(int timer_fd, struct aggregation_maps *maps, +static int handle_aggregation_timer(int timer_fd, + struct output_context *out_ctx, + struct aggregation_maps *maps, struct aggregation_config *agg_conf) { __u64 timer_exps; @@ -1834,7 +1869,7 @@ static int handle_aggregation_timer(int timer_fd, struct aggregation_maps *maps, timer_exps - 1); } - err = report_aggregated_rtts(maps, agg_conf); + err = report_aggregated_rtts(out_ctx, maps, agg_conf); if (err) { fprintf(stderr, "Failed reporting aggregated RTTs: %s\n", get_libbpf_strerror(err)); @@ -1941,7 +1976,8 @@ static int epoll_poll_events(int epfd, struct pping_config *config, case PPING_EPEVENT_TYPE_AGGTIMER: err = handle_aggregation_timer( events[i].data.u64 & PPING_EPEVENT_MASK, - &config->agg_maps, &config->agg_conf); + &config->out_ctx, &config->agg_maps, + &config->agg_conf); break; case PPING_EPEVENT_TYPE_SIGNAL: err = handle_signalfd(events[i].data.u64 & @@ -1978,6 +2014,9 @@ int main(int argc, char *argv[]) .bpf_config = { .rate_limit = 100 * NS_PER_MS, .rtt_rate = 0, .use_srtt = false }, + .out_ctx = { .format = PPING_OUTPUT_STANDARD, + .stream = stdout, + .jctx = NULL }, .clean_args = { .cleanup_interval = 1 * NS_PER_SECOND, .valid_thread = false }, .agg_conf = { .aggregation_interval = 1 * NS_PER_SECOND, @@ -1997,12 +2036,11 @@ int main(int argc, char *argv[]) .tc_ingress_opts = tc_ingress_opts, .tc_egress_opts = tc_egress_opts, .xdp_mode = XDP_MODE_NATIVE, - .output_format = PPING_OUTPUT_STANDARD, }; // Detect if running as root if (geteuid() != 0) { - printf("This program must be run as root.\n"); + fprintf(stderr, "This program must be run as root.\n"); return EXIT_FAILURE; } @@ -2025,7 +2063,7 @@ int main(int argc, char *argv[]) if (!config.bpf_config.track_tcp && !config.bpf_config.track_icmp) config.bpf_config.track_tcp = true; - if (config.output_format == PPING_OUTPUT_PPVIZ) { + if (config.out_ctx.format == PPING_OUTPUT_PPVIZ) { if (config.bpf_config.agg_rtts) { fprintf(stderr, "The ppviz format does not support aggregated output\n"); @@ -2036,29 +2074,17 @@ int main(int argc, char *argv[]) "Warning: ppviz format mainly intended for TCP traffic, but may now include ICMP traffic as well\n"); } - switch (config.output_format) { - case PPING_OUTPUT_STANDARD: - print_event_func = print_event_standard; - break; - case PPING_OUTPUT_JSON: - print_event_func = print_event_json; - break; - case PPING_OUTPUT_PPVIZ: - print_event_func = print_event_ppviz; - break; - } - fprintf(stderr, "Starting ePPing in %s mode tracking %s on %s\n", - output_format_to_str(config.output_format), + output_format_to_str(config.out_ctx.format), tracked_protocols_to_str(&config), config.ifname); - if (config.output_format == PPING_OUTPUT_JSON) { - json_ctx = jsonw_new(stdout); - jsonw_start_array(json_ctx); + if (config.out_ctx.format == PPING_OUTPUT_JSON) { + config.out_ctx.jctx = jsonw_new(config.out_ctx.stream); + jsonw_start_array(config.out_ctx.jctx); } if (config.bpf_config.agg_rtts) - print_aggmetadata(&config.agg_conf); + print_aggmetadata(&config.out_ctx, &config.agg_conf); // Setup signalhandling (allow graceful shutdown on SIGINT/SIGTERM) sigfd = init_signalfd(); @@ -2179,9 +2205,9 @@ cleanup_sigfd: close(sigfd); cleanup_output: - if (config.output_format == PPING_OUTPUT_JSON && json_ctx) { - jsonw_end_array(json_ctx); - jsonw_destroy(&json_ctx); + if (config.out_ctx.format == PPING_OUTPUT_JSON && config.out_ctx.jctx) { + jsonw_end_array(config.out_ctx.jctx); + jsonw_destroy(&config.out_ctx.jctx); } return err != 0 || detach_err != 0;