1
0
mirror of https://gitlab.labs.nic.cz/labs/bird.git synced 2024-05-11 16:54:54 +00:00

Merge commit '94eb0858' into thread-next

This commit is contained in:
Maria Matejka
2022-07-18 12:33:00 +02:00
38 changed files with 788 additions and 469 deletions

View File

@@ -1,4 +1,4 @@
src := alloc.c io.c krt.c log.c main.c random.c coroutine.c
src := alloc.c io.c io-loop.c krt.c log.c main.c random.c coroutine.c
obj := $(src-o-files)
$(all-daemon)
$(cf-local)

View File

@@ -97,7 +97,7 @@ alloc_page(void)
struct free_page *fp = SKIP_BACK(struct free_page, n, HEAD(fps->pages));
rem_node(&fp->n);
if ((--fps->cnt < fps->min) && !shutting_down)
ev_schedule(&fps->cleanup);
ev_send(&global_work_list, &fps->cleanup);
bzero(fp, page_size);
return fp;
@@ -124,7 +124,7 @@ free_page(void *ptr)
add_tail(&fps->pages, &fp->n);
if ((++fps->cnt > fps->max) && !shutting_down)
ev_schedule(&fps->cleanup);
ev_send(&global_work_list, &fps->cleanup);
#endif
}

View File

@@ -21,10 +21,9 @@
#include "lib/resource.h"
#include "lib/timer.h"
/* Using a rather big stack for coroutines to allow for stack-local allocations.
* In real world, the kernel doesn't alloc this memory until it is used.
* */
#define CORO_STACK_SIZE 1048576
#include "conf/conf.h"
#define CORO_STACK_SIZE 65536
/*
* Implementation of coroutines based on POSIX threads
@@ -79,6 +78,11 @@ domain_free(struct domain_generic *dg)
xfree(dg);
}
uint dg_order(struct domain_generic *dg)
{
return dg->order;
}
void do_lock(struct domain_generic *dg, struct domain_generic **lsp)
{
if ((char *) lsp - (char *) &locking_stack != dg->order)
@@ -89,7 +93,11 @@ void do_lock(struct domain_generic *dg, struct domain_generic **lsp)
if (*lsp)
bug("Inconsistent locking stack state on lock");
btime lock_begin = current_time();
pthread_mutex_lock(&dg->mutex);
btime duration = current_time() - lock_begin;
if (config && (duration > config->watchdog_warning))
log(L_WARN "Locking of %s took %d ms", dg->name, (int) (duration TO_MS));
if (dg->prev || dg->locked_by)
bug("Previous unlock not finished correctly");
@@ -140,11 +148,16 @@ static struct resclass coro_class = {
.free = coro_free,
};
_Thread_local struct coroutine *this_coro = NULL;
static void *coro_entry(void *p)
{
struct coroutine *c = p;
ASSERT_DIE(c->entry);
this_coro = c;
c->entry(c->data);
ASSERT_DIE(coro_cleaned_up);

537
sysdep/unix/io-loop.c Normal file
View File

@@ -0,0 +1,537 @@
/*
* BIRD -- I/O and event loop
*
* Can be freely distributed and used under the terms of the GNU GPL.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <time.h>
#include <sys/time.h>
#include "nest/bird.h"
#include "lib/buffer.h"
#include "lib/coro.h"
#include "lib/lists.h"
#include "lib/resource.h"
#include "lib/event.h"
#include "lib/timer.h"
#include "lib/socket.h"
#include "lib/io-loop.h"
#include "sysdep/unix/io-loop.h"
#include "conf/conf.h"
/*
* Current thread context
*/
_Thread_local struct birdloop *birdloop_current;
static _Thread_local struct birdloop *birdloop_wakeup_masked;
static _Thread_local uint birdloop_wakeup_masked_count;
event_list *
birdloop_event_list(struct birdloop *loop)
{
return &loop->event_list;
}
struct timeloop *
birdloop_time_loop(struct birdloop *loop)
{
return &loop->time;
}
_Bool
birdloop_inside(struct birdloop *loop)
{
for (struct birdloop *c = birdloop_current; c; c = c->prev_loop)
if (loop == c)
return 1;
return 0;
}
/*
* Wakeup code for birdloop
*/
static void
pipe_new(int *pfds)
{
int rv = pipe(pfds);
if (rv < 0)
die("pipe: %m");
if (fcntl(pfds[0], F_SETFL, O_NONBLOCK) < 0)
die("fcntl(O_NONBLOCK): %m");
if (fcntl(pfds[1], F_SETFL, O_NONBLOCK) < 0)
die("fcntl(O_NONBLOCK): %m");
}
void
pipe_drain(int fd)
{
char buf[64];
int rv;
try:
rv = read(fd, buf, 64);
if (rv < 0)
{
if (errno == EINTR)
goto try;
if (errno == EAGAIN)
return;
die("wakeup read: %m");
}
if (rv == 64)
goto try;
}
void
pipe_kick(int fd)
{
u64 v = 1;
int rv;
try:
rv = write(fd, &v, sizeof(u64));
if (rv < 0)
{
if (errno == EINTR)
goto try;
if (errno == EAGAIN)
return;
die("wakeup write: %m");
}
}
static inline void
wakeup_init(struct birdloop *loop)
{
pipe_new(loop->wakeup_fds);
}
static inline void
wakeup_drain(struct birdloop *loop)
{
pipe_drain(loop->wakeup_fds[0]);
}
static inline void
wakeup_do_kick(struct birdloop *loop)
{
pipe_kick(loop->wakeup_fds[1]);
}
void
birdloop_ping(struct birdloop *loop)
{
u32 ping_sent = atomic_fetch_add_explicit(&loop->ping_sent, 1, memory_order_acq_rel);
if (ping_sent)
return;
if (loop == birdloop_wakeup_masked)
birdloop_wakeup_masked_count++;
else
wakeup_do_kick(loop);
}
/*
* Sockets
*/
static void
sockets_init(struct birdloop *loop)
{
init_list(&loop->sock_list);
loop->sock_num = 0;
BUFFER_INIT(loop->poll_sk, loop->pool, 4);
BUFFER_INIT(loop->poll_fd, loop->pool, 4);
loop->poll_changed = 1; /* add wakeup fd */
}
static void
sockets_add(struct birdloop *loop, sock *s)
{
add_tail(&loop->sock_list, &s->n);
loop->sock_num++;
s->index = -1;
loop->poll_changed = 1;
birdloop_ping(loop);
}
void
sk_start(sock *s)
{
ASSERT_DIE(birdloop_current != &main_birdloop);
sockets_add(birdloop_current, s);
}
static void
sockets_remove(struct birdloop *loop, sock *s)
{
rem_node(&s->n);
loop->sock_num--;
if (s->index >= 0)
{
loop->poll_sk.data[s->index] = NULL;
s->index = -1;
loop->poll_changed = 1;
loop->close_scheduled = 1;
birdloop_ping(loop);
}
else
close(s->fd);
}
void
sk_stop(sock *s)
{
sockets_remove(birdloop_current, s);
}
static inline uint sk_want_events(sock *s)
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
/*
FIXME: this should be called from sock code
static void
sockets_update(struct birdloop *loop, sock *s)
{
if (s->index >= 0)
loop->poll_fd.data[s->index].events = sk_want_events(s);
}
*/
static void
sockets_prepare(struct birdloop *loop)
{
BUFFER_SET(loop->poll_sk, loop->sock_num + 1);
BUFFER_SET(loop->poll_fd, loop->sock_num + 1);
struct pollfd *pfd = loop->poll_fd.data;
sock **psk = loop->poll_sk.data;
uint i = 0;
node *n;
WALK_LIST(n, loop->sock_list)
{
sock *s = SKIP_BACK(sock, n, n);
ASSERT(i < loop->sock_num);
s->index = i;
*psk = s;
pfd->fd = s->fd;
pfd->events = sk_want_events(s);
pfd->revents = 0;
pfd++;
psk++;
i++;
}
ASSERT(i == loop->sock_num);
/* Add internal wakeup fd */
*psk = NULL;
pfd->fd = loop->wakeup_fds[0];
pfd->events = POLLIN;
pfd->revents = 0;
loop->poll_changed = 0;
}
static void
sockets_close_fds(struct birdloop *loop)
{
struct pollfd *pfd = loop->poll_fd.data;
sock **psk = loop->poll_sk.data;
int poll_num = loop->poll_fd.used - 1;
int i;
for (i = 0; i < poll_num; i++)
if (psk[i] == NULL)
close(pfd[i].fd);
loop->close_scheduled = 0;
}
int sk_read(sock *s, int revents);
int sk_write(sock *s);
static void
sockets_fire(struct birdloop *loop)
{
struct pollfd *pfd = loop->poll_fd.data;
sock **psk = loop->poll_sk.data;
int poll_num = loop->poll_fd.used - 1;
times_update();
/* Last fd is internal wakeup fd */
if (pfd[poll_num].revents & POLLIN)
wakeup_drain(loop);
int i;
for (i = 0; i < poll_num; pfd++, psk++, i++)
{
int e = 1;
if (! pfd->revents)
continue;
if (pfd->revents & POLLNVAL)
die("poll: invalid fd %d", pfd->fd);
if (pfd->revents & POLLIN)
while (e && *psk && (*psk)->rx_hook)
e = sk_read(*psk, pfd->revents);
e = 1;
if (pfd->revents & POLLOUT)
{
loop->poll_changed = 1;
while (e && *psk)
e = sk_write(*psk);
}
}
}
/*
* Birdloop
*/
struct birdloop main_birdloop;
static void birdloop_enter_locked(struct birdloop *loop);
void
birdloop_init(void)
{
wakeup_init(&main_birdloop);
main_birdloop.time.domain = the_bird_domain.the_bird;
main_birdloop.time.loop = &main_birdloop;
times_update();
timers_init(&main_birdloop.time, &root_pool);
birdloop_enter_locked(&main_birdloop);
}
static void birdloop_main(void *arg);
struct birdloop *
birdloop_new(pool *pp, uint order, const char *name)
{
struct domain_generic *dg = domain_new(name, order);
pool *p = rp_new(pp, name);
struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop));
loop->pool = p;
loop->time.domain = dg;
loop->time.loop = loop;
birdloop_enter(loop);
wakeup_init(loop);
ev_init_list(&loop->event_list, loop, name);
timers_init(&loop->time, p);
sockets_init(loop);
loop->time.coro = coro_run(p, birdloop_main, loop);
birdloop_leave(loop);
return loop;
}
static void
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
loop->stopped = stopped;
loop->stop_data = data;
wakeup_do_kick(loop);
}
void
birdloop_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
DG_LOCK(loop->time.domain);
birdloop_do_stop(loop, stopped, data);
DG_UNLOCK(loop->time.domain);
}
void
birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
ASSERT_DIE(loop == birdloop_current);
ASSERT_DIE(DG_IS_LOCKED(loop->time.domain));
birdloop_do_stop(loop, stopped, data);
}
void
birdloop_free(struct birdloop *loop)
{
ASSERT_DIE(loop->links == 0);
domain_free(loop->time.domain);
rfree(loop->pool);
}
static void
birdloop_enter_locked(struct birdloop *loop)
{
ASSERT_DIE(DG_IS_LOCKED(loop->time.domain));
ASSERT_DIE(!birdloop_inside(loop));
/* Store the old context */
loop->prev_loop = birdloop_current;
/* Put the new context */
birdloop_current = loop;
}
void
birdloop_enter(struct birdloop *loop)
{
DG_LOCK(loop->time.domain);
return birdloop_enter_locked(loop);
}
static void
birdloop_leave_locked(struct birdloop *loop)
{
/* Check the current context */
ASSERT_DIE(birdloop_current == loop);
/* Restore the old context */
birdloop_current = loop->prev_loop;
}
void
birdloop_leave(struct birdloop *loop)
{
birdloop_leave_locked(loop);
DG_UNLOCK(loop->time.domain);
}
void
birdloop_mask_wakeups(struct birdloop *loop)
{
ASSERT_DIE(birdloop_wakeup_masked == NULL);
birdloop_wakeup_masked = loop;
}
void
birdloop_unmask_wakeups(struct birdloop *loop)
{
ASSERT_DIE(birdloop_wakeup_masked == loop);
birdloop_wakeup_masked = NULL;
if (birdloop_wakeup_masked_count)
wakeup_do_kick(loop);
birdloop_wakeup_masked_count = 0;
}
void
birdloop_link(struct birdloop *loop)
{
ASSERT_DIE(birdloop_inside(loop));
loop->links++;
}
void
birdloop_unlink(struct birdloop *loop)
{
ASSERT_DIE(birdloop_inside(loop));
loop->links--;
}
static void
birdloop_main(void *arg)
{
struct birdloop *loop = arg;
timer *t;
int rv, timeout;
btime loop_begin = current_time();
tmp_init(loop->pool);
birdloop_enter(loop);
while (1)
{
timers_fire(&loop->time, 0);
if (ev_run_list(&loop->event_list))
timeout = 0;
else if (t = timers_first(&loop->time))
timeout = (tm_remains(t) TO_MS) + 1;
else
timeout = -1;
if (loop->poll_changed)
sockets_prepare(loop);
btime duration = current_time() - loop_begin;
if (duration > config->watchdog_warning)
log(L_WARN "I/O loop cycle took %d ms", (int) (duration TO_MS));
birdloop_leave(loop);
try:
rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout);
if (rv < 0)
{
if (errno == EINTR || errno == EAGAIN)
goto try;
die("poll: %m");
}
birdloop_enter(loop);
if (loop->close_scheduled)
sockets_close_fds(loop);
if (loop->stopped)
break;
loop_begin = current_time();
if (rv)
sockets_fire(loop);
atomic_exchange_explicit(&loop->ping_sent, 0, memory_order_acq_rel);
}
/* Flush remaining events */
ASSERT_DIE(!ev_run_list(&loop->event_list));
/* No timers allowed */
ASSERT_DIE(timers_count(&loop->time) == 0);
ASSERT_DIE(EMPTY_LIST(loop->sock_list));
ASSERT_DIE(loop->sock_num == 0);
birdloop_leave(loop);
loop->stopped(loop->stop_data);
}

35
sysdep/unix/io-loop.h Normal file
View File

@@ -0,0 +1,35 @@
/*
* BIRD -- I/O and event loop
*
* Can be freely distributed and used under the terms of the GNU GPL.
*/
#ifndef _BIRD_SYSDEP_UNIX_IO_LOOP_H_
#define _BIRD_SYSDEP_UNIX_IO_LOOP_H_
struct birdloop
{
pool *pool;
struct timeloop time;
event_list event_list;
list sock_list;
uint sock_num;
BUFFER(sock *) poll_sk;
BUFFER(struct pollfd) poll_fd;
u8 poll_changed;
u8 close_scheduled;
_Atomic u32 ping_sent;
int wakeup_fds[2];
uint links;
void (*stopped)(void *data);
void *stop_data;
struct birdloop *prev_loop;
};
#endif

View File

@@ -43,6 +43,7 @@
#include "conf/conf.h"
#include "sysdep/unix/unix.h"
#include "sysdep/unix/io-loop.h"
#include CONFIG_INCLUDE_SYSIO_H
/* Maximum number of calls of tx handler for one socket in one
@@ -800,18 +801,16 @@ sk_free(resource *r)
sk_ssh_free(s);
#endif
if (s->fd < 0)
if ((s->fd < 0) || (s->flags & SKF_THREAD))
return;
/* FIXME: we should call sk_stop() for SKF_THREAD sockets */
if (!(s->flags & SKF_THREAD))
{
if (s == current_sock)
current_sock = sk_next(s);
if (s == stored_sock)
stored_sock = sk_next(s);
if (s == current_sock)
current_sock = sk_next(s);
if (s == stored_sock)
stored_sock = sk_next(s);
if (enlisted(&s->n))
rem_node(&s->n);
}
if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
close(s->fd);
@@ -1104,7 +1103,11 @@ sk_passive_connected(sock *s, int type)
return 1;
}
sk_insert(t);
if (s->flags & SKF_PASSIVE_THREAD)
t->flags |= SKF_THREAD;
else
sk_insert(t);
sk_alloc_bufs(t);
s->rx_hook(t, 0);
return 1;
@@ -1512,6 +1515,36 @@ sk_open_unix(sock *s, char *name)
return 0;
}
static void
sk_reloop_hook(void *_vs)
{
sock *s = _vs;
if (birdloop_inside(&main_birdloop))
{
s->flags &= ~SKF_THREAD;
sk_insert(s);
}
else
{
s->flags |= SKF_THREAD;
sk_start(s);
}
}
void
sk_reloop(sock *s, struct birdloop *loop)
{
if (enlisted(&s->n))
rem_node(&s->n);
s->reloop = (event) {
.hook = sk_reloop_hook,
.data = s,
};
ev_send_loop(loop, &s->reloop);
}
#define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \
CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL)
@@ -2164,8 +2197,9 @@ void
io_init(void)
{
init_list(&sock_list);
init_list(&global_event_list);
init_list(&global_work_list);
ev_init_list(&global_event_list, &main_birdloop, "Global event list");
ev_init_list(&global_work_list, &main_birdloop, "Global work list");
ev_init_list(&main_birdloop.event_list, &main_birdloop, "Global fast event list");
krt_io_init();
// XXX init_times();
// XXX update_times();
@@ -2179,14 +2213,7 @@ static int short_loops = 0;
#define SHORT_LOOP_MAX 10
#define WORK_EVENTS_MAX 10
static int poll_reload_pipe[2];
void
io_loop_reload(void)
{
char b;
write(poll_reload_pipe[1], &b, 1);
}
void pipe_drain(int fd);
void
io_loop(void)
@@ -2199,21 +2226,19 @@ io_loop(void)
int fdmax = 256;
struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
if (pipe(poll_reload_pipe) < 0)
die("pipe(poll_reload_pipe) failed: %m");
watchdog_start1();
for(;;)
{
times_update();
events = ev_run_list(&global_event_list);
events = ev_run_list_limited(&global_work_list, WORK_EVENTS_MAX) || events;
timers_fire(&main_timeloop);
events = ev_run_list(&main_birdloop.event_list) || events;
timers_fire(&main_birdloop.time, 1);
io_close_event();
// FIXME
poll_tout = (events ? 0 : 3000); /* Time in milliseconds */
if (t = timers_first(&main_timeloop))
if (t = timers_first(&main_birdloop.time))
{
times_update();
timeout = (tm_remains(t) TO_MS) + 1;
@@ -2221,7 +2246,7 @@ io_loop(void)
}
/* A hack to reload main io_loop() when something has changed asynchronously. */
pfd[0].fd = poll_reload_pipe[0];
pfd[0].fd = main_birdloop.wakeup_fds[0];
pfd[0].events = POLLIN;
nfds = 1;
@@ -2284,9 +2309,9 @@ io_loop(void)
/* And finally enter poll() to find active sockets */
watchdog_stop();
the_bird_unlock();
birdloop_leave(&main_birdloop);
pout = poll(pfd, nfds, poll_tout);
the_bird_lock();
birdloop_enter(&main_birdloop);
watchdog_start();
if (pout < 0)
@@ -2300,8 +2325,8 @@ io_loop(void)
if (pfd[0].revents & POLLIN)
{
/* IO loop reload requested */
char b;
read(poll_reload_pipe[0], &b, 1);
pipe_drain(main_birdloop.wakeup_fds[0]);
atomic_exchange_explicit(&main_birdloop.ping_sent, 0, memory_order_acq_rel);
continue;
}

View File

@@ -878,10 +878,12 @@ main(int argc, char **argv)
parse_args(argc, argv);
log_switch(1, NULL, NULL);
the_bird_lock();
random_init();
net_init();
resource_init();
timer_init();
birdloop_init();
olock_init();
rt_init();
io_init();
@@ -929,7 +931,6 @@ main(int argc, char **argv)
dup2(0, 2);
}
the_bird_lock();
main_thread_init();

View File

@@ -106,7 +106,6 @@ extern volatile sig_atomic_t async_shutdown_flag;
void io_init(void);
void io_loop(void);
void io_loop_reload(void);
void io_log_dump(void);
int sk_open_unix(struct birdsock *s, char *name);
struct rfile *rf_open(struct pool *, const char *name, const char *mode);