1
0
mirror of https://gitlab.labs.nic.cz/labs/bird.git synced 2024-05-11 16:54:54 +00:00

Event lists rewritten to a single linked list

In multithreaded environment, we need to pass messages between workers.
This is done by queuing events to their respective queues. The
double-linked list is not really useful for that as it needs locking
everywhere.

This commit rewrites the event subsystem to use a single-linked list
where events are enqueued by a single atomic instruction and the queue
is processed after atomically moving the whole queue aside.
This commit is contained in:
Maria Matejka
2022-06-24 19:53:34 +02:00
parent 08c8484608
commit e91754f5b9
6 changed files with 159 additions and 135 deletions

View File

@@ -23,27 +23,91 @@
#include "nest/bird.h"
#include "lib/event.h"
#include "lib/locking.h"
#include "lib/io-loop.h"
extern _Thread_local struct coroutine *this_coro;
event_list global_event_list;
event_list global_work_list;
STATIC_ASSERT(OFFSETOF(event_list, _sentinel.next) >= OFFSETOF(event_list, _end[0]));
void
ev_init_list(event_list *el, struct birdloop *loop, const char *name)
{
el->name = name;
el->loop = loop;
atomic_store_explicit(&el->receiver, &el->_sentinel, memory_order_relaxed);
atomic_store_explicit(&el->_executor, &el->_sentinel, memory_order_relaxed);
atomic_store_explicit(&el->_sentinel.next, NULL, memory_order_relaxed);
}
/*
* The event list should work as a message passing point. Sending a message
* must be a fairly fast process with no locks and low waiting times. OTOH,
* processing messages always involves running the assigned code and the
* receiver is always a single one thread with no concurrency at all. There is
* also a postponing requirement to synchronously remove an event from a queue,
* yet we allow this only when the caller has its receiver event loop locked.
* It still means that the event may get postponed from other event in the same
* list, therefore we have to be careful.
*/
static inline int
ev_remove_from(event *e, event * _Atomic * head)
{
/* The head pointer stores where cur is pointed to from */
event * _Atomic *prev = head;
/* The current event in queue to check */
event *cur = atomic_load_explicit(prev, memory_order_acquire);
/* Pre-loaded next pointer; if NULL, this is sentinel */
event *next = atomic_load_explicit(&cur->next, memory_order_acquire);
while (next)
{
if (e == cur)
{
/* Check whether we have collided with somebody else
* adding an item to the queue. */
if (!atomic_compare_exchange_strong_explicit(
prev, &cur, next,
memory_order_acq_rel, memory_order_acquire))
{
/* This may happen only on list head */
ASSERT_DIE(prev == head);
/* Restart. The collision should never happen again. */
return ev_remove_from(e, head);
}
/* Successfully removed from the list; inactivate this event. */
atomic_store_explicit(&cur->next, NULL, memory_order_release);
return 1;
}
/* Go to the next event. */
prev = &cur->next;
cur = next;
next = atomic_load_explicit(&cur->next, memory_order_acquire);
}
return 0;
}
inline void
ev_postpone(event *e)
{
event_list *el = e->list;
if (!el)
/* Find the list to remove the event from */
event_list *sl = ev_get_list(e);
if (!sl)
return;
ASSERT_DIE(birdloop_inside(el->loop));
/* Postponing allowed only from the target loop */
ASSERT_DIE(birdloop_inside(sl->loop));
LOCK_DOMAIN(event, el->lock);
if (ev_active(e))
rem_node(&e->n);
UNLOCK_DOMAIN(event, el->lock);
/* Remove from one of these lists. */
ASSERT(ev_remove_from(e, &sl->_executor) || ev_remove_from(e, &sl->receiver));
}
static void
@@ -54,7 +118,7 @@ ev_dump(resource *r)
debug("(code %p, data %p, %s)\n",
e->hook,
e->data,
e->n.next ? "scheduled" : "inactive");
atomic_load_explicit(&e->next, memory_order_relaxed) ? "scheduled" : "inactive");
}
static struct resclass ev_class = {
@@ -108,23 +172,17 @@ ev_run(event *e)
inline void
ev_send(event_list *l, event *e)
{
DBG("ev_send(%p, %p)\n", l, e);
ASSERT_DIE(e->hook);
ASSERT_DIE(!e->list || (e->list == l) || (e->list->loop == l->loop));
e->list = l;
LOCK_DOMAIN(event, l->lock);
if (enlisted(&e->n))
{
UNLOCK_DOMAIN(event, l->lock);
event_list *sl = ev_get_list(e);
if (sl == l)
return;
}
if (sl)
bug("Queuing an already queued event to another queue is not supported.");
add_tail(&l->events, &e->n);
UNLOCK_DOMAIN(event, l->lock);
birdloop_ping(l->loop);
event *next = atomic_load_explicit(&l->receiver, memory_order_acquire);
do atomic_store_explicit(&e->next, next, memory_order_relaxed);
while (!atomic_compare_exchange_strong_explicit(
&l->receiver, &next, e,
memory_order_acq_rel, memory_order_acquire));
}
void io_log_event(void *hook, void *data);
@@ -135,94 +193,57 @@ void io_log_event(void *hook, void *data);
*
* This function calls ev_run() for all events enqueued in the list @l.
*/
int
ev_run_list(event_list *l)
{
const _Bool legacy = LEGACY_EVENT_LIST(l);
if (legacy)
ASSERT_THE_BIRD_LOCKED;
node *n;
list tmp_list;
init_list(&tmp_list);
/* Move the event list contents to a local list to avoid executing repeatedly added events */
LOCK_DOMAIN(event, l->lock);
add_tail_list(&tmp_list, &l->events);
init_list(&l->events);
UNLOCK_DOMAIN(event, l->lock);
WALK_LIST_FIRST(n, tmp_list)
{
event *e = SKIP_BACK(event, n, n);
if (legacy)
{
/* The legacy way of event execution */
io_log_event(e->hook, e->data);
ev_postpone(e);
e->hook(e->data);
}
else
{
// io_log_event(e->hook, e->data); /* TODO: add support for event logging in other io loops */
ASSERT_DIE(e->list == l);
LOCK_DOMAIN(event, l->lock);
rem_node(&e->n);
UNLOCK_DOMAIN(event, l->lock);
e->hook(e->data);
}
tmp_flush();
}
LOCK_DOMAIN(event, l->lock);
int repeat = ! EMPTY_LIST(l->events);
UNLOCK_DOMAIN(event, l->lock);
return repeat;
}
int
ev_run_list_limited(event_list *l, uint limit)
{
ASSERT_DIE(LEGACY_EVENT_LIST(l));
ASSERT_THE_BIRD_LOCKED;
event * _Atomic *ep = &l->_executor;
node *n;
list tmp_list;
/* No pending events, refill the queue. */
if (atomic_load_explicit(ep, memory_order_relaxed) == &l->_sentinel)
{
/* Move the current event list aside and create a new one. */
event *received = atomic_exchange_explicit(
&l->receiver, &l->_sentinel, memory_order_acq_rel);
LOCK_DOMAIN(event, l->lock);
init_list(&tmp_list);
add_tail_list(&tmp_list, &l->events);
init_list(&l->events);
UNLOCK_DOMAIN(event, l->lock);
/* No event to run. */
if (received == &l->_sentinel)
return 0;
WALK_LIST_FIRST(n, tmp_list)
/* Setup the executor queue */
event *head = &l->_sentinel;
/* Flip the order of the events by relinking them one by one (push-pop) */
while (received != &l->_sentinel)
{
event *e = SKIP_BACK(event, n, n);
if (!limit)
break;
io_log_event(e->hook, e->data);
ev_run(e);
tmp_flush();
limit--;
event *cur = received;
received = atomic_exchange_explicit(&cur->next, head, memory_order_relaxed);
head = cur;
}
LOCK_DOMAIN(event, l->lock);
if (!EMPTY_LIST(tmp_list))
{
/* Attach new items after the unprocessed old items */
add_tail_list(&tmp_list, &l->events);
init_list(&l->events);
add_tail_list(&l->events, &tmp_list);
/* Store the executor queue to its designated place */
atomic_store_explicit(ep, head, memory_order_relaxed);
}
int repeat = ! EMPTY_LIST(l->events);
UNLOCK_DOMAIN(event, l->lock);
/* Run the events in order. */
event *e;
while ((e = atomic_load_explicit(ep, memory_order_relaxed)) != &l->_sentinel)
{
/* Check limit */
if (!--limit)
return 1;
return repeat;
/* This is ugly hack, we want to log just events executed from the main I/O loop */
if ((l == &global_event_list) || (l == &global_work_list))
io_log_event(e->hook, e->data);
/* Inactivate the event */
atomic_store_explicit(ep, atomic_load_explicit(&e->next, memory_order_relaxed), memory_order_relaxed);
atomic_store_explicit(&e->next, NULL, memory_order_relaxed);
/* Run the event */
e->hook(e->data);
tmp_flush();
}
return atomic_load_explicit(&l->receiver, memory_order_relaxed) != &l->_sentinel;
}