diff --git a/pping/pping.c b/pping/pping.c index 9aa9e58..c29d193 100644 --- a/pping/pping.c +++ b/pping/pping.c @@ -74,7 +74,7 @@ struct pping_config { char *pin_dir; char *packet_map; char *flow_map; - char *rtt_map; + char *event_map; int xdp_flags; int ifindex; char ifname[IF_NAMESIZE]; @@ -377,8 +377,10 @@ static bool packet_ts_timeout(void *val_ptr, __u64 now) static bool flow_timeout(void *val_ptr, __u64 now) { __u64 ts = ((struct flow_state *)val_ptr)->last_timestamp; - if (now > ts && now - ts > FLOW_LIFETIME) + if (now > ts && now - ts > FLOW_LIFETIME) { + //TODO - create and "push" flow-closing event return true; + } return false; } @@ -517,34 +519,84 @@ static const char *proto_to_str(__u16 proto) } } -static void print_rtt_event_standard(void *ctx, int cpu, void *data, - __u32 data_size) +static const char *flowevent_to_str(enum flow_event_type fe) { - const struct rtt_event *e = data; + switch (fe) { + case FLOW_EVENT_UNSPECIFIED: + return "unspecified event"; + case FLOW_EVENT_OPENING: + return "opening"; + case FLOW_EVENT_CLOSING: + return "closing"; + default: + return "unknown"; + } +} + +static const char *eventreason_to_str(enum flow_event_reason er) +{ + switch (er) { + case EVENT_REASON_SYN: + return "SYN"; + case EVENT_REASON_SYN_ACK: + return "SYN-ACK"; + case EVENT_REASON_FIRST_OBS_PCKT: + return "first observed packet"; + case EVENT_REASON_FIN: + return "FIN"; + case EVENT_REASON_FIN_ACK: + return "FIN-ACK"; + case EVENT_REASON_RST: + return "RST"; + case EVENT_REASON_FLOW_TIMEOUT: + return "flow timeout"; + default: + return "unknown"; + } +} + +static void print_event_standard(void *ctx, int cpu, void *data, + __u32 data_size) +{ + const struct flow_event *fe = data; + const struct rtt_event *re = data; char saddr[INET6_ADDRSTRLEN]; char daddr[INET6_ADDRSTRLEN]; char timestr[9]; - __u64 ts = convert_monotonic_to_realtime(e->timestamp); + __u64 ts = convert_monotonic_to_realtime(fe->timestamp); time_t ts_s = ts / NS_PER_SECOND; - 
format_ip_address(saddr, sizeof(saddr), e->flow.ipv, &e->flow.saddr.ip); - format_ip_address(daddr, sizeof(daddr), e->flow.ipv, &e->flow.daddr.ip); + format_ip_address(saddr, sizeof(saddr), fe->flow.ipv, + &fe->flow.saddr.ip); + format_ip_address(daddr, sizeof(daddr), fe->flow.ipv, + &fe->flow.daddr.ip); strftime(timestr, sizeof(timestr), "%H:%M:%S", localtime(&ts_s)); - printf("%s.%09llu %llu.%06llu ms %llu.%06llu ms %s:%d+%s:%d\n", timestr, - ts % NS_PER_SECOND, e->rtt / NS_PER_MS, e->rtt % NS_PER_MS, - e->min_rtt / NS_PER_MS, e->min_rtt % NS_PER_MS, saddr, - ntohs(e->flow.saddr.port), daddr, ntohs(e->flow.daddr.port)); + if (fe->event_type == EVENT_TYPE_RTT) + printf("%s.%09llu %llu.%06llu ms %llu.%06llu ms %s:%d+%s:%d\n", + timestr, ts % NS_PER_SECOND, re->rtt / NS_PER_MS, + re->rtt % NS_PER_MS, re->min_rtt / NS_PER_MS, + re->min_rtt % NS_PER_MS, saddr, ntohs(re->flow.saddr.port), + daddr, ntohs(re->flow.daddr.port)); + else if (fe->event_type == EVENT_TYPE_FLOW) + printf("%s.%09llu %s:%d+%s:%d flow %s due to %s from %s\n", + timestr, ts % NS_PER_SECOND, saddr, + ntohs(fe->flow.saddr.port), daddr, + ntohs(fe->flow.daddr.port), flowevent_to_str(fe->event), + eventreason_to_str(fe->reason), + fe->from_egress ? 
"src" : "dest"); } -static void print_rtt_event_ppviz(void *ctx, int cpu, void *data, - __u32 data_size) +static void print_event_ppviz(void *ctx, int cpu, void *data, __u32 data_size) { const struct rtt_event *e = data; char saddr[INET6_ADDRSTRLEN]; char daddr[INET6_ADDRSTRLEN]; __u64 time = convert_monotonic_to_realtime(e->timestamp); + if (e->event_type != EVENT_TYPE_RTT) + return; + format_ip_address(saddr, sizeof(saddr), e->flow.ipv, &e->flow.saddr.ip); format_ip_address(daddr, sizeof(daddr), e->flow.ipv, &e->flow.daddr.ip); @@ -555,16 +607,22 @@ static void print_rtt_event_ppviz(void *ctx, int cpu, void *data, ntohs(e->flow.saddr.port), daddr, ntohs(e->flow.daddr.port)); } -static void print_rtt_event_json(void *ctx, int cpu, void *data, - __u32 data_size) +static void print_event_json(void *ctx, int cpu, void *data, __u32 data_size) { - const struct rtt_event *e = data; + const struct flow_event *fe = data; + const struct rtt_event *re = data; char saddr[INET6_ADDRSTRLEN]; char daddr[INET6_ADDRSTRLEN]; - __u64 time = convert_monotonic_to_realtime(e->timestamp); + __u64 time = convert_monotonic_to_realtime(fe->timestamp); - format_ip_address(saddr, sizeof(saddr), e->flow.ipv, &e->flow.saddr.ip); - format_ip_address(daddr, sizeof(daddr), e->flow.ipv, &e->flow.daddr.ip); + if (fe->event_type != EVENT_TYPE_RTT && + fe->event_type != EVENT_TYPE_FLOW) + return; + + format_ip_address(saddr, sizeof(saddr), fe->flow.ipv, + &fe->flow.saddr.ip); + format_ip_address(daddr, sizeof(daddr), fe->flow.ipv, + &fe->flow.daddr.ip); if (!json_ctx) { json_ctx = jsonw_new(stdout); @@ -573,17 +631,29 @@ static void print_rtt_event_json(void *ctx, int cpu, void *data, jsonw_start_object(json_ctx); jsonw_u64_field(json_ctx, "timestamp", time); - jsonw_u64_field(json_ctx, "rtt", e->rtt); - jsonw_u64_field(json_ctx, "min_rtt", e->min_rtt); jsonw_string_field(json_ctx, "src_ip", saddr); - jsonw_hu_field(json_ctx, "src_port", ntohs(e->flow.saddr.port)); + jsonw_hu_field(json_ctx, 
"src_port", ntohs(fe->flow.saddr.port)); jsonw_string_field(json_ctx, "dest_ip", daddr); - jsonw_hu_field(json_ctx, "dest_port", ntohs(e->flow.daddr.port)); - jsonw_string_field(json_ctx, "protocol", proto_to_str(e->flow.proto)); - jsonw_u64_field(json_ctx, "sent_packets", e->sent_pkts); - jsonw_u64_field(json_ctx, "sent_bytes", e->sent_bytes); - jsonw_u64_field(json_ctx, "rec_packets", e->rec_pkts); - jsonw_u64_field(json_ctx, "rec_bytes", e->rec_bytes); + jsonw_hu_field(json_ctx, "dest_port", ntohs(fe->flow.daddr.port)); + jsonw_string_field(json_ctx, "protocol", proto_to_str(fe->flow.proto)); + + if (fe->event_type == EVENT_TYPE_RTT) { + jsonw_u64_field(json_ctx, "rtt", re->rtt); + jsonw_u64_field(json_ctx, "min_rtt", re->min_rtt); + jsonw_u64_field(json_ctx, "sent_packets", re->sent_pkts); + jsonw_u64_field(json_ctx, "sent_bytes", re->sent_bytes); + jsonw_u64_field(json_ctx, "rec_packets", re->rec_pkts); + jsonw_u64_field(json_ctx, "rec_bytes", re->rec_bytes); + + } else if (fe->event_type == EVENT_TYPE_FLOW) { + jsonw_string_field(json_ctx, "flow_event", + flowevent_to_str(fe->event)); + jsonw_string_field(json_ctx, "reason", + eventreason_to_str(fe->reason)); + jsonw_string_field(json_ctx, "from", + fe->from_egress ? 
"src" : "dest"); + } + jsonw_end_object(json_ctx); } @@ -710,13 +780,13 @@ int main(int argc, char *argv[]) .pin_dir = "/sys/fs/bpf/pping", .packet_map = "packet_ts", .flow_map = "flow_state", - .rtt_map = "rtt_events", + .event_map = "events", .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST, }; struct perf_buffer *pb = NULL; struct perf_buffer_opts pb_opts = { - .sample_cb = print_rtt_event_standard, + .sample_cb = print_event_standard, .lost_cb = handle_missed_rtt_event, }; @@ -743,9 +813,9 @@ int main(int argc, char *argv[]) } if (config.json_format) - pb_opts.sample_cb = print_rtt_event_json; + pb_opts.sample_cb = print_event_json; else if (config.ppviz_format) - pb_opts.sample_cb = print_rtt_event_ppviz; + pb_opts.sample_cb = print_event_ppviz; err = load_attach_bpfprogs(&obj, &config, &tc_attached, &xdp_attached); if (err) { @@ -764,13 +834,13 @@ int main(int argc, char *argv[]) // Set up perf buffer pb = perf_buffer__new(bpf_object__find_map_fd_by_name(obj, - config.rtt_map), + config.event_map), PERF_BUFFER_PAGES, &pb_opts); err = libbpf_get_error(pb); if (err) { pb = NULL; fprintf(stderr, "Failed to open perf buffer %s: %s\n", - config.rtt_map, strerror(err)); + config.event_map, strerror(err)); goto cleanup; } diff --git a/pping/pping.h b/pping/pping.h index 7f078c2..d77ddd1 100644 --- a/pping/pping.h +++ b/pping/pping.h @@ -4,10 +4,31 @@ #include #include +#include #define INGRESS_PROG_SEC "xdp" #define EGRESS_PROG_SEC "classifier" +/* For the event_type members of rtt_event and flow_event */ +#define EVENT_TYPE_FLOW 1 +#define EVENT_TYPE_RTT 2 + +enum __attribute__((__packed__)) flow_event_type { + FLOW_EVENT_UNSPECIFIED, + FLOW_EVENT_OPENING, + FLOW_EVENT_CLOSING +}; + +enum __attribute__ ((__packed__)) flow_event_reason { + EVENT_REASON_SYN, + EVENT_REASON_SYN_ACK, + EVENT_REASON_FIRST_OBS_PCKT, + EVENT_REASON_FIN, + EVENT_REASON_FIN_ACK, + EVENT_REASON_RST, + EVENT_REASON_FLOW_TIMEOUT +}; + struct bpf_config { __u64 rate_limit; }; @@ -54,16 +75,42 @@ struct 
packet_id { __u32 identifier; //tsval for TCP packets }; +/* + * An RTT event message that can be passed from the bpf-programs to user-space. + * The initial event_type member is used to allow multiplexing between + * different event types in a single perf buffer. Members up to and including + * flow are identical to other event types. + * Uses explicit padding instead of packing based on recommendations in cilium's + * BPF reference documentation at https://docs.cilium.io/en/stable/bpf/#llvm. + */ struct rtt_event { + __u64 event_type; + __u64 timestamp; + struct network_tuple flow; + __u32 padding; __u64 rtt; __u64 min_rtt; __u64 sent_pkts; __u64 sent_bytes; __u64 rec_pkts; __u64 rec_bytes; - __u64 timestamp; - struct network_tuple flow; __u32 reserved; }; +/* + * A flow event message that can be passed from the bpf-programs to user-space. + * The initial event_type member is used to allow multiplexing between + * different event types in a single perf buffer. Members up to and including + * flow are identical to other event types. + */ +struct flow_event { + __u64 event_type; + __u64 timestamp; + struct network_tuple flow; + enum flow_event_type event; + enum flow_event_reason reason; + bool from_egress; + __u8 reserved; +}; + #endif diff --git a/pping/pping_kern.c b/pping/pping_kern.c index 9f9edb9..ee741b9 100644 --- a/pping/pping_kern.c +++ b/pping/pping_kern.c @@ -60,7 +60,7 @@ struct { __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); __uint(key_size, sizeof(__u32)); __uint(value_size, sizeof(__u32)); -} rtt_events SEC(".maps"); +} events SEC(".maps"); // Help functions @@ -137,7 +137,7 @@ static int parse_tcp_ts(struct tcphdr *tcph, void *data_end, __u32 *tsval, * flow_closing to true. 
*/ static int parse_tcp_identifier(struct parsing_context *ctx, __be16 *sport, - __be16 *dport, bool *flow_closing, + __be16 *dport, struct flow_event *fe, __u32 *identifier) { __u32 tsval, tsecr; @@ -146,8 +146,24 @@ static int parse_tcp_identifier(struct parsing_context *ctx, __be16 *sport, if (parse_tcphdr(&ctx->nh, ctx->data_end, &tcph) < 0) return -1; + // Check if connection is opening + if (tcph->syn) { + fe->event = FLOW_EVENT_OPENING; + fe->reason = tcph->ack ? EVENT_REASON_SYN_ACK : EVENT_REASON_SYN; + } // Check if connection is closing - *flow_closing = tcph->rst || (!ctx->is_egress && tcph->fin); + if (tcph->rst) { + //bpf_printk("RST from %d\n", ctx->is_egress); + fe->event = FLOW_EVENT_CLOSING; + fe->reason = EVENT_REASON_RST; + } + else if (!ctx->is_egress && tcph->fin) { + //bpf_printk("Fin from %d\n", ctx->is_egress); + fe->event = FLOW_EVENT_CLOSING; + fe->reason = tcph->ack ? EVENT_REASON_FIN_ACK : EVENT_REASON_FIN; + } + if (tcph->rst || (!ctx->is_egress && tcph->fin)) { + } // Do not timestamp pure ACKs if (ctx->is_egress && ctx->nh.pos - ctx->data >= ctx->pkt_len && @@ -175,7 +191,8 @@ static int parse_tcp_identifier(struct parsing_context *ctx, __be16 *sport, * set to the identifier of a response. */ static int parse_packet_identifier(struct parsing_context *ctx, - struct packet_id *p_id, bool *flow_closing) + struct packet_id *p_id, + struct flow_event *fe) { int proto, err; struct ethhdr *eth; @@ -208,7 +225,7 @@ static int parse_packet_identifier(struct parsing_context *ctx, // Add new protocols here if (p_id->flow.proto == IPPROTO_TCP) { err = parse_tcp_identifier(ctx, &saddr->port, &daddr->port, - flow_closing, &p_id->identifier); + fe, &p_id->identifier); if (err) return -1; } else { @@ -237,6 +254,21 @@ static __u32 remaining_pkt_payload(struct parsing_context *ctx) return parsed_bytes < ctx->pkt_len ? ctx->pkt_len - parsed_bytes : 0; } +/* + * Fills in event_type, timestamp, flow, from_egress and reserved. 
+ The members event and reason are assumed to have been set already (by + * parse_packet_identifier). + */ +static void fill_flow_event(struct flow_event *fe, __u64 timestamp, + struct network_tuple *flow, bool is_egress) +{ + fe->event_type = EVENT_TYPE_FLOW; + fe->timestamp = timestamp; + __builtin_memcpy(&fe->flow, flow, sizeof(struct network_tuple)); + fe->from_egress = is_egress; + fe->reserved = 0; // Make sure it's initialized +} + // Programs // TC-BFP for parsing packet identifier from egress traffic and add to map @@ -244,7 +276,8 @@ SEC(EGRESS_PROG_SEC) int pping_egress(struct __sk_buff *skb) { struct packet_id p_id = { 0 }; - __u64 p_ts; + struct flow_event fe = { .event = FLOW_EVENT_UNSPECIFIED }; + __u64 now; struct parsing_context pctx = { .data = (void *)(long)skb->data, .data_end = (void *)(long)skb->data_end, @@ -252,27 +285,42 @@ int pping_egress(struct __sk_buff *skb) .nh = { .pos = pctx.data }, .is_egress = true, }; - bool flow_closing = false; struct flow_state *f_state; struct flow_state new_state = { 0 }; - if (parse_packet_identifier(&pctx, &p_id, &flow_closing) < 0) + if (parse_packet_identifier(&pctx, &p_id, &fe) < 0) goto out; - // Delete flow and create no timestamp entry if flow is closing - if (flow_closing) { - bpf_map_delete_elem(&flow_state, &p_id.flow); + now = bpf_ktime_get_ns(); // or bpf_ktime_get_boot_ns + f_state = bpf_map_lookup_elem(&flow_state, &p_id.flow); + + // Flow closing - try to delete flow state and push closing-event + if (fe.event == FLOW_EVENT_CLOSING) { + if (f_state) { + bpf_map_delete_elem(&flow_state, &p_id.flow); + fill_flow_event(&fe, now, &p_id.flow, true); + bpf_perf_event_output(skb, &events, BPF_F_CURRENT_CPU, + &fe, sizeof(fe)); + } goto out; } - // Check flow state - f_state = bpf_map_lookup_elem(&flow_state, &p_id.flow); - if (!f_state) { // No previous state - attempt to create it + // No previous state - attempt to create it and push flow-opening event + if (!f_state) { 
bpf_map_update_elem(&flow_state, &p_id.flow, &new_state, BPF_NOEXIST); f_state = bpf_map_lookup_elem(&flow_state, &p_id.flow); - if (!f_state) + + if (!f_state) // Creation failed goto out; + + if (fe.event != FLOW_EVENT_OPENING) { + fe.event = FLOW_EVENT_OPENING; + fe.reason = EVENT_REASON_FIRST_OBS_PCKT; + } + fill_flow_event(&fe, now, &p_id.flow, true); + bpf_perf_event_output(skb, &events, BPF_F_CURRENT_CPU, &fe, + sizeof(fe)); } f_state->sent_pkts++; @@ -284,9 +332,8 @@ int pping_egress(struct __sk_buff *skb) f_state->last_id = p_id.identifier; // Check rate-limit - p_ts = bpf_ktime_get_ns(); // or bpf_ktime_get_boot_ns - if (p_ts < f_state->last_timestamp || - p_ts - f_state->last_timestamp < config.rate_limit) + if (now < f_state->last_timestamp || + now - f_state->last_timestamp < config.rate_limit) goto out; /* @@ -295,8 +342,8 @@ int pping_egress(struct __sk_buff *skb) * the next available map slot somewhat fairer between heavy and sparse * flows. */ - f_state->last_timestamp = p_ts; - bpf_map_update_elem(&packet_ts, &p_id, &p_ts, BPF_NOEXIST); + f_state->last_timestamp = now; + bpf_map_update_elem(&packet_ts, &p_id, &now, BPF_NOEXIST); out: return BPF_OK; @@ -308,7 +355,8 @@ int pping_ingress(struct xdp_md *ctx) { struct packet_id p_id = { 0 }; __u64 *p_ts; - struct rtt_event event = { 0 }; + struct flow_event fe = { .event = FLOW_EVENT_UNSPECIFIED }; + struct rtt_event re = { 0 }; struct flow_state *f_state; struct parsing_context pctx = { .data = (void *)(long)ctx->data, @@ -317,15 +365,14 @@ int pping_ingress(struct xdp_md *ctx) .nh = { .pos = pctx.data }, .is_egress = false, }; - bool flow_closing = false; __u64 now; - if (parse_packet_identifier(&pctx, &p_id, &flow_closing) < 0) + if (parse_packet_identifier(&pctx, &p_id, &fe) < 0) goto out; f_state = bpf_map_lookup_elem(&flow_state, &p_id.flow); if (!f_state) - goto validflow_out; + goto out; f_state->rec_pkts++; f_state->rec_bytes += remaining_pkt_payload(&pctx); @@ -335,30 +382,34 @@ int 
pping_ingress(struct xdp_md *ctx) if (!p_ts || now < *p_ts) goto validflow_out; - event.rtt = now - *p_ts; - event.timestamp = now; + re.rtt = now - *p_ts; // Delete timestamp entry as soon as RTT is calculated bpf_map_delete_elem(&packet_ts, &p_id); - if (f_state->min_rtt == 0 || event.rtt < f_state->min_rtt) - f_state->min_rtt = event.rtt; + if (f_state->min_rtt == 0 || re.rtt < f_state->min_rtt) + f_state->min_rtt = re.rtt; - event.min_rtt = f_state->min_rtt; - event.sent_pkts = f_state->sent_pkts; - event.sent_bytes = f_state->sent_bytes; - event.rec_pkts = f_state->rec_pkts; - event.rec_bytes = f_state->rec_bytes; + re.event_type = EVENT_TYPE_RTT; + re.timestamp = now; + re.min_rtt = f_state->min_rtt; + re.sent_pkts = f_state->sent_pkts; + re.sent_bytes = f_state->sent_bytes; + re.rec_pkts = f_state->rec_pkts; + re.rec_bytes = f_state->rec_bytes; // Push event to perf-buffer - __builtin_memcpy(&event.flow, &p_id.flow, sizeof(struct network_tuple)); - bpf_perf_event_output(ctx, &rtt_events, BPF_F_CURRENT_CPU, &event, - sizeof(event)); + __builtin_memcpy(&re.flow, &p_id.flow, sizeof(struct network_tuple)); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &re, sizeof(re)); validflow_out: // Wait with deleting flow until having pushed final RTT message - if (flow_closing) + if (fe.event == FLOW_EVENT_CLOSING && f_state) { bpf_map_delete_elem(&flow_state, &p_id.flow); + fill_flow_event(&fe, now, &p_id.flow, false); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &fe, + sizeof(fe)); + } out: return XDP_PASS;