Merge pull request #43 from simosund/pping_dualdirection_flowstate

PPing dualdirection flowstate
This commit is contained in:
Toke Høiland-Jørgensen
2022-03-31 17:58:25 +02:00
committed by GitHub
3 changed files with 439 additions and 195 deletions

View File

@@ -143,7 +143,7 @@ static const char *get_libbpf_strerror(int err)
}
static int parse_bounded_double(double *res, const char *str, double low,
double high, const char *name)
double high, const char *name)
{
char *endptr;
*res = strtod(str, &endptr);
@@ -170,8 +170,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
config->bpf_config.track_tcp = false;
config->bpf_config.track_icmp = false;
while ((opt = getopt_long(argc, argv, "hflTCi:r:R:t:c:F:I:", long_options,
NULL)) != -1) {
while ((opt = getopt_long(argc, argv, "hflTCi:r:R:t:c:F:I:",
long_options, NULL)) != -1) {
switch (opt) {
case 'i':
if (strlen(optarg) > IF_NAMESIZE) {
@@ -185,7 +185,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
err = -errno;
fprintf(stderr,
"Could not get index of interface %s: %s\n",
config->ifname, get_libbpf_strerror(err));
config->ifname,
get_libbpf_strerror(err));
return err;
}
break;
@@ -210,8 +211,7 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
case 't':
if (strcmp(optarg, "min") == 0) {
config->bpf_config.use_srtt = false;
}
else if (strcmp(optarg, "smoothed") == 0) {
} else if (strcmp(optarg, "smoothed") == 0) {
config->bpf_config.use_srtt = true;
} else {
fprintf(stderr,
@@ -237,7 +237,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
} else if (strcmp(optarg, "ppviz") == 0) {
config->output_format = PPING_OUTPUT_PPVIZ;
} else {
fprintf(stderr, "format must be \"standard\", \"json\" or \"ppviz\"\n");
fprintf(stderr,
"format must be \"standard\", \"json\" or \"ppviz\"\n");
return -EINVAL;
}
break;
@@ -247,7 +248,8 @@ static int parse_arguments(int argc, char *argv[], struct pping_config *config)
} else if (strcmp(optarg, "tc") == 0) {
config->ingress_prog = "pping_tc_ingress";
} else {
fprintf(stderr, "ingress-hook must be \"xdp\" or \"tc\"\n");
fprintf(stderr,
"ingress-hook must be \"xdp\" or \"tc\"\n");
return -EINVAL;
}
break;
@@ -293,10 +295,14 @@ const char *tracked_protocols_to_str(struct pping_config *config)
const char *output_format_to_str(enum PPING_OUTPUT_FORMAT format)
{
switch (format) {
case PPING_OUTPUT_STANDARD: return "standard";
case PPING_OUTPUT_JSON: return "json";
case PPING_OUTPUT_PPVIZ: return "ppviz";
default: return "unkown format";
case PPING_OUTPUT_STANDARD:
return "standard";
case PPING_OUTPUT_JSON:
return "json";
case PPING_OUTPUT_PPVIZ:
return "ppviz";
default:
return "unkown format";
}
}
@@ -599,8 +605,8 @@ static int format_ip_address(char *buf, size_t size, int af,
const struct in6_addr *addr)
{
if (af == AF_INET)
return inet_ntop(af, &addr->s6_addr[12],
buf, size) ? -errno : 0;
return inet_ntop(af, &addr->s6_addr[12], buf, size) ? -errno :
0;
else if (af == AF_INET6)
return inet_ntop(af, addr, buf, size) ? -errno : 0;
return -EINVAL;
@@ -641,6 +647,8 @@ static const char *flowevent_to_str(enum flow_event_type fe)
static const char *eventreason_to_str(enum flow_event_reason er)
{
switch (er) {
case EVENT_REASON_NONE:
return "none";
case EVENT_REASON_SYN:
return "SYN";
case EVENT_REASON_SYN_ACK:
@@ -672,7 +680,8 @@ static const char *eventsource_to_str(enum flow_event_source es)
}
}
static void print_flow_ppvizformat(FILE *stream, const struct network_tuple *flow)
static void print_flow_ppvizformat(FILE *stream,
const struct network_tuple *flow)
{
char saddr[INET6_ADDRSTRLEN];
char daddr[INET6_ADDRSTRLEN];
@@ -727,7 +736,8 @@ static void print_event_ppviz(const union pping_event *e)
printf("%llu.%09llu %llu.%09llu %llu.%09llu ", time / NS_PER_SECOND,
time % NS_PER_SECOND, re->rtt / NS_PER_SECOND,
re->rtt % NS_PER_SECOND, re->min_rtt / NS_PER_SECOND, re->min_rtt);
re->rtt % NS_PER_SECOND, re->min_rtt / NS_PER_SECOND,
re->min_rtt);
print_flow_ppvizformat(stdout, &re->flow);
printf("\n");
}
@@ -768,8 +778,7 @@ static void print_flowevent_fields_json(json_writer_t *ctx,
{
jsonw_string_field(ctx, "flow_event",
flowevent_to_str(fe->flow_event_type));
jsonw_string_field(ctx, "reason",
eventreason_to_str(fe->reason));
jsonw_string_field(ctx, "reason", eventreason_to_str(fe->reason));
jsonw_string_field(ctx, "triggered_by", eventsource_to_str(fe->source));
}

View File

@@ -9,7 +9,7 @@
#define NS_PER_SECOND 1000000000UL
#define NS_PER_MS 1000000UL
#define MS_PER_S 1000UL
#define S_PER_DAY (24*3600UL)
#define S_PER_DAY (24 * 3600UL)
typedef __u64 fixpoint64;
#define FIXPOINT_SHIFT 16
@@ -30,6 +30,7 @@ enum __attribute__((__packed__)) flow_event_type {
};
enum __attribute__((__packed__)) flow_event_reason {
EVENT_REASON_NONE,
EVENT_REASON_SYN,
EVENT_REASON_SYN_ACK,
EVENT_REASON_FIRST_OBS_PCKT,
@@ -49,6 +50,13 @@ enum __attribute__((__packed__)) pping_map {
PPING_MAP_PACKETTS
};
enum __attribute__((__packed__)) connection_state {
CONNECTION_STATE_EMPTY,
CONNECTION_STATE_WAITOPEN,
CONNECTION_STATE_OPEN,
CONNECTION_STATE_CLOSED
};
struct bpf_config {
__u64 rate_limit;
fixpoint64 rtt_rate;
@@ -95,11 +103,22 @@ struct flow_state {
__u64 rec_bytes;
__u32 last_id;
__u32 outstanding_timestamps;
bool has_opened;
enum connection_state conn_state;
enum flow_event_reason opening_reason;
__u8 reserved[6];
};
/*
* Stores flowstate for both direction (src -> dst and dst -> src) of a flow
*
* Uses two named members instead of array of size 2 to avoid hassels with
* convincing verifier that member access is not out of bounds
*/
struct dual_flow_state {
struct flow_state dir1;
struct flow_state dir2;
};
struct packet_id {
struct network_tuple flow;
__u32 identifier; //tsval for TCP packets

View File

@@ -39,6 +39,8 @@
#define ICMP_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear any ICMP flows if they're inactive this long
#define UNOPENED_FLOW_LIFETIME (30 * NS_PER_SECOND) // Clear out flows that have not seen a response after this long
#define MAX_MEMCMP_SIZE 128
/*
* Structs for map iteration programs
* Copied from /tools/testing/selftest/bpf/progs/bpf_iter.h
@@ -95,14 +97,27 @@ struct packet_info {
};
__u64 time; // Arrival time of packet
__u32 payload; // Size of packet data (excluding headers)
struct packet_id pid; // identifier to timestamp (ex. TSval)
struct packet_id reply_pid; // identifier to match against (ex. TSecr)
struct packet_id pid; // flow + identifier to timestamp (ex. TSval)
struct packet_id reply_pid; // rev. flow + identifier to match against (ex. TSecr)
bool pid_flow_is_dfkey; // Used to determine which member of dualflow state to use for forward direction
bool pid_valid; // identifier can be used to timestamp packet
bool reply_pid_valid; // reply_identifier can be used to match packet
enum flow_event_type event_type; // flow event triggered by packet
enum flow_event_reason event_reason; // reason for triggering flow event
};
/*
* Struct filled in by protocol id parsers (ex. parse_tcp_identifier)
*/
struct protocol_info {
__u32 pid;
__u32 reply_pid;
bool pid_valid;
bool reply_pid_valid;
enum flow_event_type event_type;
enum flow_event_reason event_reason;
};
char _license[] SEC("license") = "GPL";
// Global config struct - set from userspace
static volatile const struct bpf_config config = {};
@@ -120,7 +135,7 @@ struct {
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct network_tuple);
__type(value, struct flow_state);
__type(value, struct dual_flow_state);
__uint(max_entries, 16384);
} flow_state SEC(".maps");
@@ -169,6 +184,83 @@ static void reverse_flow(struct network_tuple *dest, struct network_tuple *src)
dest->reserved = 0;
}
/*
* Can't seem to get __builtin_memcmp to work, so hacking my own
*
* Based on https://githubhot.com/repo/iovisor/bcc/issues/3559,
* __builtin_memcmp should work constant size but I still get the "failed to
* find BTF for extern" error.
*/
static int my_memcmp(const void *s1_, const void *s2_, __u32 size)
{
const __u8 *s1 = s1_, *s2 = s2_;
int i;
for (i = 0; i < MAX_MEMCMP_SIZE && i < size; i++) {
if (s1[i] != s2[i])
return s1[i] > s2[i] ? 1 : -1;
}
return 0;
}
static bool is_dualflow_key(struct network_tuple *flow)
{
return my_memcmp(&flow->saddr, &flow->daddr, sizeof(flow->saddr)) <= 0;
}
static void make_dualflow_key(struct network_tuple *key,
struct network_tuple *flow)
{
if (is_dualflow_key(flow))
*key = *flow;
else
reverse_flow(key, flow);
}
static struct flow_state *fstate_from_dfkey(struct dual_flow_state *df_state,
bool is_dfkey)
{
if (!df_state)
return NULL;
return is_dfkey ? &df_state->dir1 : &df_state->dir2;
}
/*
* Get the flow state for flow-direction from df_state
*
* Note: Does not validate that any of the entries in df_state actually matches
* flow, just selects the direction in df_state that best fits the flow.
*/
static struct flow_state *
get_flowstate_from_dualflow(struct dual_flow_state *df_state,
struct network_tuple *flow)
{
return fstate_from_dfkey(df_state, is_dualflow_key(flow));
}
static struct flow_state *
get_flowstate_from_packet(struct dual_flow_state *df_state,
struct packet_info *p_info)
{
return fstate_from_dfkey(df_state, p_info->pid_flow_is_dfkey);
}
static struct flow_state *
get_reverse_flowstate_from_packet(struct dual_flow_state *df_state,
struct packet_info *p_info)
{
return fstate_from_dfkey(df_state, !p_info->pid_flow_is_dfkey);
}
static struct network_tuple *
get_dualflow_key_from_packet(struct packet_info *p_info)
{
return p_info->pid_flow_is_dfkey ? &p_info->pid.flow :
&p_info->reply_pid.flow;
}
/*
* Parses the TSval and TSecr values from the TCP options field. If sucessful
* the TSval and TSecr values will be stored at tsval and tsecr (in network
@@ -230,47 +322,49 @@ static int parse_tcp_ts(struct tcphdr *tcph, void *data_end, __u32 *tsval,
* Will use the TSval as pid and TSecr as reply_pid, and the TCP source and dest
* as port numbers.
*
* If successful, the pid (identifer + flow.port), reply_pid, pid_valid,
* reply_pid_valid, event_type and event_reason members of p_info will be set
* If successful, tcph, sport, dport and proto_info will be set
* appropriately and 0 will be returned.
* On failure -1 will be returned (no guarantees on values set in p_info).
* On failure -1 will be returned (and arguments will not be set).
*/
static int parse_tcp_identifier(struct parsing_context *pctx,
struct packet_info *p_info)
struct tcphdr **tcph, __u16 *sport,
__u16 *dport, struct protocol_info *proto_info)
{
if (parse_tcphdr(&pctx->nh, pctx->data_end, &p_info->tcph) < 0)
struct tcphdr *hdr;
if (parse_tcphdr(&pctx->nh, pctx->data_end, &hdr) < 0)
return -1;
if (parse_tcp_ts(p_info->tcph, pctx->data_end, &p_info->pid.identifier,
&p_info->reply_pid.identifier) < 0)
if (parse_tcp_ts(hdr, pctx->data_end, &proto_info->pid,
&proto_info->reply_pid) < 0)
return -1; //Possible TODO, fall back on seq/ack instead
p_info->pid.flow.saddr.port = p_info->tcph->source;
p_info->pid.flow.daddr.port = p_info->tcph->dest;
// Do not timestamp pure ACKs (no payload)
p_info->pid_valid =
pctx->nh.pos - pctx->data < pctx->pkt_len || p_info->tcph->syn;
proto_info->pid_valid =
pctx->nh.pos - pctx->data < pctx->pkt_len || hdr->syn;
// Do not match on non-ACKs (TSecr not valid)
p_info->reply_pid_valid = p_info->tcph->ack;
proto_info->reply_pid_valid = hdr->ack;
// Check if connection is opening/closing
if (p_info->tcph->rst) {
p_info->event_type = FLOW_EVENT_CLOSING_BOTH;
p_info->event_reason = EVENT_REASON_RST;
} else if (p_info->tcph->fin) {
p_info->event_type = FLOW_EVENT_CLOSING;
p_info->event_reason = EVENT_REASON_FIN;
} else if (p_info->tcph->syn) {
p_info->event_type = FLOW_EVENT_OPENING;
p_info->event_reason = p_info->tcph->ack ?
EVENT_REASON_SYN_ACK :
EVENT_REASON_SYN;
if (hdr->rst) {
proto_info->event_type = FLOW_EVENT_CLOSING_BOTH;
proto_info->event_reason = EVENT_REASON_RST;
} else if (hdr->fin) {
proto_info->event_type = FLOW_EVENT_CLOSING;
proto_info->event_reason = EVENT_REASON_FIN;
} else if (hdr->syn) {
proto_info->event_type = FLOW_EVENT_OPENING;
proto_info->event_reason =
hdr->ack ? EVENT_REASON_SYN_ACK : EVENT_REASON_SYN;
} else {
p_info->event_type = FLOW_EVENT_NONE;
proto_info->event_type = FLOW_EVENT_NONE;
proto_info->event_reason = EVENT_REASON_NONE;
}
*sport = hdr->source;
*dport = hdr->dest;
*tcph = hdr;
return 0;
}
@@ -279,41 +373,48 @@ static int parse_tcp_identifier(struct parsing_context *pctx,
* request/reply sequence number.
*
* Will use the echo sequence number as pid/reply_pid and the echo identifier
* as port numbers. Echo requests will only generate a valid pid and echo
* replies will only generate a valid reply_pid.
* as both src and dst port numbers. Echo requests will only generate a valid
* pid and echo replies will only generate a valid reply_pid.
*
* If successful, the pid (identifier + flow.port), reply_pid, pid_valid,
* reply pid_valid and event_type of p_info will be set appropriately and 0
* will be returned.
* On failure, -1 will be returned (no guarantees on p_info members).
* If successful, icmp6h, sport, dport and proto_info will be set appropriately
* and 0 will be returned.
* On failure, -1 will be returned (and arguments will not be set).
*
* Note: Will store the 16-bit sequence number in network byte order
* in the 32-bit (reply_)pid.identifier.
* in the 32-bit proto_info->(reply_)pid.
*/
static int parse_icmp6_identifier(struct parsing_context *pctx,
struct packet_info *p_info)
struct icmp6hdr **icmp6h, __u16 *sport,
__u16 *dport,
struct protocol_info *proto_info)
{
if (parse_icmp6hdr(&pctx->nh, pctx->data_end, &p_info->icmp6h) < 0)
struct icmp6hdr *hdr;
if (parse_icmp6hdr(&pctx->nh, pctx->data_end, &hdr) < 0)
return -1;
if (p_info->icmp6h->icmp6_code != 0)
if (hdr->icmp6_code != 0)
return -1;
if (p_info->icmp6h->icmp6_type == ICMPV6_ECHO_REQUEST) {
p_info->pid.identifier = p_info->icmp6h->icmp6_sequence;
p_info->pid_valid = true;
p_info->reply_pid_valid = false;
} else if (p_info->icmp6h->icmp6_type == ICMPV6_ECHO_REPLY) {
p_info->reply_pid.identifier = p_info->icmp6h->icmp6_sequence;
p_info->reply_pid_valid = true;
p_info->pid_valid = false;
if (hdr->icmp6_type == ICMPV6_ECHO_REQUEST) {
proto_info->pid = hdr->icmp6_sequence;
proto_info->pid_valid = true;
proto_info->reply_pid = 0;
proto_info->reply_pid_valid = false;
} else if (hdr->icmp6_type == ICMPV6_ECHO_REPLY) {
proto_info->reply_pid = hdr->icmp6_sequence;
proto_info->reply_pid_valid = true;
proto_info->pid = 0;
proto_info->pid_valid = false;
} else {
return -1;
}
p_info->event_type = FLOW_EVENT_NONE;
p_info->pid.flow.saddr.port = p_info->icmp6h->icmp6_identifier;
p_info->pid.flow.daddr.port = p_info->pid.flow.saddr.port;
proto_info->event_type = FLOW_EVENT_NONE;
proto_info->event_reason = EVENT_REASON_NONE;
*sport = hdr->icmp6_identifier;
*dport = hdr->icmp6_identifier;
*icmp6h = hdr;
return 0;
}
@@ -321,29 +422,36 @@ static int parse_icmp6_identifier(struct parsing_context *pctx,
* Same as parse_icmp6_identifier, but for an ICMP(v4) header instead.
*/
static int parse_icmp_identifier(struct parsing_context *pctx,
struct packet_info *p_info)
struct icmphdr **icmph, __u16 *sport,
__u16 *dport, struct protocol_info *proto_info)
{
if (parse_icmphdr(&pctx->nh, pctx->data_end, &p_info->icmph) < 0)
struct icmphdr *hdr;
if (parse_icmphdr(&pctx->nh, pctx->data_end, &hdr) < 0)
return -1;
if (p_info->icmph->code != 0)
if (hdr->code != 0)
return -1;
if (p_info->icmph->type == ICMP_ECHO) {
p_info->pid.identifier = p_info->icmph->un.echo.sequence;
p_info->pid_valid = true;
p_info->reply_pid_valid = false;
} else if (p_info->icmph->type == ICMP_ECHOREPLY) {
p_info->reply_pid.identifier = p_info->icmph->un.echo.sequence;
p_info->reply_pid_valid = true;
p_info->pid_valid = false;
if (hdr->type == ICMP_ECHO) {
proto_info->pid = hdr->un.echo.sequence;
proto_info->pid_valid = true;
proto_info->reply_pid = 0;
proto_info->reply_pid_valid = false;
} else if (hdr->type == ICMP_ECHOREPLY) {
proto_info->reply_pid = hdr->un.echo.sequence;
proto_info->reply_pid_valid = true;
proto_info->pid = 0;
proto_info->pid_valid = false;
} else {
return -1;
}
p_info->event_type = FLOW_EVENT_NONE;
p_info->pid.flow.saddr.port = p_info->icmph->un.echo.id;
p_info->pid.flow.daddr.port = p_info->pid.flow.saddr.port;
proto_info->event_type = FLOW_EVENT_NONE;
proto_info->event_reason = EVENT_REASON_NONE;
*sport = hdr->un.echo.id;
*dport = hdr->un.echo.id;
*icmph = hdr;
return 0;
}
@@ -360,6 +468,7 @@ static int parse_packet_identifier(struct parsing_context *pctx,
{
int proto, err;
struct ethhdr *eth;
struct protocol_info proto_info;
p_info->time = bpf_ktime_get_ns();
proto = parse_ethhdr(&pctx->nh, pctx->data_end, &eth);
@@ -379,20 +488,36 @@ static int parse_packet_identifier(struct parsing_context *pctx,
// Parse identifer from suitable protocol
if (config.track_tcp && p_info->pid.flow.proto == IPPROTO_TCP)
err = parse_tcp_identifier(pctx, p_info);
err = parse_tcp_identifier(pctx, &p_info->tcph,
&p_info->pid.flow.saddr.port,
&p_info->pid.flow.daddr.port,
&proto_info);
else if (config.track_icmp &&
p_info->pid.flow.proto == IPPROTO_ICMPV6 &&
p_info->pid.flow.ipv == AF_INET6)
err = parse_icmp6_identifier(pctx, p_info);
err = parse_icmp6_identifier(pctx, &p_info->icmp6h,
&p_info->pid.flow.saddr.port,
&p_info->pid.flow.daddr.port,
&proto_info);
else if (config.track_icmp && p_info->pid.flow.proto == IPPROTO_ICMP &&
p_info->pid.flow.ipv == AF_INET)
err = parse_icmp_identifier(pctx, p_info);
err = parse_icmp_identifier(pctx, &p_info->icmph,
&p_info->pid.flow.saddr.port,
&p_info->pid.flow.daddr.port,
&proto_info);
else
return -1; // No matching protocol
if (err)
return -1; // Failed parsing protocol
// Sucessfully parsed packet identifier - fill in IP-addresses and return
// Sucessfully parsed packet identifier - fill in remaining members and return
p_info->pid.identifier = proto_info.pid;
p_info->pid_valid = proto_info.pid_valid;
p_info->reply_pid.identifier = proto_info.reply_pid;
p_info->reply_pid_valid = proto_info.reply_pid_valid;
p_info->event_type = proto_info.event_type;
p_info->event_reason = proto_info.event_reason;
if (p_info->pid.flow.ipv == AF_INET) {
map_ipv4_to_ipv6(&p_info->pid.flow.saddr.ip,
p_info->iph->saddr);
@@ -403,6 +528,8 @@ static int parse_packet_identifier(struct parsing_context *pctx,
p_info->pid.flow.daddr.ip = p_info->ip6h->daddr;
}
p_info->pid_flow_is_dfkey = is_dualflow_key(&p_info->pid.flow);
reverse_flow(&p_info->reply_pid.flow, &p_info->pid.flow);
p_info->payload = remaining_pkt_payload(pctx);
@@ -512,101 +639,150 @@ static void send_map_full_event(void *ctx, struct packet_info *p_info,
}
/*
* Attempt to create a new flow-state.
* Returns a pointer to the flow_state if successful, NULL otherwise
* Initilizes an "empty" flow state based on the forward direction of the
* current packet
*/
static struct flow_state *create_flow(void *ctx, struct packet_info *p_info)
static void init_flowstate(struct flow_state *f_state,
struct packet_info *p_info)
{
struct flow_state new_state = { 0 };
f_state->conn_state = CONNECTION_STATE_WAITOPEN;
f_state->last_timestamp = p_info->time;
f_state->opening_reason = p_info->event_type == FLOW_EVENT_OPENING ?
p_info->event_reason :
EVENT_REASON_FIRST_OBS_PCKT;
}
new_state.last_timestamp = p_info->time;
new_state.opening_reason = p_info->event_type == FLOW_EVENT_OPENING ?
p_info->event_reason :
EVENT_REASON_FIRST_OBS_PCKT;
static void init_empty_flowstate(struct flow_state *f_state)
{
f_state->conn_state = CONNECTION_STATE_EMPTY;
}
if (bpf_map_update_elem(&flow_state, &p_info->pid.flow, &new_state,
BPF_NOEXIST) != 0) {
/*
* Initilize a new (assumed 0-initlized) dual flow state based on the current
* packet.
*/
static void init_dualflow_state(struct dual_flow_state *df_state,
struct packet_info *p_info)
{
struct flow_state *fw_state =
get_flowstate_from_packet(df_state, p_info);
struct flow_state *rev_state =
get_reverse_flowstate_from_packet(df_state, p_info);
init_flowstate(fw_state, p_info);
init_empty_flowstate(rev_state);
}
static struct dual_flow_state *
create_dualflow_state(void *ctx, struct packet_info *p_info, bool *new_flow)
{
struct network_tuple *key = get_dualflow_key_from_packet(p_info);
struct dual_flow_state new_state = { 0 };
init_dualflow_state(&new_state, p_info);
if (bpf_map_update_elem(&flow_state, key, &new_state, BPF_NOEXIST) ==
0) {
if (new_flow)
*new_flow = true;
} else {
send_map_full_event(ctx, p_info, PPING_MAP_FLOWSTATE);
return NULL;
}
return bpf_map_lookup_elem(&flow_state, &p_info->pid.flow);
return bpf_map_lookup_elem(&flow_state, key);
}
static struct flow_state *update_flow(void *ctx, struct packet_info *p_info,
bool *new_flow)
static struct dual_flow_state *
lookup_or_create_dualflow_state(void *ctx, struct packet_info *p_info,
bool *new_flow)
{
struct flow_state *f_state;
*new_flow = false;
struct dual_flow_state *df_state;
f_state = bpf_map_lookup_elem(&flow_state, &p_info->pid.flow);
df_state = bpf_map_lookup_elem(&flow_state,
get_dualflow_key_from_packet(p_info));
// Attempt to create flow if it does not exist
if (!f_state && p_info->pid_valid &&
!(p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH)) {
*new_flow = true;
f_state = create_flow(ctx, p_info);
if (df_state)
return df_state;
// Only try to create new state if we have a valid pid
if (!p_info->pid_valid || p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH)
return NULL;
return create_dualflow_state(ctx, p_info, new_flow);
}
static bool is_flowstate_active(struct flow_state *f_state)
{
return f_state->conn_state != CONNECTION_STATE_EMPTY &&
f_state->conn_state != CONNECTION_STATE_CLOSED;
}
static void update_forward_flowstate(struct packet_info *p_info,
struct flow_state *f_state, bool *new_flow)
{
// "Create" flowstate if it's empty
if (f_state->conn_state == CONNECTION_STATE_EMPTY &&
p_info->pid_valid) {
init_flowstate(f_state, p_info);
if (new_flow)
*new_flow = true;
}
if (!f_state)
return NULL;
// Update flow state
f_state->sent_pkts++;
f_state->sent_bytes += p_info->payload;
return f_state;
if (is_flowstate_active(f_state)) {
f_state->sent_pkts++;
f_state->sent_bytes += p_info->payload;
}
}
static struct flow_state *update_rev_flow(void *ctx, struct packet_info *p_info)
static void update_reverse_flowstate(void *ctx, struct packet_info *p_info,
struct flow_state *f_state)
{
struct flow_state *f_state;
if (!is_flowstate_active(f_state))
return;
f_state = bpf_map_lookup_elem(&flow_state, &p_info->reply_pid.flow);
if (!f_state)
return NULL;
// Is a new flow, push opening flow message
if (!f_state->has_opened &&
// First time we see reply for flow?
if (f_state->conn_state == CONNECTION_STATE_WAITOPEN &&
p_info->event_type != FLOW_EVENT_CLOSING_BOTH) {
f_state->has_opened = true;
f_state->conn_state = CONNECTION_STATE_OPEN;
send_flow_open_event(ctx, p_info, f_state);
}
// Update flow state
f_state->rec_pkts++;
f_state->rec_bytes += p_info->payload;
return f_state;
}
static void delete_closed_flows(void *ctx, struct packet_info *p_info,
struct flow_state *flow,
struct flow_state *rev_flow)
static bool should_notify_closing(struct flow_state *f_state)
{
bool has_opened;
return f_state->conn_state == CONNECTION_STATE_OPEN;
}
// Flow closing - try to delete flow state and push closing-event
if (flow && (p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH)) {
has_opened = flow->has_opened;
if (bpf_map_delete_elem(&flow_state, &p_info->pid.flow) == 0) {
debug_increment_autodel(PPING_MAP_FLOWSTATE);
if (has_opened)
send_flow_event(ctx, p_info, false);
}
static void close_and_delete_flows(void *ctx, struct packet_info *p_info,
struct flow_state *fw_flow,
struct flow_state *rev_flow)
{
// Forward flow closing
if (p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
if (should_notify_closing(fw_flow))
send_flow_event(ctx, p_info, false);
fw_flow->conn_state = CONNECTION_STATE_CLOSED;
}
// Also close reverse flow
if (rev_flow && p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
has_opened = rev_flow->has_opened;
if (bpf_map_delete_elem(&flow_state, &p_info->reply_pid.flow) ==
0) {
// Reverse flow closing
if (p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
if (should_notify_closing(rev_flow))
send_flow_event(ctx, p_info, true);
rev_flow->conn_state = CONNECTION_STATE_CLOSED;
}
// Delete flowstate entry if neither flow is open anymore
if (!is_flowstate_active(fw_flow) && !is_flowstate_active(rev_flow)) {
if (bpf_map_delete_elem(&flow_state,
get_dualflow_key_from_packet(p_info)) ==
0)
debug_increment_autodel(PPING_MAP_FLOWSTATE);
if (has_opened)
send_flow_event(ctx, p_info, true);
}
}
}
@@ -660,7 +836,7 @@ static void pping_timestamp_packet(struct flow_state *f_state, void *ctx,
struct parsing_context *pctx,
struct packet_info *p_info, bool new_flow)
{
if (!f_state || !p_info->pid_valid)
if (!is_flowstate_active(f_state) || !p_info->pid_valid)
return;
if (config.localfilt && !pctx->is_egress &&
@@ -703,7 +879,7 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx,
struct rtt_event re = { 0 };
__u64 *p_ts;
if (!f_state || !p_info->reply_pid_valid)
if (!is_flowstate_active(f_state) || !p_info->reply_pid_valid)
return;
if (f_state->outstanding_timestamps == 0)
@@ -745,20 +921,66 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx,
static void pping(void *ctx, struct parsing_context *pctx)
{
struct packet_info p_info = { 0 };
struct flow_state *flow, *rev_flow;;
bool new_flow;
struct dual_flow_state *df_state;
struct flow_state *fw_flow, *rev_flow;
bool new_flow = false;
if (parse_packet_identifier(pctx, &p_info) < 0)
if (parse_packet_identifier(pctx, &p_info) < 0)
return;
flow = update_flow(ctx, &p_info, &new_flow);
pping_timestamp_packet(flow, ctx, pctx, &p_info, new_flow);
df_state = lookup_or_create_dualflow_state(ctx, &p_info, &new_flow);
if (!df_state)
return;
rev_flow = update_rev_flow(ctx, &p_info);
fw_flow = get_flowstate_from_packet(df_state, &p_info);
update_forward_flowstate(&p_info, fw_flow, &new_flow);
pping_timestamp_packet(fw_flow, ctx, pctx, &p_info, new_flow);
rev_flow = get_reverse_flowstate_from_packet(df_state, &p_info);
update_reverse_flowstate(ctx, &p_info, rev_flow);
pping_match_packet(rev_flow, ctx, pctx, &p_info);
delete_closed_flows(ctx, &p_info, flow, rev_flow);
close_and_delete_flows(ctx, &p_info, fw_flow, rev_flow);
}
static bool is_flow_old(struct network_tuple *flow, struct flow_state *f_state,
__u64 time)
{
__u64 age;
__u64 ts;
if (!f_state || !is_flowstate_active(f_state))
return false;
ts = f_state->last_timestamp; // To avoid concurrency issue between check and age calculation
if (ts > time)
return false;
age = time - ts;
return (f_state->conn_state == CONNECTION_STATE_WAITOPEN &&
age > UNOPENED_FLOW_LIFETIME) ||
((flow->proto == IPPROTO_ICMP ||
flow->proto == IPPROTO_ICMPV6) &&
age > ICMP_FLOW_LIFETIME) ||
age > FLOW_LIFETIME;
}
static void send_flow_timeout_message(void *ctx, struct network_tuple *flow,
__u64 time)
{
struct flow_event fe = {
.event_type = EVENT_TYPE_FLOW,
.flow_event_type = FLOW_EVENT_CLOSING,
.reason = EVENT_REASON_FLOW_TIMEOUT,
.source = EVENT_SOURCE_GC,
.timestamp = time,
.reserved = 0,
};
// To be consistent with Kathie's pping we report flow "backwards"
reverse_flow(&fe.flow, flow);
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &fe, sizeof(fe));
}
// Programs
@@ -821,6 +1043,8 @@ int tsmap_cleanup(struct bpf_iter__bpf_map_elem *ctx)
{
struct packet_id local_pid;
struct flow_state *f_state;
struct dual_flow_state *df_state;
struct network_tuple df_key;
struct packet_id *pid = ctx->key;
__u64 *timestamp = ctx->value;
__u64 now = bpf_ktime_get_ns();
@@ -835,14 +1059,16 @@ int tsmap_cleanup(struct bpf_iter__bpf_map_elem *ctx)
if (now <= *timestamp)
return 0;
/* Seems like the key for map lookup operations must be
on the stack, so copy pid to local_pid. */
__builtin_memcpy(&local_pid, pid, sizeof(local_pid));
f_state = bpf_map_lookup_elem(&flow_state, &local_pid.flow);
make_dualflow_key(&df_key, &pid->flow);
df_state = bpf_map_lookup_elem(&flow_state, &df_key);
f_state = get_flowstate_from_dualflow(df_state, &pid->flow);
rtt = f_state ? f_state->srtt : 0;
if ((rtt && now - *timestamp > rtt * TIMESTAMP_RTT_LIFETIME) ||
now - *timestamp > TIMESTAMP_LIFETIME) {
/* Seems like the key for map lookup operations must be
on the stack, so copy pid to local_pid. */
__builtin_memcpy(&local_pid, pid, sizeof(local_pid));
if (bpf_map_delete_elem(&packet_ts, &local_pid) == 0) {
debug_increment_timeoutdel(PPING_MAP_PACKETTS);
@@ -858,49 +1084,39 @@ int tsmap_cleanup(struct bpf_iter__bpf_map_elem *ctx)
SEC("iter/bpf_map_elem")
int flowmap_cleanup(struct bpf_iter__bpf_map_elem *ctx)
{
struct network_tuple local_flow;
struct network_tuple *flow = ctx->key;
struct flow_state *f_state = ctx->value;
struct flow_event fe;
struct network_tuple flow1, flow2;
struct flow_state *f_state1, *f_state2;
struct dual_flow_state *df_state;
__u64 now = bpf_ktime_get_ns();
bool has_opened;
__u64 age;
bool notify1, notify2, timeout1, timeout2;
debug_update_mapclean_stats(ctx, &events, !ctx->key || !ctx->value,
ctx->meta->seq_num, now,
PPING_MAP_FLOWSTATE);
if (!flow || !f_state)
return 0;
if (now < f_state->last_timestamp)
if (!ctx->key || !ctx->value)
return 0;
// Check if flow is too old for unopned/ICMP/other flow type
age = now - f_state->last_timestamp;
has_opened = f_state->has_opened;
flow1 = *(struct network_tuple *)ctx->key;
reverse_flow(&flow2, &flow1);
if ((!has_opened && age > UNOPENED_FLOW_LIFETIME) ||
((flow->proto == IPPROTO_ICMP || flow->proto == IPPROTO_ICMPV6) &&
age > ICMP_FLOW_LIFETIME) ||
age > FLOW_LIFETIME) {
__builtin_memcpy(&local_flow, flow, sizeof(local_flow));
has_opened = f_state->has_opened;
df_state = ctx->value;
f_state1 = get_flowstate_from_dualflow(df_state, &flow1);
f_state2 = get_flowstate_from_dualflow(df_state, &flow2);
if (bpf_map_delete_elem(&flow_state, &local_flow) == 0) {
debug_increment_timeoutdel(PPING_MAP_FLOWSTATE);
timeout1 = is_flow_old(&flow1, f_state1, now);
timeout2 = is_flow_old(&flow2, f_state2, now);
if (has_opened) {
reverse_flow(&fe.flow, &local_flow);
fe.event_type = EVENT_TYPE_FLOW;
fe.timestamp = now;
fe.flow_event_type = FLOW_EVENT_CLOSING;
fe.reason = EVENT_REASON_FLOW_TIMEOUT;
fe.source = EVENT_SOURCE_GC;
fe.reserved = 0;
bpf_perf_event_output(ctx, &events,
BPF_F_CURRENT_CPU, &fe,
sizeof(fe));
}
if ((!is_flowstate_active(f_state1) || timeout1) &&
(!is_flowstate_active(f_state2) || timeout2)) {
// Entry should be deleted
notify1 = should_notify_closing(f_state1) && timeout1;
notify2 = should_notify_closing(f_state2) && timeout2;
if (bpf_map_delete_elem(&flow_state, &flow1) == 0) {
if (notify1)
send_flow_timeout_message(ctx, &flow1, now);
if (notify2)
send_flow_timeout_message(ctx, &flow2, now);
}
}