pping: Refactor output handling

Refactor code for various output functions to avoid hard-coding stdout
and relying on global variables. Collect output-related parameters
into a new output_context struct which can easily be passed as an
explicit argument to output related functions. Get rid of the global
json_ctx and print_event_func variables, as corresponding information
is now included in the output_context which is directly passed to the
required functions.

Overall, these changes aim to make it more flexible how and where
output is written. This will make it easier to for example add support
for writing directly to files in the future.

Signed-off-by: Simon Sundberg <simon.sundberg@kau.se>
This commit is contained in:
Simon Sundberg
2023-06-21 19:48:14 +02:00
parent 1319871358
commit 1e704be790

View File

@@ -103,6 +103,12 @@ struct map_cleanup_args {
bool valid_thread;
};
struct output_context {
FILE *stream;
json_writer_t *jctx;
enum pping_output_format format;
};
struct aggregation_config {
__u64 aggregation_interval;
__u64 timeout_interval;
@@ -110,7 +116,6 @@ struct aggregation_config {
__u64 bin_width;
__u8 ipv4_prefix_len;
__u8 ipv6_prefix_len;
enum pping_output_format format;
};
struct aggregation_maps {
@@ -124,6 +129,7 @@ struct pping_config {
struct bpf_config bpf_config;
struct bpf_tc_opts tc_ingress_opts;
struct bpf_tc_opts tc_egress_opts;
struct output_context out_ctx;
struct map_cleanup_args clean_args;
struct aggregation_config agg_conf;
struct aggregation_maps agg_maps;
@@ -140,15 +146,11 @@ struct pping_config {
int ingress_prog_id;
int egress_prog_id;
char ifname[IF_NAMESIZE];
enum pping_output_format output_format;
enum xdp_attach_mode xdp_mode;
bool force;
bool created_tc_hook;
};
static json_writer_t *json_ctx = NULL;
static void (*print_event_func)(const union pping_event *) = NULL;
static const struct option long_options[] = {
{ "help", no_argument, NULL, 'h' },
{ "interface", required_argument, NULL, 'i' }, // Name of interface to run on
@@ -338,11 +340,11 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
break;
case 'F':
if (strcmp(optarg, "standard") == 0) {
config->output_format = PPING_OUTPUT_STANDARD;
config->out_ctx.format = PPING_OUTPUT_STANDARD;
} else if (strcmp(optarg, "json") == 0) {
config->output_format = PPING_OUTPUT_JSON;
config->out_ctx.format = PPING_OUTPUT_JSON;
} else if (strcmp(optarg, "ppviz") == 0) {
config->output_format = PPING_OUTPUT_PPVIZ;
config->out_ctx.format = PPING_OUTPUT_PPVIZ;
} else {
fprintf(stderr,
"format must be \"standard\", \"json\" or \"ppviz\"\n");
@@ -449,8 +451,6 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
htobe64(0xffffffffffffffffUL
<< (64 - config->agg_conf.ipv6_prefix_len));
config->agg_conf.format = config->output_format;
return 0;
}
@@ -943,30 +943,28 @@ static void print_ns_datetime(FILE *stream, __u64 monotonic_ns)
fprintf(stream, "%s.%09llu", timestr, ts % NS_PER_SECOND);
}
static void print_event_standard(const union pping_event *e)
static void print_event_standard(FILE *stream, const union pping_event *e)
{
if (e->event_type == EVENT_TYPE_RTT) {
print_ns_datetime(stdout, e->rtt_event.timestamp);
printf(" %llu.%06llu ms %llu.%06llu ms %s ",
e->rtt_event.rtt / NS_PER_MS,
e->rtt_event.rtt % NS_PER_MS,
e->rtt_event.min_rtt / NS_PER_MS,
e->rtt_event.min_rtt % NS_PER_MS,
proto_to_str(e->rtt_event.flow.proto));
print_flow_ppvizformat(stdout, &e->rtt_event.flow);
printf("\n");
print_ns_datetime(stream, e->rtt_event.timestamp);
fprintf(stream, " %.6g ms %.6g ms %s ",
(double)e->rtt_event.rtt / NS_PER_MS,
(double)e->rtt_event.min_rtt / NS_PER_MS,
proto_to_str(e->rtt_event.flow.proto));
print_flow_ppvizformat(stream, &e->rtt_event.flow);
fprintf(stream, "\n");
} else if (e->event_type == EVENT_TYPE_FLOW) {
print_ns_datetime(stdout, e->flow_event.timestamp);
printf(" %s ", proto_to_str(e->rtt_event.flow.proto));
print_flow_ppvizformat(stdout, &e->flow_event.flow);
printf(" %s due to %s from %s\n",
flowevent_to_str(e->flow_event.flow_event_type),
eventreason_to_str(e->flow_event.reason),
eventsource_to_str(e->flow_event.source));
print_ns_datetime(stream, e->flow_event.timestamp);
fprintf(stream, " %s ", proto_to_str(e->rtt_event.flow.proto));
print_flow_ppvizformat(stream, &e->flow_event.flow);
fprintf(stream, " %s due to %s from %s\n",
flowevent_to_str(e->flow_event.flow_event_type),
eventreason_to_str(e->flow_event.reason),
eventsource_to_str(e->flow_event.source));
}
}
static void print_event_ppviz(const union pping_event *e)
static void print_event_ppviz(FILE *stream, const union pping_event *e)
{
// ppviz format does not support flow events
if (e->event_type != EVENT_TYPE_RTT)
@@ -975,12 +973,11 @@ static void print_event_ppviz(const union pping_event *e)
const struct rtt_event *re = &e->rtt_event;
__u64 time = convert_monotonic_to_realtime(re->timestamp);
printf("%llu.%09llu %llu.%09llu %llu.%09llu ", time / NS_PER_SECOND,
time % NS_PER_SECOND, re->rtt / NS_PER_SECOND,
re->rtt % NS_PER_SECOND, re->min_rtt / NS_PER_SECOND,
re->min_rtt);
print_flow_ppvizformat(stdout, &re->flow);
printf("\n");
fprintf(stream, "%.9g %.9g %.9g ", (double)time / NS_PER_SECOND,
(double)re->rtt / NS_PER_SECOND,
(double)re->min_rtt / NS_PER_SECOND);
print_flow_ppvizformat(stream, &re->flow);
fprintf(stream, "\n");
}
static void print_common_fields_json(json_writer_t *ctx,
@@ -1023,32 +1020,52 @@ static void print_flowevent_fields_json(json_writer_t *ctx,
jsonw_string_field(ctx, "triggered_by", eventsource_to_str(fe->source));
}
static void print_event_json(const union pping_event *e)
static void print_event_json(json_writer_t *jctx, const union pping_event *e)
{
if (e->event_type != EVENT_TYPE_RTT && e->event_type != EVENT_TYPE_FLOW)
return;
jsonw_start_object(json_ctx);
print_common_fields_json(json_ctx, e);
jsonw_start_object(jctx);
print_common_fields_json(jctx, e);
if (e->event_type == EVENT_TYPE_RTT)
print_rttevent_fields_json(json_ctx, &e->rtt_event);
print_rttevent_fields_json(jctx, &e->rtt_event);
else // flow-event
print_flowevent_fields_json(json_ctx, &e->flow_event);
jsonw_end_object(json_ctx);
print_flowevent_fields_json(jctx, &e->flow_event);
jsonw_end_object(jctx);
}
static void warn_map_full(const struct map_full_event *e)
static void print_event(struct output_context *out_ctx,
const union pping_event *pe)
{
print_ns_datetime(stderr, e->timestamp);
fprintf(stderr, " Warning: Unable to create %s entry for flow ",
if (!out_ctx->stream)
return;
switch (out_ctx->format) {
case PPING_OUTPUT_STANDARD:
print_event_standard(out_ctx->stream, pe);
break;
case PPING_OUTPUT_JSON:
if (out_ctx->jctx)
print_event_json(out_ctx->jctx, pe);
break;
case PPING_OUTPUT_PPVIZ:
print_event_ppviz(out_ctx->stream, pe);
break;
}
}
static void warn_map_full(FILE *stream, const struct map_full_event *e)
{
print_ns_datetime(stream, e->timestamp);
fprintf(stream, " Warning: Unable to create %s entry for flow ",
e->map == PPING_MAP_FLOWSTATE ? "flow" : "timestamp");
print_flow_ppvizformat(stderr, &e->flow);
fprintf(stderr, "\n");
print_flow_ppvizformat(stream, &e->flow);
fprintf(stream, "\n");
}
static void print_map_clean_info(const struct map_clean_event *e)
static void print_map_clean_info(FILE *stream, const struct map_clean_event *e)
{
fprintf(stderr,
fprintf(stream,
"%s: cycle: %u, entries: %u, time: %llu, timeout: %u, tot timeout: %llu, selfdel: %u, tot selfdel: %llu\n",
e->map == PPING_MAP_PACKETTS ? "packet_ts" : "flow_state",
e->clean_cycles, e->last_processed_entries, e->last_runtime,
@@ -1058,6 +1075,7 @@ static void print_map_clean_info(const struct map_clean_event *e)
static void handle_event(void *ctx, int cpu, void *data, __u32 data_size)
{
struct output_context *out_ctx = ctx;
const union pping_event *e = data;
if (data_size < sizeof(e->event_type))
@@ -1065,14 +1083,14 @@ static void handle_event(void *ctx, int cpu, void *data, __u32 data_size)
switch (e->event_type) {
case EVENT_TYPE_MAP_FULL:
warn_map_full(&e->map_event);
warn_map_full(stderr, &e->map_event);
break;
case EVENT_TYPE_MAP_CLEAN:
print_map_clean_info(&e->map_clean_event);
print_map_clean_info(stderr, &e->map_clean_event);
break;
case EVENT_TYPE_RTT:
case EVENT_TYPE_FLOW:
print_event_func(e);
print_event(out_ctx, e);
break;
default:
fprintf(stderr, "Warning: Unknown event type %llu\n",
@@ -1170,12 +1188,16 @@ static void print_aggmetadata_json(json_writer_t *ctx,
jsonw_end_object(ctx);
}
static void print_aggmetadata(struct aggregation_config *agg_conf)
static void print_aggmetadata(struct output_context *out_ctx,
struct aggregation_config *agg_conf)
{
if (agg_conf->format == PPING_OUTPUT_STANDARD)
print_aggmetadata_standard(stdout, agg_conf);
else
print_aggmetadata_json(json_ctx, agg_conf);
if (!out_ctx->stream)
return;
if (out_ctx->format == PPING_OUTPUT_STANDARD)
print_aggmetadata_standard(out_ctx->stream, agg_conf);
else if (out_ctx->jctx)
print_aggmetadata_json(out_ctx->jctx, agg_conf);
}
static void print_aggrtts_standard(FILE *stream, __u64 t, const char *prefixstr,
@@ -1248,19 +1270,25 @@ exit:
jsonw_end_object(ctx);
}
static void print_aggregated_rtts(__u64 t, struct ipprefix_key *prefix, int af,
static void print_aggregated_rtts(struct output_context *out_ctx, __u64 t,
struct ipprefix_key *prefix, int af,
__u8 prefix_len,
struct aggregated_rtt_stats *rtt_stats,
struct aggregation_config *agg_conf)
{
char prefixstr[INET6_PREFIXSTRLEN] = { 0 };
if (!out_ctx->stream)
return;
format_ipprefix(prefixstr, sizeof(prefixstr), af, prefix, prefix_len);
if (agg_conf->format == PPING_OUTPUT_STANDARD)
print_aggrtts_standard(stdout, t, prefixstr, rtt_stats,
if (out_ctx->format == PPING_OUTPUT_STANDARD)
print_aggrtts_standard(out_ctx->stream, t, prefixstr, rtt_stats,
agg_conf);
else
print_aggrtts_json(json_ctx, t, prefixstr, rtt_stats, agg_conf);
else if (out_ctx->jctx)
print_aggrtts_json(out_ctx->jctx, t, prefixstr, rtt_stats,
agg_conf);
}
// Stolen from BPF selftests
@@ -1299,9 +1327,10 @@ static int switch_agg_map(int map_active_fd)
}
static void report_aggregated_rtt_mapentry(
struct ipprefix_key *prefix, struct aggregated_rtt_stats *percpu_stats,
int n_cpus, int af, __u8 prefix_len, __u64 t_monotonic,
struct aggregation_config *agg_conf, bool *del_entry)
struct output_context *out_ctx, struct ipprefix_key *prefix,
struct aggregated_rtt_stats *percpu_stats, int n_cpus, int af,
__u8 prefix_len, __u64 t_monotonic, struct aggregation_config *agg_conf,
bool *del_entry)
{
struct aggregated_rtt_stats merged_stats;
struct ipprefix_key backup_key = { 0 };
@@ -1328,8 +1357,8 @@ static void report_aggregated_rtt_mapentry(
// Only print and clear prefixes which have RTT samples
if (!aggregated_rtt_stats_empty(&merged_stats)) {
print_aggregated_rtts(t_monotonic, prefix, af, prefix_len,
&merged_stats, agg_conf);
print_aggregated_rtts(out_ctx, t_monotonic, prefix, af,
prefix_len, &merged_stats, agg_conf);
// Clear out the reported stats
if (!*del_entry)
@@ -1339,8 +1368,8 @@ static void report_aggregated_rtt_mapentry(
}
}
static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
__u64 t_monotonic,
static int report_aggregated_rtt_map(struct output_context *out_ctx, int map_fd,
int af, __u8 prefix_len, __u64 t_monotonic,
struct aggregation_config *agg_conf)
{
struct aggregated_rtt_stats *values = NULL;
@@ -1374,11 +1403,11 @@ static int report_aggregated_rtt_map(int map_fd, int af, __u8 prefix_len,
}
for (i = 0; i < count; i++) {
report_aggregated_rtt_mapentry(keys + i * keysize,
values + i * n_cpus,
n_cpus, af, prefix_len,
t_monotonic, agg_conf,
&del_key);
report_aggregated_rtt_mapentry(
out_ctx, keys + i * keysize,
values + i * n_cpus, n_cpus, af, prefix_len,
t_monotonic, agg_conf, &del_key);
if (del_key)
memcpy(del_keys + del_idx++ * keysize,
keys + i * keysize, keysize);
@@ -1406,7 +1435,9 @@ exit:
return err;
}
static int report_aggregated_rtts(struct aggregation_maps *maps,
static int report_aggregated_rtts(struct output_context *out_ctx,
struct aggregation_maps *maps,
struct aggregation_config *agg_conf)
{
__u64 t = get_time_ns(CLOCK_MONOTONIC);
@@ -1416,13 +1447,15 @@ static int report_aggregated_rtts(struct aggregation_maps *maps,
if (map_idx < 0)
return map_idx;
err = report_aggregated_rtt_map(maps->map_v4_fd[map_idx], AF_INET,
agg_conf->ipv4_prefix_len, t, agg_conf);
err = report_aggregated_rtt_map(out_ctx, maps->map_v4_fd[map_idx],
AF_INET, agg_conf->ipv4_prefix_len, t,
agg_conf);
if (err)
return err;
err = report_aggregated_rtt_map(maps->map_v6_fd[map_idx], AF_INET6,
agg_conf->ipv6_prefix_len, t, agg_conf);
err = report_aggregated_rtt_map(out_ctx, maps->map_v6_fd[map_idx],
AF_INET6, agg_conf->ipv6_prefix_len, t,
agg_conf);
return err;
}
@@ -1659,8 +1692,8 @@ static int init_perfbuffer(struct bpf_object *obj, struct pping_config *config,
pb = perf_buffer__new(
bpf_object__find_map_fd_by_name(obj, config->event_map),
PERF_BUFFER_PAGES, handle_event, handle_missed_events, NULL,
NULL);
PERF_BUFFER_PAGES, handle_event, handle_missed_events,
&config->out_ctx, NULL);
err = libbpf_get_error(pb);
if (err) {
fprintf(stderr, "Failed to open perf buffer %s: %s\n",
@@ -1816,7 +1849,9 @@ static int init_aggregation_timer(struct bpf_object *obj,
return fd;
}
static int handle_aggregation_timer(int timer_fd, struct aggregation_maps *maps,
static int handle_aggregation_timer(int timer_fd,
struct output_context *out_ctx,
struct aggregation_maps *maps,
struct aggregation_config *agg_conf)
{
__u64 timer_exps;
@@ -1834,7 +1869,7 @@ static int handle_aggregation_timer(int timer_fd, struct aggregation_maps *maps,
timer_exps - 1);
}
err = report_aggregated_rtts(maps, agg_conf);
err = report_aggregated_rtts(out_ctx, maps, agg_conf);
if (err) {
fprintf(stderr, "Failed reporting aggregated RTTs: %s\n",
get_libbpf_strerror(err));
@@ -1941,7 +1976,8 @@ static int epoll_poll_events(int epfd, struct pping_config *config,
case PPING_EPEVENT_TYPE_AGGTIMER:
err = handle_aggregation_timer(
events[i].data.u64 & PPING_EPEVENT_MASK,
&config->agg_maps, &config->agg_conf);
&config->out_ctx, &config->agg_maps,
&config->agg_conf);
break;
case PPING_EPEVENT_TYPE_SIGNAL:
err = handle_signalfd(events[i].data.u64 &
@@ -1978,6 +2014,9 @@ int main(int argc, char *argv[])
.bpf_config = { .rate_limit = 100 * NS_PER_MS,
.rtt_rate = 0,
.use_srtt = false },
.out_ctx = { .format = PPING_OUTPUT_STANDARD,
.stream = stdout,
.jctx = NULL },
.clean_args = { .cleanup_interval = 1 * NS_PER_SECOND,
.valid_thread = false },
.agg_conf = { .aggregation_interval = 1 * NS_PER_SECOND,
@@ -1997,12 +2036,11 @@ int main(int argc, char *argv[])
.tc_ingress_opts = tc_ingress_opts,
.tc_egress_opts = tc_egress_opts,
.xdp_mode = XDP_MODE_NATIVE,
.output_format = PPING_OUTPUT_STANDARD,
};
// Detect if running as root
if (geteuid() != 0) {
printf("This program must be run as root.\n");
fprintf(stderr, "This program must be run as root.\n");
return EXIT_FAILURE;
}
@@ -2025,7 +2063,7 @@ int main(int argc, char *argv[])
if (!config.bpf_config.track_tcp && !config.bpf_config.track_icmp)
config.bpf_config.track_tcp = true;
if (config.output_format == PPING_OUTPUT_PPVIZ) {
if (config.out_ctx.format == PPING_OUTPUT_PPVIZ) {
if (config.bpf_config.agg_rtts) {
fprintf(stderr,
"The ppviz format does not support aggregated output\n");
@@ -2036,29 +2074,17 @@ int main(int argc, char *argv[])
"Warning: ppviz format mainly intended for TCP traffic, but may now include ICMP traffic as well\n");
}
switch (config.output_format) {
case PPING_OUTPUT_STANDARD:
print_event_func = print_event_standard;
break;
case PPING_OUTPUT_JSON:
print_event_func = print_event_json;
break;
case PPING_OUTPUT_PPVIZ:
print_event_func = print_event_ppviz;
break;
}
fprintf(stderr, "Starting ePPing in %s mode tracking %s on %s\n",
output_format_to_str(config.output_format),
output_format_to_str(config.out_ctx.format),
tracked_protocols_to_str(&config), config.ifname);
if (config.output_format == PPING_OUTPUT_JSON) {
json_ctx = jsonw_new(stdout);
jsonw_start_array(json_ctx);
if (config.out_ctx.format == PPING_OUTPUT_JSON) {
config.out_ctx.jctx = jsonw_new(config.out_ctx.stream);
jsonw_start_array(config.out_ctx.jctx);
}
if (config.bpf_config.agg_rtts)
print_aggmetadata(&config.agg_conf);
print_aggmetadata(&config.out_ctx, &config.agg_conf);
// Setup signalhandling (allow graceful shutdown on SIGINT/SIGTERM)
sigfd = init_signalfd();
@@ -2179,9 +2205,9 @@ cleanup_sigfd:
close(sigfd);
cleanup_output:
if (config.output_format == PPING_OUTPUT_JSON && json_ctx) {
jsonw_end_array(json_ctx);
jsonw_destroy(&json_ctx);
if (config.out_ctx.format == PPING_OUTPUT_JSON && config.out_ctx.jctx) {
jsonw_end_array(config.out_ctx.jctx);
jsonw_destroy(&config.out_ctx.jctx);
}
return err != 0 || detach_err != 0;