diff --git a/pping/pping.c b/pping/pping.c index bcf59e4..ffd74c0 100644 --- a/pping/pping.c +++ b/pping/pping.c @@ -143,7 +143,7 @@ static const char *get_libbpf_strerror(int err) } static int parse_bounded_double(double *res, const char *str, double low, - double high, const char *name) + double high, const char *name) { char *endptr; *res = strtod(str, &endptr); @@ -170,8 +170,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) config->bpf_config.track_tcp = false; config->bpf_config.track_icmp = false; - while ((opt = getopt_long(argc, argv, "hflTCi:r:R:t:c:F:I:", long_options, - NULL)) != -1) { + while ((opt = getopt_long(argc, argv, "hflTCi:r:R:t:c:F:I:", + long_options, NULL)) != -1) { switch (opt) { case 'i': if (strlen(optarg) > IF_NAMESIZE) { @@ -185,7 +185,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) err = -errno; fprintf(stderr, "Could not get index of interface %s: %s\n", - config->ifname, get_libbpf_strerror(err)); + config->ifname, + get_libbpf_strerror(err)); return err; } break; @@ -210,8 +211,7 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) case 't': if (strcmp(optarg, "min") == 0) { config->bpf_config.use_srtt = false; - } - else if (strcmp(optarg, "smoothed") == 0) { + } else if (strcmp(optarg, "smoothed") == 0) { config->bpf_config.use_srtt = true; } else { fprintf(stderr, @@ -237,7 +237,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) } else if (strcmp(optarg, "ppviz") == 0) { config->output_format = PPING_OUTPUT_PPVIZ; } else { - fprintf(stderr, "format must be \"standard\", \"json\" or \"ppviz\"\n"); + fprintf(stderr, + "format must be \"standard\", \"json\" or \"ppviz\"\n"); return -EINVAL; } break; @@ -247,7 +248,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config) } else if (strcmp(optarg, "tc") == 0) { config->ingress_prog = "pping_tc_ingress"; } else { - fprintf(stderr, "ingress-hook must be \"xdp\" or \"tc\"\n"); + fprintf(stderr, + "ingress-hook must be \"xdp\" or \"tc\"\n"); return -EINVAL; } break; @@ -293,10 +295,14 @@ const char *tracked_protocols_to_str(struct pping_config *config) const char *output_format_to_str(enum PPING_OUTPUT_FORMAT format) { switch (format) { - case PPING_OUTPUT_STANDARD: return "standard"; - case PPING_OUTPUT_JSON: return "json"; - case PPING_OUTPUT_PPVIZ: return "ppviz"; - default: return "unkown format"; + case PPING_OUTPUT_STANDARD: + return "standard"; + case PPING_OUTPUT_JSON: + return "json"; + case PPING_OUTPUT_PPVIZ: + return "ppviz"; + default: + return "unkown format"; } } @@ -599,8 +605,8 @@ static int format_ip_address(char *buf, size_t size, int af, const struct in6_addr *addr) { if (af == AF_INET) - return inet_ntop(af, &addr->s6_addr[12], - buf, size) ? -errno : 0; + return inet_ntop(af, &addr->s6_addr[12], buf, size) ? -errno : + 0; else if (af == AF_INET6) return inet_ntop(af, addr, buf, size) ? -errno : 0; return -EINVAL; @@ -641,6 +647,8 @@ static const char *flowevent_to_str(enum flow_event_type fe) static const char *eventreason_to_str(enum flow_event_reason er) { switch (er) { + case EVENT_REASON_NONE: + return "none"; case EVENT_REASON_SYN: return "SYN"; case EVENT_REASON_SYN_ACK: @@ -672,7 +680,8 @@ static const char *eventsource_to_str(enum flow_event_source es) } } -static void print_flow_ppvizformat(FILE *stream, const struct network_tuple *flow) +static void print_flow_ppvizformat(FILE *stream, + const struct network_tuple *flow) { char saddr[INET6_ADDRSTRLEN]; char daddr[INET6_ADDRSTRLEN]; @@ -727,7 +736,8 @@ static void print_event_ppviz(const union pping_event *e) printf("%llu.%09llu %llu.%09llu %llu.%09llu ", time / NS_PER_SECOND, time % NS_PER_SECOND, re->rtt / NS_PER_SECOND, - re->rtt % NS_PER_SECOND, re->min_rtt / NS_PER_SECOND, re->min_rtt); + re->rtt % NS_PER_SECOND, re->min_rtt / NS_PER_SECOND, + re->min_rtt); print_flow_ppvizformat(stdout, &re->flow); printf("\n"); } @@ -768,8 +778,7 @@ static void print_flowevent_fields_json(json_writer_t *ctx, { jsonw_string_field(ctx, "flow_event", flowevent_to_str(fe->flow_event_type)); - jsonw_string_field(ctx, "reason", - eventreason_to_str(fe->reason)); + jsonw_string_field(ctx, "reason", eventreason_to_str(fe->reason)); jsonw_string_field(ctx, "triggered_by", eventsource_to_str(fe->source)); } diff --git a/pping/pping.h b/pping/pping.h index e923f45..21ac3fb 100644 --- a/pping/pping.h +++ b/pping/pping.h @@ -9,7 +9,7 @@ #define NS_PER_SECOND 1000000000UL #define NS_PER_MS 1000000UL #define MS_PER_S 1000UL -#define S_PER_DAY (24*3600UL) +#define S_PER_DAY (24 * 3600UL) typedef __u64 fixpoint64; #define FIXPOINT_SHIFT 16 @@ -30,6 +30,7 @@ enum __attribute__((__packed__)) flow_event_type { }; enum __attribute__((__packed__)) flow_event_reason { + EVENT_REASON_NONE, EVENT_REASON_SYN, EVENT_REASON_SYN_ACK, EVENT_REASON_FIRST_OBS_PCKT, @@ -49,6 +50,13 @@ enum __attribute__((__packed__)) pping_map { PPING_MAP_PACKETTS }; +enum __attribute__((__packed__)) connection_state { + CONNECTION_STATE_EMPTY, + CONNECTION_STATE_WAITOPEN, + CONNECTION_STATE_OPEN, + CONNECTION_STATE_CLOSED +}; + struct bpf_config { __u64 rate_limit; fixpoint64 rtt_rate; @@ -95,11 +103,22 @@ struct flow_state { __u64 rec_bytes; __u32 last_id; __u32 outstanding_timestamps; - bool has_opened; + enum connection_state conn_state; enum flow_event_reason opening_reason; __u8 reserved[6]; }; +/* + * Stores flowstate for both direction (src -> dst and dst -> src) of a flow + * + * Uses two named members instead of array of size 2 to avoid hassels with + * convincing verifier that member access is not out of bounds + */ +struct dual_flow_state { + struct flow_state dir1; + struct flow_state dir2; +}; + struct packet_id { struct network_tuple flow; __u32 identifier; //tsval for TCP packets diff --git a/pping/pping_kern.c b/pping/pping_kern.c index c391b10..1dcb369 100644 --- a/pping/pping_kern.c +++ b/pping/pping_kern.c @@ -39,6 +39,8 @@ #define ICMP_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear any ICMP flows if they're inactive this long #define UNOPENED_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear out flows that have not seen a response after this long +#define MAX_MEMCMP_SIZE 128 + /* * Structs for map iteration programs * Copied from /tools/testing/selftest/bpf/progs/bpf_iter.h @@ -95,14 +97,27 @@ struct packet_info { }; __u64 time; // Arrival time of packet __u32 payload; // Size of packet data (excluding headers) - struct packet_id pid; // identifier to timestamp (ex. TSval) - struct packet_id reply_pid; // identifier to match against (ex. TSecr) + struct packet_id pid; // flow + identifier to timestamp (ex. TSval) + struct packet_id reply_pid; // rev. flow + identifier to match against (ex. TSecr) + bool pid_flow_is_dfkey; // Used to determine which member of dualflow state to use for forward direction bool pid_valid; // identifier can be used to timestamp packet bool reply_pid_valid; // reply_identifier can be used to match packet enum flow_event_type event_type; // flow event triggered by packet enum flow_event_reason event_reason; // reason for triggering flow event }; +/* + * Struct filled in by protocol id parsers (ex. parse_tcp_identifier) + */ +struct protocol_info { + __u32 pid; + __u32 reply_pid; + bool pid_valid; + bool reply_pid_valid; + enum flow_event_type event_type; + enum flow_event_reason event_reason; +}; + char _license[] SEC("license") = "GPL"; // Global config struct - set from userspace static volatile const struct bpf_config config = {}; @@ -120,7 +135,7 @@ struct { struct { __uint(type, BPF_MAP_TYPE_HASH); __type(key, struct network_tuple); - __type(value, struct flow_state); + __type(value, struct dual_flow_state); __uint(max_entries, 16384); } flow_state SEC(".maps"); @@ -169,6 +184,83 @@ static void reverse_flow(struct network_tuple *dest, struct network_tuple *src) dest->reserved = 0; } +/* + * Can't seem to get __builtin_memcmp to work, so hacking my own + * + * Based on https://githubhot.com/repo/iovisor/bcc/issues/3559, + * __builtin_memcmp should work constant size but I still get the "failed to + * find BTF for extern" error. + */ +static int my_memcmp(const void *s1_, const void *s2_, __u32 size) +{ + const __u8 *s1 = s1_, *s2 = s2_; + int i; + + for (i = 0; i < MAX_MEMCMP_SIZE && i < size; i++) { + if (s1[i] != s2[i]) + return s1[i] > s2[i] ? 1 : -1; + } + + return 0; +} + +static bool is_dualflow_key(struct network_tuple *flow) +{ + return my_memcmp(&flow->saddr, &flow->daddr, sizeof(flow->saddr)) <= 0; +} + +static void make_dualflow_key(struct network_tuple *key, + struct network_tuple *flow) +{ + if (is_dualflow_key(flow)) + *key = *flow; + else + reverse_flow(key, flow); +} + +static struct flow_state *fstate_from_dfkey(struct dual_flow_state *df_state, + bool is_dfkey) +{ + if (!df_state) + return NULL; + + return is_dfkey ? &df_state->dir1 : &df_state->dir2; +} + +/* + * Get the flow state for flow-direction from df_state + * + * Note: Does not validate that any of the entries in df_state actually matches + * flow, just selects the direction in df_state that best fits the flow. + */ +static struct flow_state * +get_flowstate_from_dualflow(struct dual_flow_state *df_state, + struct network_tuple *flow) +{ + return fstate_from_dfkey(df_state, is_dualflow_key(flow)); +} + +static struct flow_state * +get_flowstate_from_packet(struct dual_flow_state *df_state, + struct packet_info *p_info) +{ + return fstate_from_dfkey(df_state, p_info->pid_flow_is_dfkey); +} + +static struct flow_state * +get_reverse_flowstate_from_packet(struct dual_flow_state *df_state, + struct packet_info *p_info) +{ + return fstate_from_dfkey(df_state, !p_info->pid_flow_is_dfkey); +} + +static struct network_tuple * +get_dualflow_key_from_packet(struct packet_info *p_info) +{ + return p_info->pid_flow_is_dfkey ? &p_info->pid.flow : + &p_info->reply_pid.flow; +} + /* * Parses the TSval and TSecr values from the TCP options field. If sucessful * the TSval and TSecr values will be stored at tsval and tsecr (in network @@ -230,47 +322,49 @@ static int parse_tcp_ts(struct tcphdr *tcph, void *data_end, __u32 *tsval, * Will use the TSval as pid and TSecr as reply_pid, and the TCP source and dest * as port numbers. * - * If successful, the pid (identifer + flow.port), reply_pid, pid_valid, - * reply_pid_valid, event_type and event_reason members of p_info will be set + * If successful, tcph, sport, dport and proto_info will be set * appropriately and 0 will be returned. - * On failure -1 will be returned (no guarantees on values set in p_info). + * On failure -1 will be returned (and arguments will not be set). */ static int parse_tcp_identifier(struct parsing_context *pctx, - struct packet_info *p_info) + struct tcphdr **tcph, __u16 *sport, + __u16 *dport, struct protocol_info *proto_info) { - if (parse_tcphdr(&pctx->nh, pctx->data_end, &p_info->tcph) < 0) + struct tcphdr *hdr; + if (parse_tcphdr(&pctx->nh, pctx->data_end, &hdr) < 0) return -1; - if (parse_tcp_ts(p_info->tcph, pctx->data_end, &p_info->pid.identifier, - &p_info->reply_pid.identifier) < 0) + if (parse_tcp_ts(hdr, pctx->data_end, &proto_info->pid, + &proto_info->reply_pid) < 0) return -1; //Possible TODO, fall back on seq/ack instead - p_info->pid.flow.saddr.port = p_info->tcph->source; - p_info->pid.flow.daddr.port = p_info->tcph->dest; - // Do not timestamp pure ACKs (no payload) - p_info->pid_valid = - pctx->nh.pos - pctx->data < pctx->pkt_len || p_info->tcph->syn; + proto_info->pid_valid = + pctx->nh.pos - pctx->data < pctx->pkt_len || hdr->syn; // Do not match on non-ACKs (TSecr not valid) - p_info->reply_pid_valid = p_info->tcph->ack; + proto_info->reply_pid_valid = hdr->ack; // Check if connection is opening/closing - if (p_info->tcph->rst) { - p_info->event_type = FLOW_EVENT_CLOSING_BOTH; - p_info->event_reason = EVENT_REASON_RST; - } else if (p_info->tcph->fin) { - p_info->event_type = FLOW_EVENT_CLOSING; - p_info->event_reason = EVENT_REASON_FIN; - } else if (p_info->tcph->syn) { - p_info->event_type = FLOW_EVENT_OPENING; - p_info->event_reason = p_info->tcph->ack ? - EVENT_REASON_SYN_ACK : - EVENT_REASON_SYN; + if (hdr->rst) { + proto_info->event_type = FLOW_EVENT_CLOSING_BOTH; + proto_info->event_reason = EVENT_REASON_RST; + } else if (hdr->fin) { + proto_info->event_type = FLOW_EVENT_CLOSING; + proto_info->event_reason = EVENT_REASON_FIN; + } else if (hdr->syn) { + proto_info->event_type = FLOW_EVENT_OPENING; + proto_info->event_reason = + hdr->ack ? EVENT_REASON_SYN_ACK : EVENT_REASON_SYN; } else { - p_info->event_type = FLOW_EVENT_NONE; + proto_info->event_type = FLOW_EVENT_NONE; + proto_info->event_reason = EVENT_REASON_NONE; } + *sport = hdr->source; + *dport = hdr->dest; + *tcph = hdr; + return 0; } @@ -279,41 +373,48 @@ static int parse_tcp_identifier(struct parsing_context *pctx, * request/reply sequence number. * * Will use the echo sequence number as pid/reply_pid and the echo identifier - * as port numbers. Echo requests will only generate a valid pid and echo - * replies will only generate a valid reply_pid. + * as both src and dst port numbers. Echo requests will only generate a valid + * pid and echo replies will only generate a valid reply_pid. * - * If successful, the pid (identifier + flow.port), reply_pid, pid_valid, - * reply pid_valid and event_type of p_info will be set appropriately and 0 - * will be returned. - * On failure, -1 will be returned (no guarantees on p_info members). + * If successful, icmp6h, sport, dport and proto_info will be set appropriately + * and 0 will be returned. + * On failure, -1 will be returned (and arguments will not be set). * * Note: Will store the 16-bit sequence number in network byte order - * in the 32-bit (reply_)pid.identifier. + * in the 32-bit proto_info->(reply_)pid. */ static int parse_icmp6_identifier(struct parsing_context *pctx, - struct packet_info *p_info) + struct icmp6hdr **icmp6h, __u16 *sport, + __u16 *dport, + struct protocol_info *proto_info) { - if (parse_icmp6hdr(&pctx->nh, pctx->data_end, &p_info->icmp6h) < 0) + struct icmp6hdr *hdr; + if (parse_icmp6hdr(&pctx->nh, pctx->data_end, &hdr) < 0) return -1; - if (p_info->icmp6h->icmp6_code != 0) + if (hdr->icmp6_code != 0) return -1; - if (p_info->icmp6h->icmp6_type == ICMPV6_ECHO_REQUEST) { - p_info->pid.identifier = p_info->icmp6h->icmp6_sequence; - p_info->pid_valid = true; - p_info->reply_pid_valid = false; - } else if (p_info->icmp6h->icmp6_type == ICMPV6_ECHO_REPLY) { - p_info->reply_pid.identifier = p_info->icmp6h->icmp6_sequence; - p_info->reply_pid_valid = true; - p_info->pid_valid = false; + if (hdr->icmp6_type == ICMPV6_ECHO_REQUEST) { + proto_info->pid = hdr->icmp6_sequence; + proto_info->pid_valid = true; + proto_info->reply_pid = 0; + proto_info->reply_pid_valid = false; + } else if (hdr->icmp6_type == ICMPV6_ECHO_REPLY) { + proto_info->reply_pid = hdr->icmp6_sequence; + proto_info->reply_pid_valid = true; + proto_info->pid = 0; + proto_info->pid_valid = false; } else { return -1; } - p_info->event_type = FLOW_EVENT_NONE; - p_info->pid.flow.saddr.port = p_info->icmp6h->icmp6_identifier; - p_info->pid.flow.daddr.port = p_info->pid.flow.saddr.port; + proto_info->event_type = FLOW_EVENT_NONE; + proto_info->event_reason = EVENT_REASON_NONE; + *sport = hdr->icmp6_identifier; + *dport = hdr->icmp6_identifier; + *icmp6h = hdr; + return 0; } @@ -321,29 +422,36 @@ static int parse_icmp6_identifier(struct parsing_context *pctx, * Same as parse_icmp6_identifier, but for an ICMP(v4) header instead. */ static int parse_icmp_identifier(struct parsing_context *pctx, - struct packet_info *p_info) + struct icmphdr **icmph, __u16 *sport, + __u16 *dport, struct protocol_info *proto_info) { - if (parse_icmphdr(&pctx->nh, pctx->data_end, &p_info->icmph) < 0) + struct icmphdr *hdr; + if (parse_icmphdr(&pctx->nh, pctx->data_end, &hdr) < 0) return -1; - if (p_info->icmph->code != 0) + if (hdr->code != 0) return -1; - if (p_info->icmph->type == ICMP_ECHO) { - p_info->pid.identifier = p_info->icmph->un.echo.sequence; - p_info->pid_valid = true; - p_info->reply_pid_valid = false; - } else if (p_info->icmph->type == ICMP_ECHOREPLY) { - p_info->reply_pid.identifier = p_info->icmph->un.echo.sequence; - p_info->reply_pid_valid = true; - p_info->pid_valid = false; + if (hdr->type == ICMP_ECHO) { + proto_info->pid = hdr->un.echo.sequence; + proto_info->pid_valid = true; + proto_info->reply_pid = 0; + proto_info->reply_pid_valid = false; + } else if (hdr->type == ICMP_ECHOREPLY) { + proto_info->reply_pid = hdr->un.echo.sequence; + proto_info->reply_pid_valid = true; + proto_info->pid = 0; + proto_info->pid_valid = false; } else { return -1; } - p_info->event_type = FLOW_EVENT_NONE; - p_info->pid.flow.saddr.port = p_info->icmph->un.echo.id; - p_info->pid.flow.daddr.port = p_info->pid.flow.saddr.port; + proto_info->event_type = FLOW_EVENT_NONE; + proto_info->event_reason = EVENT_REASON_NONE; + *sport = hdr->un.echo.id; + *dport = hdr->un.echo.id; + *icmph = hdr; + return 0; } @@ -360,6 +468,7 @@ static int parse_packet_identifier(struct parsing_context *pctx, { int proto, err; struct ethhdr *eth; + struct protocol_info proto_info; p_info->time = bpf_ktime_get_ns(); proto = parse_ethhdr(&pctx->nh, pctx->data_end, ð); @@ -379,20 +488,36 @@ static int parse_packet_identifier(struct parsing_context *pctx, // Parse identifer from suitable protocol if (config.track_tcp && p_info->pid.flow.proto == IPPROTO_TCP) - err = parse_tcp_identifier(pctx, p_info); + err = parse_tcp_identifier(pctx, &p_info->tcph, + &p_info->pid.flow.saddr.port, + &p_info->pid.flow.daddr.port, + &proto_info); else if (config.track_icmp && p_info->pid.flow.proto == IPPROTO_ICMPV6 && p_info->pid.flow.ipv == AF_INET6) - err = parse_icmp6_identifier(pctx, p_info); + err = parse_icmp6_identifier(pctx, &p_info->icmp6h, + &p_info->pid.flow.saddr.port, + &p_info->pid.flow.daddr.port, + &proto_info); else if (config.track_icmp && p_info->pid.flow.proto == IPPROTO_ICMP && p_info->pid.flow.ipv == AF_INET) - err = parse_icmp_identifier(pctx, p_info); + err = parse_icmp_identifier(pctx, &p_info->icmph, + &p_info->pid.flow.saddr.port, + &p_info->pid.flow.daddr.port, + &proto_info); else return -1; // No matching protocol if (err) return -1; // Failed parsing protocol - // Sucessfully parsed packet identifier - fill in IP-addresses and return + // Sucessfully parsed packet identifier - fill in remaining members and return + p_info->pid.identifier = proto_info.pid; + p_info->pid_valid = proto_info.pid_valid; + p_info->reply_pid.identifier = proto_info.reply_pid; + p_info->reply_pid_valid = proto_info.reply_pid_valid; + p_info->event_type = proto_info.event_type; + p_info->event_reason = proto_info.event_reason; + if (p_info->pid.flow.ipv == AF_INET) { map_ipv4_to_ipv6(&p_info->pid.flow.saddr.ip, p_info->iph->saddr); @@ -403,6 +528,8 @@ static int parse_packet_identifier(struct parsing_context *pctx, p_info->pid.flow.daddr.ip = p_info->ip6h->daddr; } + p_info->pid_flow_is_dfkey = is_dualflow_key(&p_info->pid.flow); + reverse_flow(&p_info->reply_pid.flow, &p_info->pid.flow); p_info->payload = remaining_pkt_payload(pctx); @@ -512,101 +639,150 @@ static void send_map_full_event(void *ctx, struct packet_info *p_info, } /* - * Attempt to create a new flow-state. - * Returns a pointer to the flow_state if successful, NULL otherwise + * Initilizes an "empty" flow state based on the forward direction of the + * current packet */ -static struct flow_state *create_flow(void *ctx, struct packet_info *p_info) +static void init_flowstate(struct flow_state *f_state, + struct packet_info *p_info) { - struct flow_state new_state = { 0 }; + f_state->conn_state = CONNECTION_STATE_WAITOPEN; + f_state->last_timestamp = p_info->time; + f_state->opening_reason = p_info->event_type == FLOW_EVENT_OPENING ? + p_info->event_reason : + EVENT_REASON_FIRST_OBS_PCKT; +} - new_state.last_timestamp = p_info->time; - new_state.opening_reason = p_info->event_type == FLOW_EVENT_OPENING ? - p_info->event_reason : - EVENT_REASON_FIRST_OBS_PCKT; +static void init_empty_flowstate(struct flow_state *f_state) +{ + f_state->conn_state = CONNECTION_STATE_EMPTY; +} - if (bpf_map_update_elem(&flow_state, &p_info->pid.flow, &new_state, - BPF_NOEXIST) != 0) { +/* + * Initilize a new (assumed 0-initlized) dual flow state based on the current + * packet. + */ +static void init_dualflow_state(struct dual_flow_state *df_state, + struct packet_info *p_info) +{ + struct flow_state *fw_state = + get_flowstate_from_packet(df_state, p_info); + struct flow_state *rev_state = + get_reverse_flowstate_from_packet(df_state, p_info); + + init_flowstate(fw_state, p_info); + init_empty_flowstate(rev_state); +} + +static struct dual_flow_state * +create_dualflow_state(void *ctx, struct packet_info *p_info, bool *new_flow) +{ + struct network_tuple *key = get_dualflow_key_from_packet(p_info); + struct dual_flow_state new_state = { 0 }; + + init_dualflow_state(&new_state, p_info); + + if (bpf_map_update_elem(&flow_state, key, &new_state, BPF_NOEXIST) == + 0) { + if (new_flow) + *new_flow = true; + } else { send_map_full_event(ctx, p_info, PPING_MAP_FLOWSTATE); return NULL; } - return bpf_map_lookup_elem(&flow_state, &p_info->pid.flow); + return bpf_map_lookup_elem(&flow_state, key); } -static struct flow_state *update_flow(void *ctx, struct packet_info *p_info, - bool *new_flow) +static struct dual_flow_state * +lookup_or_create_dualflow_state(void *ctx, struct packet_info *p_info, + bool *new_flow) { - struct flow_state *f_state; - *new_flow = false; + struct dual_flow_state *df_state; - f_state = bpf_map_lookup_elem(&flow_state, &p_info->pid.flow); + df_state = bpf_map_lookup_elem(&flow_state, + get_dualflow_key_from_packet(p_info)); - // Attempt to create flow if it does not exist - if (!f_state && p_info->pid_valid && - !(p_info->event_type == FLOW_EVENT_CLOSING || - p_info->event_type == FLOW_EVENT_CLOSING_BOTH)) { - *new_flow = true; - f_state = create_flow(ctx, p_info); + if (df_state) + return df_state; + + // Only try to create new state if we have a valid pid + if (!p_info->pid_valid || p_info->event_type == FLOW_EVENT_CLOSING || + p_info->event_type == FLOW_EVENT_CLOSING_BOTH) + return NULL; + + return create_dualflow_state(ctx, p_info, new_flow); +} + +static bool is_flowstate_active(struct flow_state *f_state) +{ + return f_state->conn_state != CONNECTION_STATE_EMPTY && + f_state->conn_state != CONNECTION_STATE_CLOSED; +} + +static void update_forward_flowstate(struct packet_info *p_info, + struct flow_state *f_state, bool *new_flow) +{ + // "Create" flowstate if it's empty + if (f_state->conn_state == CONNECTION_STATE_EMPTY && + p_info->pid_valid) { + init_flowstate(f_state, p_info); + if (new_flow) + *new_flow = true; } - if (!f_state) - return NULL; - - // Update flow state - f_state->sent_pkts++; - f_state->sent_bytes += p_info->payload; - - return f_state; + if (is_flowstate_active(f_state)) { + f_state->sent_pkts++; + f_state->sent_bytes += p_info->payload; + } } -static struct flow_state *update_rev_flow(void *ctx, struct packet_info *p_info) +static void update_reverse_flowstate(void *ctx, struct packet_info *p_info, + struct flow_state *f_state) { - struct flow_state *f_state; + if (!is_flowstate_active(f_state)) + return; - f_state = bpf_map_lookup_elem(&flow_state, &p_info->reply_pid.flow); - if (!f_state) - return NULL; - - // Is a new flow, push opening flow message - if (!f_state->has_opened && + // First time we see reply for flow? + if (f_state->conn_state == CONNECTION_STATE_WAITOPEN && p_info->event_type != FLOW_EVENT_CLOSING_BOTH) { - f_state->has_opened = true; + f_state->conn_state = CONNECTION_STATE_OPEN; send_flow_open_event(ctx, p_info, f_state); } - // Update flow state f_state->rec_pkts++; f_state->rec_bytes += p_info->payload; - - return f_state; } -static void delete_closed_flows(void *ctx, struct packet_info *p_info, - struct flow_state *flow, - struct flow_state *rev_flow) +static bool should_notify_closing(struct flow_state *f_state) { - bool has_opened; + return f_state->conn_state == CONNECTION_STATE_OPEN; +} - // Flow closing - try to delete flow state and push closing-event - if (flow && (p_info->event_type == FLOW_EVENT_CLOSING || - p_info->event_type == FLOW_EVENT_CLOSING_BOTH)) { - has_opened = flow->has_opened; - if (bpf_map_delete_elem(&flow_state, &p_info->pid.flow) == 0) { - debug_increment_autodel(PPING_MAP_FLOWSTATE); - if (has_opened) - send_flow_event(ctx, p_info, false); - } +static void close_and_delete_flows(void *ctx, struct packet_info *p_info, + struct flow_state *fw_flow, + struct flow_state *rev_flow) +{ + // Forward flow closing + if (p_info->event_type == FLOW_EVENT_CLOSING || + p_info->event_type == FLOW_EVENT_CLOSING_BOTH) { + if (should_notify_closing(fw_flow)) + send_flow_event(ctx, p_info, false); + fw_flow->conn_state = CONNECTION_STATE_CLOSED; } - // Also close reverse flow - if (rev_flow && p_info->event_type == FLOW_EVENT_CLOSING_BOTH) { - has_opened = rev_flow->has_opened; - if (bpf_map_delete_elem(&flow_state, &p_info->reply_pid.flow) == - 0) { + // Reverse flow closing + if (p_info->event_type == FLOW_EVENT_CLOSING_BOTH) { + if (should_notify_closing(rev_flow)) + send_flow_event(ctx, p_info, true); + rev_flow->conn_state = CONNECTION_STATE_CLOSED; + } + + // Delete flowstate entry if neither flow is open anymore + if (!is_flowstate_active(fw_flow) && !is_flowstate_active(rev_flow)) { + if (bpf_map_delete_elem(&flow_state, + get_dualflow_key_from_packet(p_info)) == + 0) debug_increment_autodel(PPING_MAP_FLOWSTATE); - if (has_opened) - send_flow_event(ctx, p_info, true); - } } } @@ -660,7 +836,7 @@ static void pping_timestamp_packet(struct flow_state *f_state, void *ctx, struct parsing_context *pctx, struct packet_info *p_info, bool new_flow) { - if (!f_state || !p_info->pid_valid) + if (!is_flowstate_active(f_state) || !p_info->pid_valid) return; if (config.localfilt && !pctx->is_egress && @@ -703,7 +879,7 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx, struct rtt_event re = { 0 }; __u64 *p_ts; - if (!f_state || !p_info->reply_pid_valid) + if (!is_flowstate_active(f_state) || !p_info->reply_pid_valid) return; if (f_state->outstanding_timestamps == 0) @@ -745,20 +921,66 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx, static void pping(void *ctx, struct parsing_context *pctx) { struct packet_info p_info = { 0 }; - struct flow_state *flow, *rev_flow;; - bool new_flow; + struct dual_flow_state *df_state; + struct flow_state *fw_flow, *rev_flow; + bool new_flow = false; - if (parse_packet_identifier(pctx, &p_info) < 0) + if (parse_packet_identifier(pctx, &p_info) < 0) return; - flow = update_flow(ctx, &p_info, &new_flow); - pping_timestamp_packet(flow, ctx, pctx, &p_info, new_flow); + df_state = lookup_or_create_dualflow_state(ctx, &p_info, &new_flow); + if (!df_state) + return; - rev_flow = update_rev_flow(ctx, &p_info); + fw_flow = get_flowstate_from_packet(df_state, &p_info); + update_forward_flowstate(&p_info, fw_flow, &new_flow); + pping_timestamp_packet(fw_flow, ctx, pctx, &p_info, new_flow); + + rev_flow = get_reverse_flowstate_from_packet(df_state, &p_info); + update_reverse_flowstate(ctx, &p_info, rev_flow); pping_match_packet(rev_flow, ctx, pctx, &p_info); - delete_closed_flows(ctx, &p_info, flow, rev_flow); + close_and_delete_flows(ctx, &p_info, fw_flow, rev_flow); +} +static bool is_flow_old(struct network_tuple *flow, struct flow_state *f_state, + __u64 time) +{ + __u64 age; + __u64 ts; + + if (!f_state || !is_flowstate_active(f_state)) + return false; + + ts = f_state->last_timestamp; // To avoid concurrency issue between check and age calculation + if (ts > time) + return false; + age = time - ts; + + return (f_state->conn_state == CONNECTION_STATE_WAITOPEN && + age > UNOPENED_FLOW_LIFETIME) || + ((flow->proto == IPPROTO_ICMP || + flow->proto == IPPROTO_ICMPV6) && + age > ICMP_FLOW_LIFETIME) || + age > FLOW_LIFETIME; +} + +static void send_flow_timeout_message(void *ctx, struct network_tuple *flow, + __u64 time) +{ + struct flow_event fe = { + .event_type = EVENT_TYPE_FLOW, + .flow_event_type = FLOW_EVENT_CLOSING, + .reason = EVENT_REASON_FLOW_TIMEOUT, + .source = EVENT_SOURCE_GC, + .timestamp = time, + .reserved = 0, + }; + + // To be consistent with Kathie's pping we report flow "backwards" + reverse_flow(&fe.flow, flow); + + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &fe, sizeof(fe)); } // Programs @@ -821,6 +1043,8 @@ int tsmap_cleanup(struct bpf_iter__bpf_map_elem *ctx) { struct packet_id local_pid; struct flow_state *f_state; + struct dual_flow_state *df_state; + struct network_tuple df_key; struct packet_id *pid = ctx->key; __u64 *timestamp = ctx->value; __u64 now = bpf_ktime_get_ns(); @@ -835,14 +1059,16 @@ int tsmap_cleanup(struct bpf_iter__bpf_map_elem *ctx) if (now <= *timestamp) return 0; - /* Seems like the key for map lookup operations must be - on the stack, so copy pid to local_pid. */ - __builtin_memcpy(&local_pid, pid, sizeof(local_pid)); - f_state = bpf_map_lookup_elem(&flow_state, &local_pid.flow); + make_dualflow_key(&df_key, &pid->flow); + df_state = bpf_map_lookup_elem(&flow_state, &df_key); + f_state = get_flowstate_from_dualflow(df_state, &pid->flow); rtt = f_state ? f_state->srtt : 0; if ((rtt && now - *timestamp > rtt * TIMESTAMP_RTT_LIFETIME) || now - *timestamp > TIMESTAMP_LIFETIME) { + /* Seems like the key for map lookup operations must be + on the stack, so copy pid to local_pid. */ + __builtin_memcpy(&local_pid, pid, sizeof(local_pid)); if (bpf_map_delete_elem(&packet_ts, &local_pid) == 0) { debug_increment_timeoutdel(PPING_MAP_PACKETTS); @@ -858,49 +1084,39 @@ int tsmap_cleanup(struct bpf_iter__bpf_map_elem *ctx) SEC("iter/bpf_map_elem") int flowmap_cleanup(struct bpf_iter__bpf_map_elem *ctx) { - struct network_tuple local_flow; - struct network_tuple *flow = ctx->key; - struct flow_state *f_state = ctx->value; - struct flow_event fe; + struct network_tuple flow1, flow2; + struct flow_state *f_state1, *f_state2; + struct dual_flow_state *df_state; __u64 now = bpf_ktime_get_ns(); - bool has_opened; - __u64 age; + bool notify1, notify2, timeout1, timeout2; debug_update_mapclean_stats(ctx, &events, !ctx->key || !ctx->value, ctx->meta->seq_num, now, PPING_MAP_FLOWSTATE); - if (!flow || !f_state) - return 0; - if (now < f_state->last_timestamp) + if (!ctx->key || !ctx->value) return 0; - // Check if flow is too old for unopned/ICMP/other flow type - age = now - f_state->last_timestamp; - has_opened = f_state->has_opened; + flow1 = *(struct network_tuple *)ctx->key; + reverse_flow(&flow2, &flow1); - if ((!has_opened && age > UNOPENED_FLOW_LIFETIME) || - ((flow->proto == IPPROTO_ICMP || flow->proto == IPPROTO_ICMPV6) && - age > ICMP_FLOW_LIFETIME) || - age > FLOW_LIFETIME) { - __builtin_memcpy(&local_flow, flow, sizeof(local_flow)); - has_opened = f_state->has_opened; + df_state = ctx->value; + f_state1 = get_flowstate_from_dualflow(df_state, &flow1); + f_state2 = get_flowstate_from_dualflow(df_state, &flow2); - if (bpf_map_delete_elem(&flow_state, &local_flow) == 0) { - debug_increment_timeoutdel(PPING_MAP_FLOWSTATE); + timeout1 = is_flow_old(&flow1, f_state1, now); + timeout2 = is_flow_old(&flow2, f_state2, now); - if (has_opened) { - reverse_flow(&fe.flow, &local_flow); - fe.event_type = EVENT_TYPE_FLOW; - fe.timestamp = now; - fe.flow_event_type = FLOW_EVENT_CLOSING; - fe.reason = EVENT_REASON_FLOW_TIMEOUT; - fe.source = EVENT_SOURCE_GC; - fe.reserved = 0; - bpf_perf_event_output(ctx, &events, - BPF_F_CURRENT_CPU, &fe, - sizeof(fe)); - } + if ((!is_flowstate_active(f_state1) || timeout1) && + (!is_flowstate_active(f_state2) || timeout2)) { + // Entry should be deleted + notify1 = should_notify_closing(f_state1) && timeout1; + notify2 = should_notify_closing(f_state2) && timeout2; + if (bpf_map_delete_elem(&flow_state, &flow1) == 0) { + if (notify1) + send_flow_timeout_message(ctx, &flow1, now); + if (notify2) + send_flow_timeout_message(ctx, &flow2, now); } }