pping: Add support for "flow events"

Add "flow events" (flow opening or closing so far) which will trigger
a printout of message.

Note: The ppviz format will only print out the traditional rtt events
as the format does not include opening/closing messages.

Signed-off-by: Simon Sundberg <simon.sundberg@kau.se>
This commit is contained in:
Simon Sundberg
2021-06-22 15:22:11 +02:00
parent 399c9dc935
commit 543f75c9d8
3 changed files with 242 additions and 74 deletions

View File

@@ -74,7 +74,7 @@ struct pping_config {
char *pin_dir;
char *packet_map;
char *flow_map;
char *rtt_map;
char *event_map;
int xdp_flags;
int ifindex;
char ifname[IF_NAMESIZE];
@@ -377,8 +377,10 @@ static bool packet_ts_timeout(void *val_ptr, __u64 now)
static bool flow_timeout(void *val_ptr, __u64 now)
{
__u64 ts = ((struct flow_state *)val_ptr)->last_timestamp;
if (now > ts && now - ts > FLOW_LIFETIME)
if (now > ts && now - ts > FLOW_LIFETIME) {
//TODO - create and "push" flow-closing event
return true;
}
return false;
}
@@ -517,34 +519,84 @@ static const char *proto_to_str(__u16 proto)
}
}
static void print_rtt_event_standard(void *ctx, int cpu, void *data,
__u32 data_size)
static const char *flowevent_to_str(enum flow_event_type fe)
{
const struct rtt_event *e = data;
switch (fe) {
case FLOW_EVENT_UNSPECIFIED:
return "unspecified event";
case FLOW_EVENT_OPENING:
return "opening";
case FLOW_EVENT_CLOSING:
return "closing";
default:
return "unknown";
}
}
static const char *eventreason_to_str(enum flow_event_reason er)
{
switch (er) {
case EVENT_REASON_SYN:
return "SYN";
case EVENT_REASON_SYN_ACK:
return "SYN-ACK";
case EVENT_REASON_FIRST_OBS_PCKT:
return "first observed packet";
case EVENT_REASON_FIN:
return "FIN";
case EVENT_REASON_FIN_ACK:
return "FIN-ACK";
case EVENT_REASON_RST:
return "RST";
case EVENT_REASON_FLOW_TIMEOUT:
return "flow timeout";
default:
return "unknown";
}
}
static void print_event_standard(void *ctx, int cpu, void *data,
__u32 data_size)
{
const struct flow_event *fe = data;
const struct rtt_event *re = data;
char saddr[INET6_ADDRSTRLEN];
char daddr[INET6_ADDRSTRLEN];
char timestr[9];
__u64 ts = convert_monotonic_to_realtime(e->timestamp);
__u64 ts = convert_monotonic_to_realtime(fe->timestamp);
time_t ts_s = ts / NS_PER_SECOND;
format_ip_address(saddr, sizeof(saddr), e->flow.ipv, &e->flow.saddr.ip);
format_ip_address(daddr, sizeof(daddr), e->flow.ipv, &e->flow.daddr.ip);
format_ip_address(saddr, sizeof(saddr), fe->flow.ipv,
&fe->flow.saddr.ip);
format_ip_address(daddr, sizeof(daddr), fe->flow.ipv,
&fe->flow.daddr.ip);
strftime(timestr, sizeof(timestr), "%H:%M:%S", localtime(&ts_s));
printf("%s.%09llu %llu.%06llu ms %llu.%06llu ms %s:%d+%s:%d\n", timestr,
ts % NS_PER_SECOND, e->rtt / NS_PER_MS, e->rtt % NS_PER_MS,
e->min_rtt / NS_PER_MS, e->min_rtt % NS_PER_MS, saddr,
ntohs(e->flow.saddr.port), daddr, ntohs(e->flow.daddr.port));
if (fe->event_type == EVENT_TYPE_RTT)
printf("%s.%09llu %llu.%06llu ms %llu.%06llu ms %s:%d+%s:%d\n",
timestr, ts % NS_PER_SECOND, re->rtt / NS_PER_MS,
re->rtt % NS_PER_MS, re->min_rtt / NS_PER_MS,
re->min_rtt % NS_PER_MS, saddr, ntohs(re->flow.saddr.port),
daddr, ntohs(re->flow.daddr.port));
else if (fe->event_type == EVENT_TYPE_FLOW)
printf("%s.%09llu %s:%d+%s:%d flow %s due to %s from %s\n",
timestr, ts & NS_PER_SECOND, saddr,
ntohs(fe->flow.saddr.port), daddr,
ntohs(fe->flow.daddr.port), flowevent_to_str(fe->event),
eventreason_to_str(fe->reason),
fe->from_egress ? "src" : "dest");
}
static void print_rtt_event_ppviz(void *ctx, int cpu, void *data,
__u32 data_size)
static void print_event_ppviz(void *ctx, int cpu, void *data, __u32 data_size)
{
const struct rtt_event *e = data;
char saddr[INET6_ADDRSTRLEN];
char daddr[INET6_ADDRSTRLEN];
__u64 time = convert_monotonic_to_realtime(e->timestamp);
if (e->event_type != EVENT_TYPE_RTT)
return;
format_ip_address(saddr, sizeof(saddr), e->flow.ipv, &e->flow.saddr.ip);
format_ip_address(daddr, sizeof(daddr), e->flow.ipv, &e->flow.daddr.ip);
@@ -555,16 +607,22 @@ static void print_rtt_event_ppviz(void *ctx, int cpu, void *data,
ntohs(e->flow.saddr.port), daddr, ntohs(e->flow.daddr.port));
}
static void print_rtt_event_json(void *ctx, int cpu, void *data,
__u32 data_size)
static void print_event_json(void *ctx, int cpu, void *data, __u32 data_size)
{
const struct rtt_event *e = data;
const struct flow_event *fe = data;
const struct rtt_event *re = data;
char saddr[INET6_ADDRSTRLEN];
char daddr[INET6_ADDRSTRLEN];
__u64 time = convert_monotonic_to_realtime(e->timestamp);
__u64 time = convert_monotonic_to_realtime(fe->timestamp);
format_ip_address(saddr, sizeof(saddr), e->flow.ipv, &e->flow.saddr.ip);
format_ip_address(daddr, sizeof(daddr), e->flow.ipv, &e->flow.daddr.ip);
if (fe->event_type != EVENT_TYPE_RTT &&
fe->event_type != EVENT_TYPE_FLOW)
return;
format_ip_address(saddr, sizeof(saddr), fe->flow.ipv,
&fe->flow.saddr.ip);
format_ip_address(daddr, sizeof(daddr), fe->flow.ipv,
&fe->flow.daddr.ip);
if (!json_ctx) {
json_ctx = jsonw_new(stdout);
@@ -573,17 +631,29 @@ static void print_rtt_event_json(void *ctx, int cpu, void *data,
jsonw_start_object(json_ctx);
jsonw_u64_field(json_ctx, "timestamp", time);
jsonw_u64_field(json_ctx, "rtt", e->rtt);
jsonw_u64_field(json_ctx, "min_rtt", e->min_rtt);
jsonw_string_field(json_ctx, "src_ip", saddr);
jsonw_hu_field(json_ctx, "src_port", ntohs(e->flow.saddr.port));
jsonw_hu_field(json_ctx, "src_port", ntohs(fe->flow.saddr.port));
jsonw_string_field(json_ctx, "dest_ip", daddr);
jsonw_hu_field(json_ctx, "dest_port", ntohs(e->flow.daddr.port));
jsonw_string_field(json_ctx, "protocol", proto_to_str(e->flow.proto));
jsonw_u64_field(json_ctx, "sent_packets", e->sent_pkts);
jsonw_u64_field(json_ctx, "sent_bytes", e->sent_bytes);
jsonw_u64_field(json_ctx, "rec_packets", e->rec_pkts);
jsonw_u64_field(json_ctx, "rec_bytes", e->rec_bytes);
jsonw_hu_field(json_ctx, "dest_port", ntohs(fe->flow.daddr.port));
jsonw_string_field(json_ctx, "protocol", proto_to_str(fe->flow.proto));
if (fe->event_type == EVENT_TYPE_RTT) {
jsonw_u64_field(json_ctx, "rtt", re->rtt);
jsonw_u64_field(json_ctx, "min_rtt", re->min_rtt);
jsonw_u64_field(json_ctx, "sent_packets", re->sent_pkts);
jsonw_u64_field(json_ctx, "sent_bytes", re->sent_bytes);
jsonw_u64_field(json_ctx, "rec_packets", re->rec_pkts);
jsonw_u64_field(json_ctx, "rec_bytes", re->rec_bytes);
} else if (fe->event_type == EVENT_TYPE_FLOW) {
jsonw_string_field(json_ctx, "flow_event",
flowevent_to_str(fe->event));
jsonw_string_field(json_ctx, "reason",
eventreason_to_str(fe->reason));
jsonw_string_field(json_ctx, "from",
fe->from_egress ? "src" : "dest");
}
jsonw_end_object(json_ctx);
}
@@ -710,13 +780,13 @@ int main(int argc, char *argv[])
.pin_dir = "/sys/fs/bpf/pping",
.packet_map = "packet_ts",
.flow_map = "flow_state",
.rtt_map = "rtt_events",
.event_map = "events",
.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
};
struct perf_buffer *pb = NULL;
struct perf_buffer_opts pb_opts = {
.sample_cb = print_rtt_event_standard,
.sample_cb = print_event_standard,
.lost_cb = handle_missed_rtt_event,
};
@@ -743,9 +813,9 @@ int main(int argc, char *argv[])
}
if (config.json_format)
pb_opts.sample_cb = print_rtt_event_json;
pb_opts.sample_cb = print_event_json;
else if (config.ppviz_format)
pb_opts.sample_cb = print_rtt_event_ppviz;
pb_opts.sample_cb = print_event_ppviz;
err = load_attach_bpfprogs(&obj, &config, &tc_attached, &xdp_attached);
if (err) {
@@ -764,13 +834,13 @@ int main(int argc, char *argv[])
// Set up perf buffer
pb = perf_buffer__new(bpf_object__find_map_fd_by_name(obj,
config.rtt_map),
config.event_map),
PERF_BUFFER_PAGES, &pb_opts);
err = libbpf_get_error(pb);
if (err) {
pb = NULL;
fprintf(stderr, "Failed to open perf buffer %s: %s\n",
config.rtt_map, strerror(err));
config.event_map, strerror(err));
goto cleanup;
}