/*
 *	BIRD -- Route Export Mechanisms
 *
 *	(c) 2024 Maria Matejka
 *
 *	Can be freely distributed and used under the terms of the GNU GPL.
 */

#include "nest/bird.h"
#include "nest/route.h"

/*
 * Per-thread scratch object handed out by rt_export_get() and taken back by
 * rt_export_release(). Only one export may be outstanding per thread at a time.
 */
_Thread_local static struct rt_export_union reu;

/*
 * Atomically switch the receiver @r to @state and return the previous state.
 *
 * @expected_mask is a bitmask of (1 << state) values the receiver is allowed
 * to be in before the switch; any other previous state is reported via bug().
 * The receiver's last_state_change timestamp is refreshed before the exchange.
 */
static inline enum rt_export_state
rt_export_change_state(struct rt_export_receiver *r, u32 expected_mask, enum rt_export_state state)
{
  r->last_state_change = current_time();

  enum rt_export_state old = atomic_exchange_explicit(&r->export_state, state, memory_order_acq_rel);

  /* Unsigned shift so the test is done in the same domain as the u32 mask */
  if (!((1U << old) & expected_mask))
    bug("Unexpected export state change from %s to %s, expected mask %02x",
	rt_export_state_name(old),
	rt_export_state_name(state),
	expected_mask
       );

  return old;
}

/*
 * Fetch the next pending export for the receiver @r.
 *
 * Returns a pointer to the thread-local export union:
 *  - kind RT_EXPORT_STOP when the receiver is in TES_STOP,
 *  - kind RT_EXPORT_UPDATE with @update set when a not-yet-processed journal
 *    item is available,
 * or NULL when the journal has nothing more to offer right now.
 *
 * Items whose sequence number is already marked in the receiver's seq_map are
 * released and skipped. Every returned non-NULL export is expected to be
 * handed back through rt_export_release().
 *
 * NOTE(review): the switch relies on bug() not returning; it is declared
 * elsewhere, confirm it is noreturn.
 */
const struct rt_export_union *
rt_export_get(struct rt_export_receiver *r)
{
  /*
   * Loop instead of recursing for every already-seen item, so that a long run
   * of skipped items cannot grow the stack. Each iteration repeats exactly the
   * steps one call used to do: state check, seqno reset check, journal fetch.
   */
  for (;;)
  {
    switch (rt_export_get_state_r(r))
    {
      case TES_DOWN:
	bug("Tried to get an export from a stopped receiver");

      case TES_STOP:
	reu.kind = RT_EXPORT_STOP;
	return &reu;

      case TES_PARTIAL:
      case TES_FEEDING:
	/* TODO: implement feeds */
	bug("not implemented yet");

      case TES_READY:
	break;

      case TES_HUNGRY:
      case TES_MAX:
	bug("invalid export state");
    }

    /* Process sequence number reset event */
    if (lfjour_reset_seqno(&r->r))
      bmap_reset(&r->seq_map, 4);

    /*
     * Test the raw journal pointer before converting it to the enclosing item.
     * Converting first and testing afterwards is only correct while `li` is
     * the very first member of struct rt_export_item.
     */
    struct lfjour_item *li = lfjour_get(&r->r);
    if (!li)
    {
      reu.update = NULL;
      return NULL;
    }

    reu.update = SKIP_BACK(struct rt_export_item, li, li);
    reu.kind = RT_EXPORT_UPDATE;

    if (!bmap_test(&r->seq_map, reu.update->seq))
    {
      // log(L_DEBUG "getting %p", reu.update);
      return &reu;
    }

    /* Already processed earlier; give it back and look at the next one */
    // log(L_DEBUG "seen, ignoring %p", reu.update);
    rt_export_release(r, &reu);
  }
}

/*
 * Hand back an export previously obtained from rt_export_get().
 *
 * @u must be the very pointer rt_export_get() returned on this thread.
 * For an update, the journal item is released and the union is marked empty.
 * For a stop, the receiver is required to be already in TES_DOWN, i.e. the
 * caller must have unsubscribed before releasing.
 */
void
rt_export_release(struct rt_export_receiver *r, const struct rt_export_union *u)
{
  ASSERT_DIE(u == &reu);

  switch (reu.kind)
  {
    case RT_EXPORT_UPDATE:
      // log(L_DEBUG "releasing %p", u->update);
      lfjour_release(&r->r);
      reu.kind = RT_EXPORT_NONE;
      return;

    case RT_EXPORT_FEED:
      bug("not implemented yet");

    case RT_EXPORT_STOP:
      /* Checking that we have indeed stopped the exporter */
      ASSERT_DIE(rt_export_get_state_r(r) == TES_DOWN);
      break;

    default:
      bug("strange export kind");
  }
}

/*
 * Mark the export item @u as processed by the receiver @r.
 *
 * The item's sequence number is recorded in the receiver's seq_map so that
 * rt_export_get() skips it from now on. Marking the same item twice is a bug.
 */
void
rt_export_processed(struct rt_export_receiver *r, const struct rt_export_item *u)
{
  // log(L_DEBUG "processed %p", u);

  /* Check sequence number reset event */
  if (lfjour_reset_seqno(&r->r))
    bmap_reset(&r->seq_map, 4);

  ASSERT_DIE(!bmap_test(&r->seq_map, u->seq));
  bmap_set(&r->seq_map, u->seq);
}

/*
 * Attach the receiver @r to the exporter @e.
 *
 * The receiver must be in TES_DOWN. It is registered with the exporter's
 * journal, its statistics are cleared, its seq_map is initialized from the
 * receiver's pool, and it ends up in TES_READY (passing through TES_FEEDING,
 * as feeding itself is not implemented yet).
 */
void
rt_export_subscribe(struct rt_exporter *e, struct rt_export_receiver *r)
{
  rt_export_change_state(r, BIT32_ALL(TES_DOWN), TES_FEEDING);

  lfjour_register(&e->journal, &r->r);

  r->stats = (struct rt_export_stats) {};
  r->last_state_change = current_time();
  bmap_init(&r->seq_map, r->pool, 4);

  /* We should init feeding but this is not implemented yet */
  rt_export_change_state(r, BIT32_ALL(TES_FEEDING), TES_READY);
}

/*
 * Detach the receiver @r from its exporter.
 *
 * The receiver is switched to TES_DOWN from any of the active states and
 * unregistered from the journal. The exporter argument is currently unused.
 */
void
rt_export_unsubscribe(struct rt_exporter *e UNUSED, struct rt_export_receiver *r)
{
  switch (rt_export_change_state(r, BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY, TES_STOP), TES_DOWN))
  {
    case TES_FEEDING:
    case TES_PARTIAL:
    case TES_READY:
    case TES_STOP:
      lfjour_unregister(&r->r);
      break;

    default:
      /* Any other previous state is already rejected by the mask above */
      bug("not implemented");
  }
}

/*
 * Journal hook installed by rt_exporter_init() as the journal's cleanup_done.
 *
 * Forwards the event to the exporter's own cleanup_done hook and, when a
 * stopped hook is set on the exporter and no recipients remain on the journal,
 * cancels the announce timer, postpones the cleanup event and calls the
 * stopped hook.
 *
 * NOTE(review): CALL() presumably skips a NULL hook; confirm in its definition.
 */
static void
rt_exporter_cleanup_done(struct lfjour *j, u64 begin_seq UNUSED, u64 end_seq UNUSED)
{
  SKIP_BACK_DECLARE(struct rt_exporter, e, journal, j);

  /* TODO: log the begin_seq / end_seq values */

  CALL(e->cleanup_done, e);

  if (e->stopped && (lfjour_count_recipients(j) == 0))
  {
    settle_cancel(&j->announce_timer);
    ev_postpone(&j->cleanup_event);
    e->stopped(e);
  }
}

/*
 * Initialize the exporter @e: hook up the journal cleanup callback and
 * initialize the journal with the settle configuration @scf.
 */
void
rt_exporter_init(struct rt_exporter *e, struct settle_config *scf)
{
  e->journal.cleanup_done = rt_exporter_cleanup_done;
  lfjour_init(&e->journal, scf);
}

/*
 * Publish a new export item into the exporter's journal.
 *
 * A fresh journal item is prepared, the payload of @uit (everything from its
 * `data` member up to the journal's item_size) is copied into it while the
 * journal-owned header is kept, and the item is committed.
 *
 * Returns the pointer to the item as stored in the journal.
 */
const struct rt_export_item *
rt_exporter_push(struct rt_exporter *e, const struct rt_export_item *uit)
{
  /* Get the object */
  SKIP_BACK_DECLARE(struct rt_export_item, it, li, lfjour_push_prepare(&e->journal));

  /* Copy the data, keeping the header */
  memcpy(&it->data, &uit->data, e->journal.item_size - OFFSETOF(struct rt_export_item, data));

  /* Commit the update */
  /*
  log(L_DEBUG "pushing %p %016lx %016lx %016lx %016lx", it,
      ((u64 *)it->data)[0], ((u64 *)it->data)[1], ((u64 *)it->data)[2], ((u64 *)it->data)[3]);
  */
  lfjour_push_commit(&e->journal);

  /* Return the update pointer */
  return it;
}

/*
 * Shut the exporter @e down.
 *
 * Every receiver still registered on the journal is switched to TES_STOP and
 * a journal cleanup is requested. With no receivers registered, the pending
 * cleanup event and announce timer are cancelled and @stopped is invoked
 * right away. Deferred completion with receivers still attached is not
 * implemented yet and ends in bug().
 */
void
rt_exporter_shutdown(struct rt_exporter *e, void (*stopped)(struct rt_exporter *))
{
  /* We have to tell every receiver to stop */
  _Bool done = 1;
  WALK_TLIST(lfjour_recipient, r, &e->journal.recipients)
  {
    done = 0;
    rt_export_change_state(
	SKIP_BACK(struct rt_export_receiver, r, r),
	BIT32_ALL(TES_FEEDING, TES_PARTIAL, TES_READY, TES_STOP),
	TES_STOP);
  }

  /* The rest is done via the cleanup routine */
  lfjour_do_cleanup_now(&e->journal);

  if (done)
  {
    ev_postpone(&e->journal.cleanup_event);
    settle_cancel(&e->journal.announce_timer);
    CALL(stopped, e);
  }
  else
  {
    /* TODO: store the hook for the cleanup routine (e->stopped = stopped) */
    bug("not implemented yet");
  }
}