pping: Only consider flow opened on reply

Wait with sending a flow open message until a reply has been seen for
the flow. Likewise, only emit a flow closing event if the flow has
first been opened (that is, a reply has been seen).

This introduces potential (but unlikely) concurrency issues for flow
opening/closing messages which are further described in the README.

Signed-off-by: Simon Sundberg <simon.sundberg@kau.se>
This commit is contained in:
Simon Sundberg
2022-02-03 12:03:22 +01:00
parent 8a8f538759
commit 32bdf11a96
4 changed files with 87 additions and 30 deletions

View File

@ -253,6 +253,20 @@ estimate anyways (may never calculate RTT for packet with the true minimum
RTT). And even without sampling there is some inherent sampling due to TCP
timestamps only being updated at a limited rate (1000 Hz).
#### Outputting flow opening/closing events
A flow is not considered opened until a reply has been seen for it. The
`flow_state` map keeps information about if the flow has been opened or not,
which is checked and updated for each reply. The check and update of this
information is not performed atomically, which may result in multiple replies
thinking they are the first, emitting multiple flow-opened events, in case they
are processed concurrently.
Likewise, when flows are closed it checks if the flow has been opened to
determine if a flow closing message should be sent. If multiple replies are
processed concurrently, it's possible one of them will update the flow-open
information and emit a flow opening message, but another reply closing the flow
without thinking it's ever been opened, thus not sending a flow closing message.
## Similar projects
Passively measuring the RTT for TCP traffic is not a novel concept, and there
exists a number of other tools that can do so. A good overview of how passive

View File

@ -500,10 +500,11 @@ static bool packet_ts_timeout(void *key_ptr, void *val_ptr, __u64 now)
static bool flow_timeout(void *key_ptr, void *val_ptr, __u64 now)
{
struct flow_event fe;
__u64 ts = ((struct flow_state *)val_ptr)->last_timestamp;
struct flow_state *f_state = val_ptr;
if (now > ts && now - ts > FLOW_LIFETIME) {
if (print_event_func) {
if (now > f_state->last_timestamp &&
now - f_state->last_timestamp > FLOW_LIFETIME) {
if (print_event_func && f_state->has_opened) {
fe.event_type = EVENT_TYPE_FLOW;
fe.timestamp = now;
reverse_flow(&fe.flow, key_ptr);

View File

@ -82,7 +82,9 @@ struct flow_state {
__u64 rec_pkts;
__u64 rec_bytes;
__u32 last_id;
__u32 reserved;
bool has_opened;
enum flow_event_reason opening_reason;
__u16 reserved;
};
struct packet_id {

View File

@ -390,6 +390,28 @@ static bool is_rate_limited(__u64 now, __u64 last_ts, __u64 rtt)
return now - last_ts < config.rate_limit;
}
/*
* Send a flow opening event through the perf-buffer.
* As these events are only sent upon receiving a reply, need to access state
* of the reverse flow to get reason flow was opened and when the original
* packet opening the flow was sent.
*/
static void send_flow_open_event(void *ctx, struct packet_info *p_info,
struct flow_state *rev_flow)
{
struct flow_event fe = {
.event_type = EVENT_TYPE_FLOW,
.flow_event_type = FLOW_EVENT_OPENING,
.source = EVENT_SOURCE_PKT_DEST,
.flow = p_info->pid.flow,
.reason = rev_flow->opening_reason,
.timestamp = rev_flow->last_timestamp,
.reserved = 0,
};
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &fe, sizeof(fe));
}
/*
* Sends a flow-event message based on p_info.
*
@ -420,24 +442,22 @@ static void send_flow_event(void *ctx, struct packet_info *p_info,
}
/*
* Attempt to create a new flow-state and push flow-opening message
* Attempt to create a new flow-state.
* Returns a pointer to the flow_state if successful, NULL otherwise
*/
static struct flow_state *create_flow(void *ctx, struct packet_info *p_info)
static struct flow_state *create_flow(struct packet_info *p_info)
{
struct flow_state new_state = { 0 };
new_state.last_timestamp = p_info->time;
new_state.opening_reason = p_info->event_type == FLOW_EVENT_OPENING ?
p_info->event_reason :
EVENT_REASON_FIRST_OBS_PCKT;
if (bpf_map_update_elem(&flow_state, &p_info->pid.flow, &new_state,
BPF_NOEXIST) != 0)
return NULL;
if (p_info->event_type != FLOW_EVENT_OPENING) {
p_info->event_type = FLOW_EVENT_OPENING;
p_info->event_reason = EVENT_REASON_FIRST_OBS_PCKT;
}
send_flow_event(ctx, p_info, false);
return bpf_map_lookup_elem(&flow_state, &p_info->pid.flow);
}
@ -448,21 +468,26 @@ static struct flow_state *update_flow(void *ctx, struct packet_info *p_info,
*new_flow = false;
f_state = bpf_map_lookup_elem(&flow_state, &p_info->pid.flow);
if (!f_state && p_info->pid_valid) {
// Attempt to create flow if it does not exist
if (!f_state && p_info->pid_valid &&
!(p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH)) {
*new_flow = true;
f_state = create_flow(ctx, p_info);
f_state = create_flow(p_info);
}
if (!f_state)
return NULL;
// Update flow state
f_state->sent_pkts++;
f_state->sent_bytes += p_info->payload;
return f_state;
}
static struct flow_state *update_rev_flow(struct packet_info *p_info)
static struct flow_state *update_rev_flow(void *ctx, struct packet_info *p_info)
{
struct flow_state *f_state;
@ -470,24 +495,41 @@ static struct flow_state *update_rev_flow(struct packet_info *p_info)
if (!f_state)
return NULL;
// Is a new flow, push opening flow message
if (!f_state->has_opened &&
p_info->event_type != FLOW_EVENT_CLOSING_BOTH) {
f_state->has_opened = true;
send_flow_open_event(ctx, p_info, f_state);
}
// Update flow state
f_state->rec_pkts++;
f_state->rec_bytes += p_info->payload;
return f_state;
}
static void delete_closed_flows(void *ctx, struct packet_info *p_info)
static void delete_closed_flows(void *ctx, struct packet_info *p_info,
struct flow_state *flow,
struct flow_state *rev_flow)
{
bool has_opened;
// Flow closing - try to delete flow state and push closing-event
if (p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
if (!bpf_map_delete_elem(&flow_state, &p_info->pid.flow))
if (flow && (p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH)) {
has_opened = flow->has_opened;
if (!bpf_map_delete_elem(&flow_state, &p_info->pid.flow) &&
has_opened)
send_flow_event(ctx, p_info, false);
}
// Also close reverse flow
if (p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
if (!bpf_map_delete_elem(&flow_state, &p_info->reply_pid.flow))
if (rev_flow && p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
has_opened = rev_flow->has_opened;
if (!bpf_map_delete_elem(&flow_state,
&p_info->reply_pid.flow) &&
has_opened)
send_flow_event(ctx, p_info, true);
}
}
@ -617,22 +659,20 @@ static void pping_match_packet(struct flow_state *f_state, void *ctx,
static void pping(void *ctx, struct parsing_context *pctx)
{
struct packet_info p_info = { 0 };
struct flow_state *f_state;
struct flow_state *flow, *rev_flow;;
bool new_flow;
if (parse_packet_identifier(pctx, &p_info) < 0)
return;
if (p_info.event_type != FLOW_EVENT_CLOSING &&
p_info.event_type != FLOW_EVENT_CLOSING_BOTH) {
f_state = update_flow(ctx, &p_info, &new_flow);
pping_timestamp_packet(f_state, ctx, pctx, &p_info, new_flow);
}
flow = update_flow(ctx, &p_info, &new_flow);
pping_timestamp_packet(flow, ctx, pctx, &p_info, new_flow);
f_state = update_rev_flow(&p_info);
pping_match_packet(f_state, ctx, pctx, &p_info);
rev_flow = update_rev_flow(ctx, &p_info);
pping_match_packet(rev_flow, ctx, pctx, &p_info);
delete_closed_flows(ctx, &p_info, flow, rev_flow);
delete_closed_flows(ctx, &p_info);
}
// Programs