1
0
mirror of https://github.com/alice-lg/alice-lg.git synced 2024-05-11 05:55:03 +00:00
alice-lg-alice-lg/pkg/store/routes_store.go

308 lines
7.2 KiB
Go
Raw Normal View History

2021-10-20 18:36:51 +00:00
package store
2017-06-23 11:12:24 +02:00
import (
2021-11-19 22:21:18 +01:00
"context"
2022-01-10 17:13:00 +01:00
"errors"
2017-06-23 11:12:24 +02:00
"log"
2021-11-24 11:48:49 +01:00
"math/rand"
2017-06-23 11:12:24 +02:00
"time"
2017-07-14 14:18:47 +02:00
2021-03-22 17:35:20 +01:00
"github.com/alice-lg/alice-lg/pkg/api"
2021-10-20 20:26:37 +00:00
"github.com/alice-lg/alice-lg/pkg/config"
2022-01-10 17:13:00 +01:00
"github.com/alice-lg/alice-lg/pkg/sources"
2017-06-23 11:12:24 +02:00
)
2021-11-19 22:21:18 +01:00
// RoutesStoreBackend interface
type RoutesStoreBackend interface {
// SetRoutes updates the routes in the store after a refresh.
SetRoutes(
ctx context.Context,
sourceID string,
2021-12-02 16:11:15 +01:00
routes api.LookupRoutes,
2021-11-19 22:21:18 +01:00
) error
2021-11-24 11:48:49 +01:00
// CountRoutesAt returns the number of imported
// and filtered routes for a given route server.
// Example: (imported, filtered, error)
CountRoutesAt(
ctx context.Context,
2021-11-24 17:59:32 +01:00
sourceID string,
2021-12-02 16:11:15 +01:00
) (uint, uint, error)
2021-11-24 17:59:32 +01:00
2021-12-03 18:17:41 +01:00
// FindByNeighbors retrieves the prefixes
2021-11-24 17:59:32 +01:00
// announced by the neighbor at a given source
2021-12-03 18:17:41 +01:00
FindByNeighbors(
2021-11-24 17:59:32 +01:00
ctx context.Context,
2021-12-03 18:17:41 +01:00
neighborIDs []string,
) (api.LookupRoutes, error)
// FindByPrefix
FindByPrefix(
ctx context.Context,
prefix string,
2021-11-24 17:59:32 +01:00
) (api.LookupRoutes, error)
2021-11-19 22:21:18 +01:00
}
2021-03-22 17:35:20 +01:00
// The RoutesStore holds a mapping of routes,
2021-10-20 20:26:37 +00:00
// status and cfgs and will be queried instead
2021-03-22 17:35:20 +01:00
// of a backend by the API
2017-06-23 11:12:24 +02:00
type RoutesStore struct {
2021-12-02 16:11:15 +01:00
backend RoutesStoreBackend
sources *SourcesStore
2021-11-24 17:59:32 +01:00
neighbors *NeighborsStore
2017-06-23 11:12:24 +02:00
}
2021-03-22 17:35:20 +01:00
// NewRoutesStore makes a new store instance
2021-10-20 20:26:37 +00:00
// with a cfg.
func NewRoutesStore(
2021-12-02 16:11:15 +01:00
neighbors *NeighborsStore,
2021-10-22 22:17:04 +02:00
cfg *config.Config,
2021-11-19 22:21:18 +01:00
backend RoutesStoreBackend,
2021-10-20 20:26:37 +00:00
) *RoutesStore {
2018-07-07 11:51:13 +02:00
// Set refresh interval as duration, fall back to
// five minutes if no interval is set.
refreshInterval := time.Duration(
2021-10-20 20:26:37 +00:00
cfg.Server.RoutesStoreRefreshInterval) * time.Minute
2018-07-07 11:51:13 +02:00
if refreshInterval == 0 {
refreshInterval = time.Duration(5) * time.Minute
}
2021-11-19 22:21:18 +01:00
2021-11-24 11:48:49 +01:00
log.Println("Routes refresh interval set to:", refreshInterval)
2021-11-19 22:21:18 +01:00
// Store refresh information per store
sources := NewSourcesStore(cfg, refreshInterval)
2017-06-23 11:12:24 +02:00
store := &RoutesStore{
2021-12-02 16:11:15 +01:00
backend: backend,
sources: sources,
neighbors: neighbors,
2017-06-23 11:12:24 +02:00
}
return store
}
2021-03-22 17:35:20 +01:00
// Start starts the routes store
2021-11-19 22:21:18 +01:00
func (s *RoutesStore) Start() {
2017-06-23 11:12:24 +02:00
log.Println("Starting local routes store")
2021-11-19 22:21:18 +01:00
// Periodically trigger updates
2017-06-23 16:25:21 +02:00
for {
2021-11-19 22:21:18 +01:00
s.update()
time.Sleep(time.Second)
2017-06-23 16:25:21 +02:00
}
2017-06-23 11:12:24 +02:00
}
2021-11-24 11:48:49 +01:00
// Update all routes from all sources, where the
// sources last refresh is longer ago than the configured
// refresh period. This is totally the same as the
// NeighborsStore.update and maybe these functions can be merged (TODO)
2021-11-19 22:21:18 +01:00
func (s *RoutesStore) update() {
2021-11-24 11:48:49 +01:00
for _, id := range s.sources.GetSourceIDs() {
go s.safeUpdateSource(id)
}
}
2018-10-01 12:02:14 +02:00
2021-11-24 11:48:49 +01:00
// safeUpdateSource will try to update a source but
// will recover from a panic if something goes wrong.
// In that case, the LastError and State will be updated.
// Again. The similarity to the NeighborsStore is really sus.
func (s *RoutesStore) safeUpdateSource(id string) {
ctx := context.TODO()
2017-06-23 18:01:49 +02:00
2021-11-24 11:48:49 +01:00
if !s.sources.ShouldRefresh(id) {
return // Nothing to do here
}
2017-06-23 12:36:32 +02:00
2021-11-24 11:48:49 +01:00
if err := s.sources.LockSource(id); err != nil {
log.Println("Cloud not start routes refresh:", err)
return
}
2017-06-23 12:36:32 +02:00
2021-11-24 11:48:49 +01:00
// Apply jitter so, we do not hit everything at once.
// TODO: Make configurable
time.Sleep(time.Duration(rand.Intn(30)) * time.Second)
2021-12-02 16:11:15 +01:00
src := s.sources.Get(id)
2021-11-24 11:48:49 +01:00
// Prepare for impact.
defer func() {
if err := recover(); err != nil {
2018-10-01 12:02:14 +02:00
log.Println(
2021-11-24 11:48:49 +01:00
"Recovering after failed routes refresh of",
2021-12-02 16:11:15 +01:00
src.Name, "from:", err)
2021-11-24 11:48:49 +01:00
s.sources.RefreshError(id, err)
2017-06-23 11:12:24 +02:00
}
2021-11-24 11:48:49 +01:00
}()
2017-06-23 12:36:32 +02:00
2021-12-02 16:11:15 +01:00
if err := s.updateSource(ctx, src); err != nil {
2021-11-24 11:48:49 +01:00
log.Println(
2021-12-02 16:11:15 +01:00
"Refeshing routes of", src.Name, "failed:", err)
2021-11-24 11:48:49 +01:00
s.sources.RefreshError(id, err)
}
}
2018-10-01 12:13:40 +02:00
2021-11-24 11:48:49 +01:00
// Update all routes
func (s *RoutesStore) updateSource(
ctx context.Context,
2021-12-02 16:11:15 +01:00
src *config.SourceConfig,
2021-11-24 11:48:49 +01:00
) error {
2021-12-02 16:11:15 +01:00
rs := src.GetInstance()
res, err := rs.AllRoutes()
2021-11-24 11:48:49 +01:00
if err != nil {
return err
2017-06-23 11:12:24 +02:00
}
2018-10-01 12:02:14 +02:00
2021-12-03 18:17:41 +01:00
if err := s.awaitNeighborStore(ctx, src.ID); err != nil {
return err
}
2021-11-24 17:59:32 +01:00
// Prepare imported routes for lookup
2021-12-08 14:15:55 +01:00
imported := s.routesToLookupRoutes(ctx, "imported", src, res.Imported)
filtered := s.routesToLookupRoutes(ctx, "filtered", src, res.Filtered)
2021-12-02 16:11:15 +01:00
lookupRoutes := append(imported, filtered...)
if err = s.backend.SetRoutes(ctx, src.ID, lookupRoutes); err != nil {
return err
}
return s.sources.RefreshSuccess(src.ID)
}
2021-12-03 18:17:41 +01:00
// awaitNeighborStore polls the neighbor store state for the
// source srcID every 100 milliseconds until the store is
// initialized or the context is no longer valid.
//
// It returns nil once s.neighbors.IsInitialized reports true
// and the context error if the context ends first.
func (s *RoutesStore) awaitNeighborStore(
	ctx context.Context,
	srcID string,
) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		if s.neighbors.IsInitialized(srcID) {
			return nil
		}

		// Wait for the next poll, but wake up right away
		// when the context is cancelled instead of sleeping
		// through the cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}
2021-12-02 16:11:15 +01:00
func (s *RoutesStore) routesToLookupRoutes(
2021-12-08 14:15:55 +01:00
ctx context.Context,
2021-12-02 16:11:15 +01:00
state string,
src *config.SourceConfig,
routes api.Routes,
) api.LookupRoutes {
2021-11-24 17:59:32 +01:00
lookupRoutes := make(api.LookupRoutes, 0, len(routes))
for _, route := range routes {
2021-12-08 14:15:55 +01:00
neighbor, err := s.neighbors.GetNeighborAt(ctx, src.ID, route.NeighborID)
2021-12-02 16:11:15 +01:00
if err != nil {
log.Println("prepare route, neighbor lookup failed:", err)
continue
}
2021-12-08 16:16:39 +01:00
if neighbor == nil {
log.Println("prepare route, neighbor not found:", route.NeighborID)
continue
}
2021-11-24 17:59:32 +01:00
lr := &api.LookupRoute{
Route: route,
State: state,
Neighbor: neighbor,
RouteServer: &api.RouteServer{
ID: src.ID,
Name: src.Name,
},
}
2021-12-03 18:17:41 +01:00
lr.Route.Details = nil
2021-11-24 17:59:32 +01:00
lookupRoutes = append(lookupRoutes, lr)
}
2021-12-02 16:11:15 +01:00
return lookupRoutes
}
2021-11-24 17:59:32 +01:00
2021-03-22 17:35:20 +01:00
// Stats calculates some store insights
2021-11-19 22:21:18 +01:00
func (s *RoutesStore) Stats() *api.RoutesStoreStats {
2021-11-24 11:48:49 +01:00
ctx := context.TODO()
2021-12-02 16:11:15 +01:00
totalImported := uint(0)
totalFiltered := uint(0)
2017-06-23 12:55:08 +02:00
2021-10-22 22:51:11 +02:00
rsStats := []api.RouteServerRoutesStats{}
2017-06-23 12:55:08 +02:00
2021-11-24 11:48:49 +01:00
for _, sourceID := range s.sources.GetSourceIDs() {
status, err := s.sources.GetStatus(sourceID)
if err != nil {
log.Println("error while getting source status:", err)
continue
}
2021-12-02 16:11:15 +01:00
src := s.sources.Get(sourceID)
nImported, nFiltered, err := s.backend.CountRoutesAt(ctx, sourceID)
2021-11-24 11:48:49 +01:00
if err != nil {
2022-01-10 17:13:00 +01:00
if !errors.Is(err, sources.ErrSourceNotFound) {
log.Println("error during routes count:", err)
}
2021-11-24 11:48:49 +01:00
}
2017-06-23 12:55:08 +02:00
2021-11-24 11:48:49 +01:00
totalImported += nImported
totalFiltered += nFiltered
2017-06-23 12:55:08 +02:00
2021-10-22 22:51:11 +02:00
serverStats := api.RouteServerRoutesStats{
2021-12-02 16:11:15 +01:00
Name: src.Name,
2021-10-22 22:51:11 +02:00
Routes: api.RoutesStats{
2021-11-24 11:48:49 +01:00
Imported: nImported,
Filtered: nFiltered,
2017-06-23 12:55:08 +02:00
},
2021-11-24 11:48:49 +01:00
State: status.State.String(),
2017-06-23 12:55:08 +02:00
UpdatedAt: status.LastRefresh,
}
2021-12-03 18:17:41 +01:00
rsStats = append(rsStats, serverStats)
2017-06-23 12:55:08 +02:00
}
// Make stats
2021-10-22 22:51:11 +02:00
storeStats := &api.RoutesStoreStats{
TotalRoutes: api.RoutesStats{
2017-06-23 12:55:08 +02:00
Imported: totalImported,
Filtered: totalFiltered,
},
RouteServers: rsStats,
}
return storeStats
}
2021-12-07 19:11:11 +01:00
// CachedAt returns the time of the oldest partial
2021-12-06 10:03:17 +01:00
// refresh of the dataset.
func (s *RoutesStore) CachedAt(
ctx context.Context,
2021-12-07 19:11:11 +01:00
) time.Time {
2021-12-06 10:03:17 +01:00
return s.sources.CachedAt(ctx)
2018-09-25 21:26:11 +02:00
}
2021-03-22 17:35:20 +01:00
// CacheTTL returns the TTL time
2021-12-07 19:11:11 +01:00
func (s *RoutesStore) CacheTTL(
ctx context.Context,
) time.Time {
2021-12-06 10:03:17 +01:00
return s.sources.NextRefresh(ctx)
2018-09-25 21:26:11 +02:00
}
2021-03-22 17:35:20 +01:00
// LookupPrefix performs a lookup over all route servers
2021-12-03 18:17:41 +01:00
func (s *RoutesStore) LookupPrefix(
ctx context.Context,
prefix string,
) (api.LookupRoutes, error) {
return s.backend.FindByPrefix(ctx, prefix)
2017-06-30 14:15:43 +02:00
}
2021-10-15 21:24:24 +02:00
// LookupPrefixForNeighbors returns all routes for
2021-11-24 11:48:49 +01:00
// a set of neighbors.
2021-11-19 22:21:18 +01:00
func (s *RoutesStore) LookupPrefixForNeighbors(
2021-12-03 18:17:41 +01:00
ctx context.Context,
2021-10-15 21:24:24 +02:00
neighbors api.NeighborsLookupResults,
2021-12-03 18:17:41 +01:00
) (api.LookupRoutes, error) {
neighborIDs := []string{}
for _, rs := range neighbors {
for _, neighbor := range rs {
neighborIDs = append(neighborIDs, neighbor.ID)
2017-06-30 14:15:43 +02:00
}
2017-06-23 16:11:47 +02:00
}
2017-06-23 11:12:24 +02:00
2021-12-03 18:17:41 +01:00
return s.backend.FindByNeighbors(ctx, neighborIDs)
2017-06-23 11:12:24 +02:00
}