From 9d93f0c9c117d2b7ab15e25ad9a0f8b287fdef1d Mon Sep 17 00:00:00 2001 From: Annika Hannig Date: Tue, 8 Feb 2022 21:55:26 +0100 Subject: [PATCH 1/4] initial limit on parallelism --- cmd/alice-lg/main.go | 3 +- etc/alice-lg/alice.example.conf | 18 +++++++- pkg/config/config.go | 24 +++++----- pkg/config/testdata/alice.conf | 12 +++++ pkg/store/neighbors_store.go | 9 +++- pkg/store/routes_store.go | 7 ++- pkg/store/sources_store.go | 78 +++++++++++++++++++++++++++++---- 7 files changed, 126 insertions(+), 25 deletions(-) diff --git a/cmd/alice-lg/main.go b/cmd/alice-lg/main.go index ba5a994..9e27aeb 100644 --- a/cmd/alice-lg/main.go +++ b/cmd/alice-lg/main.go @@ -25,7 +25,6 @@ func createHeapProfile(filename string) { log.Fatal("could not create memory profile: ", err) } defer f.Close() // error handling omitted for example - runtime.GC() // get up-to-date statistics if err := pprof.WriteHeapProfile(f); err != nil { log.Fatal("could not write memory profile: ", err) } @@ -37,7 +36,6 @@ func createAllocProfile(filename string) { log.Fatal("could not create alloc profile: ", err) } defer f.Close() // error handling omitted for example - runtime.GC() // get up-to-date statistics if err := pprof.Lookup("allocs").WriteTo(f, 0); err != nil { log.Fatal("could not write alloc profile: ", err) } @@ -47,6 +45,7 @@ func startMemoryProfile(prefix string) { for { t := 0 filename := fmt.Sprintf("%s-heap-%03d", prefix, t) + runtime.GC() // get up-to-date statistics (according to docs) createHeapProfile(filename) log.Println("wrote memory heap profile:", filename) filename = fmt.Sprintf("%s-allocs-%03d", prefix, t) diff --git a/etc/alice-lg/alice.example.conf b/etc/alice-lg/alice.example.conf index 6105c75..c63aebb 100644 --- a/etc/alice-lg/alice.example.conf +++ b/etc/alice-lg/alice.example.conf @@ -7,15 +7,31 @@ listen_http = 127.0.0.1:7340 # configures the built-in webserver timeout in seconds (default 120s) # http_timeout = 60 + # enable the prefix-lookup endpoint / the global search feature enable_prefix_lookup = true + # Try to refresh the neighbor status on every request to /neighbors enable_neighbors_status_refresh = false -asn = 9033 + # this ASN is used as a fallback value in the RPKI feature and for route # filtering evaluation with large BGP communities +asn = 9033 + store_backend = postgres +# how many route servers will be refreshed at the same time +# if set to 0 (or for the matter of fact 1), refresh will be +# sequential. +# Default: 1 +routes_store_refresh_parallelism = 5 +neighbors_store_refresh_parallelism = 10000 + +# how much time should pass between refreshes (in minutes) +# Default: 5 +routes_store_refresh_interval = 5 +neighbors_store_refresh_interval = 5 + [postgres] url = "postgres://postgres:postgres@localhost:5432/alice" min_connections = 2 diff --git a/pkg/config/config.go b/pkg/config/config.go index 4dbb381..8f200d2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -73,14 +73,16 @@ const ( // A ServerConfig holds the runtime configuration // for the backend. type ServerConfig struct { - Listen string `ini:"listen_http"` - HTTPTimeout int `ini:"http_timeout"` - EnablePrefixLookup bool `ini:"enable_prefix_lookup"` - NeighborsStoreRefreshInterval int `ini:"neighbours_store_refresh_interval"` - RoutesStoreRefreshInterval int `ini:"routes_store_refresh_interval"` - StoreBackend string `ini:"store_backend"` - Asn int `ini:"asn"` - EnableNeighborsStatusRefresh bool `ini:"enable_neighbors_status_refresh"` + Listen string `ini:"listen_http"` + HTTPTimeout int `ini:"http_timeout"` + EnablePrefixLookup bool `ini:"enable_prefix_lookup"` + NeighborsStoreRefreshInterval int `ini:"neighbors_store_refresh_interval"` + NeighborsStoreRefreshParallelism int `ini:"neighbors_store_refresh_parallelism"` + RoutesStoreRefreshInterval int `ini:"routes_store_refresh_interval"` + RoutesStoreRefreshParallelism int `ini:"routes_store_refresh_parallelism"` + StoreBackend string `ini:"store_backend"` + Asn int `ini:"asn"` + EnableNeighborsStatusRefresh bool `ini:"enable_neighbors_status_refresh"` } // PostgresConfig is the configuration for the database @@ -840,8 +842,10 @@ func LoadConfig(file string) (*Config, error) { // Map sections server := ServerConfig{ - HTTPTimeout: DefaultHTTPTimeout, - StoreBackend: "memory", + HTTPTimeout: DefaultHTTPTimeout, + StoreBackend: "memory", + RoutesStoreRefreshParallelism: 1, + NeighborsStoreRefreshParallelism: 1, } if err := parsedConfig.Section("server").MapTo(&server); err != nil { return nil, err diff --git a/pkg/config/testdata/alice.conf b/pkg/config/testdata/alice.conf index 154a1c9..6d0ecab 100644 --- a/pkg/config/testdata/alice.conf +++ b/pkg/config/testdata/alice.conf @@ -13,6 +13,18 @@ asn = 9033 # this ASN is used as a fallback value in the RPKI feature and for route # filtering evaluation with large BGP communities +# how many route servers will be refreshed at the same time +# if set to 0 (or for the matter of fact 1), refresh will be +# sequential. +# Default: 1 +routes_store_refresh_parallelism = 5 +neighbors_store_refresh_parallelism = 10000 + +# how much time should pass between refreshes (in minutes) +# Default: 5 +routes_store_refresh_interval = 5 +neighbors_store_refresh_interval = 5 + store_backend = postgres [postgres] diff --git a/pkg/store/neighbors_store.go b/pkg/store/neighbors_store.go index fc78430..440fe3a 100644 --- a/pkg/store/neighbors_store.go +++ b/pkg/store/neighbors_store.go @@ -70,11 +70,16 @@ func NewNeighborsStore( if refreshInterval == 0 { refreshInterval = time.Duration(5) * time.Minute } + refreshParallelism := cfg.Server.NeighborsStoreRefreshParallelism + if refreshParallelism <= 0 { + refreshParallelism = 1 + } - log.Println("Neighbors Store refresh interval set to:", refreshInterval) + log.Println("Neighbors refresh interval set to:", refreshInterval) + log.Println("Neighbors refresh parallelism:", refreshParallelism) // Store refresh information per store - sources := NewSourcesStore(cfg, refreshInterval) + sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism) // Neighbors will be refreshed on every GetNeighborsAt // invocation. Why? I (Annika) don't know. I have to ask Patrick. diff --git a/pkg/store/routes_store.go b/pkg/store/routes_store.go index ebd4d34..44c5845 100644 --- a/pkg/store/routes_store.go +++ b/pkg/store/routes_store.go @@ -66,11 +66,16 @@ func NewRoutesStore( if refreshInterval == 0 { refreshInterval = time.Duration(5) * time.Minute } + refreshParallelism := cfg.Server.NeighborsStoreRefreshParallelism + if refreshParallelism <= 0 { + refreshParallelism = 1 + } log.Println("Routes refresh interval set to:", refreshInterval) + log.Println("Routes refresh parallelism:", refreshParallelism) // Store refresh information per store - sources := NewSourcesStore(cfg, refreshInterval) + sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism) store := &RoutesStore{ backend: backend, sources: sources, diff --git a/pkg/store/sources_store.go b/pkg/store/sources_store.go index 6eb2501..83f4740 100644 --- a/pkg/store/sources_store.go +++ b/pkg/store/sources_store.go @@ -3,6 +3,7 @@ package store import ( "context" "log" + "sort" "sync" "time" @@ -36,24 +37,45 @@ func (s State) String() string { return "INVALID" } -// Status defines a status the store can be in +// Status defines a status the store can be in. type Status struct { RefreshInterval time.Duration + RefreshParallelism int LastRefresh time.Time LastRefreshDuration time.Duration LastError interface{} State State Initialized bool + SourceID string lastRefreshStart time.Time } +// SourceStatusList is a sortable list of source status +type SourceStatusList []*Status + +// Len implements the sort interface +func (l SourceStatusList) Len() int { + return len(l) +} + +// Less implements the sort interface +func (l SourceStatusList) Less(i, j int) bool { + return l[i].lastRefreshStart.Before(l[j].lastRefreshStart) +} + +// Swap implements the sort interface +func (l SourceStatusList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + // SourcesStore provides methods for retrieving // the current status of a source. type SourcesStore struct { - refreshInterval time.Duration - status map[string]*Status - sources map[string]*config.SourceConfig + refreshInterval time.Duration + refreshParallelism int + status map[string]*Status + sources map[string]*config.SourceConfig sync.Mutex } @@ -61,6 +83,7 @@ type SourcesStore struct { func NewSourcesStore( cfg *config.Config, refreshInterval time.Duration, + refreshParallelism int, ) *SourcesStore { status := make(map[string]*Status) sources := make(map[string]*config.SourceConfig) @@ -71,13 +94,15 @@ func NewSourcesStore( sources[sourceID] = src status[sourceID] = &Status{ RefreshInterval: refreshInterval, + SourceID: sourceID, } } return &SourcesStore{ - status: status, - sources: sources, - refreshInterval: refreshInterval, + status: status, + sources: sources, + refreshInterval: refreshInterval, + refreshParallelism: refreshParallelism, } } @@ -111,8 +136,6 @@ func (s *SourcesStore) IsInitialized(sourceID string) (bool, error) { } // NextRefresh calculates the next refresh time -// TODO: I doubt the usefulness of these numbers. -// func (s *SourcesStore) NextRefresh( ctx context.Context, ) time.Time { @@ -209,6 +232,43 @@ func (s *SourcesStore) GetSourceIDs() []string { return ids } +// GetSourceIDsForRefresh will retrieve a list of source IDs, +// which are currently not locked, sorted by least refreshed. +// The number of sources returned is limited through the +// refresh parallelism parameter. +func (s *SourcesStore) GetSourceIDsForRefresh() []string { + s.Lock() + defer s.Unlock() + + locked := 0 + sources := make(SourceStatusList, 0, len(s.status)) + for _, status := range s.status { + sources = append(sources, status) + if status.State == StateBusy { + locked++ + } + } + + // Sort by refresh start time ascending + sort.Sort(sources) + + slots := s.refreshParallelism - locked + if slots <= 0 { + slots = 0 + } + + ids := make([]string, 0, slots) + i := 0 + for _, status := range sources { + if i >= slots { + break + } + ids = append(ids, status.SourceID) + i++ + } + return ids +} + // LockSource indicates the start of a refresh func (s *SourcesStore) LockSource(sourceID string) error { s.Lock() From a1f33d90f21ff678f8b85df691213e1f434825d4 Mon Sep 17 00:00:00 2001 From: Annika Hannig Date: Wed, 9 Feb 2022 11:56:03 +0100 Subject: [PATCH 2/4] added tests --- pkg/store/sources_store.go | 2 + pkg/store/sources_store_test.go | 99 ++++++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/pkg/store/sources_store.go b/pkg/store/sources_store.go index 83f4740..bef8572 100644 --- a/pkg/store/sources_store.go +++ b/pkg/store/sources_store.go @@ -249,6 +249,8 @@ func (s *SourcesStore) GetSourceIDsForRefresh() []string { } } + log.Println(locked) + // Sort by refresh start time ascending sort.Sort(sources) diff --git a/pkg/store/sources_store_test.go b/pkg/store/sources_store_test.go index 86434cc..f7c62f0 100644 --- a/pkg/store/sources_store_test.go +++ b/pkg/store/sources_store_test.go @@ -1,11 +1,104 @@ package store -import "testing" +import ( + "testing" + "time" +) -func TestAddSource(t *testing.T) { +func TestGetSourceIDsForRefreshSequential(t *testing.T) { + + s := &SourcesStore{ + refreshParallelism: 1, + status: map[string]*Status{ + "src1": { + SourceID: "src1", + }, + "src2": { + SourceID: "src2", + }, + }, + } + + ids := s.GetSourceIDsForRefresh() + if len(ids) != 1 { + t.Error("expected 1 id") + } + if err := s.LockSource(ids[0]); err != nil { + t.Error(err) + } + lastID := ids[0] + + ids = s.GetSourceIDsForRefresh() + if len(ids) != 0 { + t.Error("all concurrent refresh slots should be taken") + } + + if err := s.RefreshSuccess(lastID); err != nil { + t.Error(err) + } + + ids = s.GetSourceIDsForRefresh() + if len(ids) != 1 { + t.Error("expected 1 id") + } + + if ids[0] == lastID { + t.Error("the next source should have been returned") + } } -func TestGetStatus(t *testing.T) { +func TestGetSourceIDsForRefreshParallel(t *testing.T) { + s := &SourcesStore{ + refreshParallelism: 2, + status: map[string]*Status{ + "src1": { + SourceID: "src1", + }, + "src2": { + SourceID: "src2", + }, + "src3": { + SourceID: "src3", + lastRefreshStart: time.Now().UTC(), + }, + }, + } + + ids := s.GetSourceIDsForRefresh() + if len(ids) != 2 { + t.Error("expected 2 ids") + } + for _, id := range ids { + if err := s.LockSource(id); err != nil { + t.Error(err) + } + + if id == "src3" { + t.Error("unexpected src3") + } + } + + nextIds := s.GetSourceIDsForRefresh() + if len(nextIds) != 0 { + t.Error("all concurrent refresh slots should be taken") + } + + for _, id := range ids { + if err := s.RefreshSuccess(id); err != nil { + t.Error(err) + } + } + + ids = s.GetSourceIDsForRefresh() + t.Log(ids) + t.Log(s.status["src1"]) + if len(ids) != 2 { + t.Error("expected 2 id") + } + + if ids[0] != "src3" { + t.Error("expected src3 to be least refreshed") + } } From deca20ad9f350fe5fc3908c088c78cb444cda259 Mon Sep 17 00:00:00 2001 From: Annika Hannig Date: Wed, 9 Feb 2022 12:02:22 +0100 Subject: [PATCH 3/4] use limited refresh parallelism --- pkg/store/neighbors_store.go | 2 +- pkg/store/routes_store.go | 2 +- pkg/store/sources_store.go | 14 +++++--------- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pkg/store/neighbors_store.go b/pkg/store/neighbors_store.go index 440fe3a..1fcca67 100644 --- a/pkg/store/neighbors_store.go +++ b/pkg/store/neighbors_store.go @@ -188,7 +188,7 @@ func (s *NeighborsStore) safeUpdateSource(id string) { // sources last neighbor refresh is longer ago // than the configured refresh period. func (s *NeighborsStore) update() { - for _, id := range s.sources.GetSourceIDs() { + for _, id := range s.sources.GetSourceIDsForRefresh() { go s.safeUpdateSource(id) } } diff --git a/pkg/store/routes_store.go b/pkg/store/routes_store.go index 44c5845..b66cc41 100644 --- a/pkg/store/routes_store.go +++ b/pkg/store/routes_store.go @@ -100,7 +100,7 @@ func (s *RoutesStore) Start() { // refresh period. This is totally the same as the // NeighborsStore.update and maybe these functions can be merged (TODO) func (s *RoutesStore) update() { - for _, id := range s.sources.GetSourceIDs() { + for _, id := range s.sources.GetSourceIDsForRefresh() { go s.safeUpdateSource(id) } } diff --git a/pkg/store/sources_store.go b/pkg/store/sources_store.go index bef8572..74068cf 100644 --- a/pkg/store/sources_store.go +++ b/pkg/store/sources_store.go @@ -22,7 +22,7 @@ const ( // State is an enum of the above States type State int -// String() converts a state into a string +// String converts a state into a string func (s State) String() string { switch s { case StateInit: @@ -249,8 +249,6 @@ func (s *SourcesStore) GetSourceIDsForRefresh() []string { } } - log.Println(locked) - // Sort by refresh start time ascending sort.Sort(sources) @@ -298,8 +296,7 @@ func (s *SourcesStore) RefreshSuccess(sourceID string) error { } status.State = StateReady status.LastRefresh = time.Now().UTC() - status.LastRefreshDuration = time.Now().Sub( - status.lastRefreshStart) + status.LastRefreshDuration = time.Since(status.lastRefreshStart) status.LastError = nil status.Initialized = true // We now have data return nil @@ -308,7 +305,7 @@ func (s *SourcesStore) RefreshSuccess(sourceID string) error { // RefreshError indicates that the refresh has failed func (s *SourcesStore) RefreshError( sourceID string, - err interface{}, + sourceErr interface{}, ) { s.Lock() defer s.Unlock() @@ -319,8 +316,7 @@ func (s *SourcesStore) RefreshError( } status.State = StateError status.LastRefresh = time.Now().UTC() - status.LastRefreshDuration = time.Now().Sub( - status.lastRefreshStart) - status.LastError = err + status.LastRefreshDuration = time.Since(status.lastRefreshStart) + status.LastError = sourceErr return } From 58907407547057196e3b75f53e386b09babaf116 Mon Sep 17 00:00:00 2001 From: Annika Hannig Date: Wed, 9 Feb 2022 12:59:35 +0100 Subject: [PATCH 4/4] updated changelog --- CHANGELOG.md | 13 +++++++++++++ cmd/alice-lg/main.go | 2 +- pkg/store/routes_store.go | 2 +- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d42994..5d56824 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,16 +8,29 @@ with `neighbor.asn` (in case of java script errors). This also applies to the API. + In the config `neighbors_store_refresh_interval` needs to be updated. + * Parallel route / neighbor store refreshs: Route servers are not longer queried sequentially. A jitter is applied to not hit all servers exactly at once. + * Parallelism can be tuned through the config parameters: + [server] + + routes_store_refresh_parallelism = 5 + neighbors_store_refresh_parallelism = 10000 + + A value of 1 is a sequential refresh. + * Postgres store backend: Not keeping routes and neighbors in memory might reduce the memory footprint. * Support for alternative pipe in `multi_table` birdwatcher configurations. + + + ## 5.0.1 (2021-11-01) * Fixed parsing extended communities in openbgpd source causing a crash. diff --git a/cmd/alice-lg/main.go b/cmd/alice-lg/main.go index 9e27aeb..3f17fb6 100644 --- a/cmd/alice-lg/main.go +++ b/cmd/alice-lg/main.go @@ -42,8 +42,8 @@ func createAllocProfile(filename string) { } func startMemoryProfile(prefix string) { + t := 0 for { - t := 0 filename := fmt.Sprintf("%s-heap-%03d", prefix, t) runtime.GC() // get up-to-date statistics (according to docs) createHeapProfile(filename) diff --git a/pkg/store/routes_store.go b/pkg/store/routes_store.go index b66cc41..1104d48 100644 --- a/pkg/store/routes_store.go +++ b/pkg/store/routes_store.go @@ -66,7 +66,7 @@ func NewRoutesStore( if refreshInterval == 0 { refreshInterval = time.Duration(5) * time.Minute } - refreshParallelism := cfg.Server.NeighborsStoreRefreshParallelism + refreshParallelism := cfg.Server.RoutesStoreRefreshParallelism if refreshParallelism <= 0 { refreshParallelism = 1 }