mirror of https://github.com/alice-lg/alice-lg.git synced 2024-05-11 05:55:03 +00:00

Merge branch 'feature/limit-refresh-paralellism' into develop

This commit is contained in:
Annika Hannig
2022-02-09 13:16:41 +01:00
9 changed files with 243 additions and 38 deletions

View File

@ -8,16 +8,29 @@
with `neighbor.asn` (in case of JavaScript errors).
This also applies to the API.
In the config `neighbors_store_refresh_interval` needs to be updated.
* Parallel route / neighbor store refreshes: Route servers are no
longer queried sequentially. A jitter is applied so that not all
servers are hit at exactly the same time (see the sketch after this
file's diff).
* Parallelism can be tuned through the config parameters:
[server]
routes_store_refresh_parallelism = 5
neighbors_store_refresh_parallelism = 10000
A value of 1 is a sequential refresh.
* Postgres store backend: Not keeping routes and neighbors in
memory might reduce the memory footprint.
* Support for alternative pipe in `multi_table` birdwatcher
configurations.
## 5.0.1 (2021-11-01)
* Fixed parsing extended communities in openbgpd source causing a crash.
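A sketch of the parallel refresh scheme described in the changelog entry above: each cycle refreshes at most a configured number of sources concurrently, and a small random jitter delays the start so the route servers are not all hit at once. This is illustrative only; the function names, the jitter bound and the semaphore approach are assumptions, not the code introduced by this commit (the actual store changes appear further down).

package main

import (
	"log"
	"math/rand"
	"time"
)

// refreshLoop queries every source periodically, running at most
// `parallelism` refreshes at the same time and jittering the start.
func refreshLoop(
	sourceIDs []string,
	interval time.Duration,
	parallelism int,
	refresh func(id string),
) {
	// Jitter so not all route servers are hit at exactly the same moment.
	time.Sleep(time.Duration(rand.Intn(30)) * time.Second)

	sem := make(chan struct{}, parallelism) // limits concurrent refreshes
	for {
		for _, id := range sourceIDs {
			sem <- struct{}{} // acquire a refresh slot
			go func(id string) {
				defer func() { <-sem }() // release the slot when done
				refresh(id)
			}(id)
		}
		time.Sleep(interval)
	}
}

func main() {
	refreshLoop([]string{"rs1", "rs2"}, 5*time.Minute, 1, func(id string) {
		log.Println("refreshing", id)
	})
}

With parallelism set to 1 the loop degenerates into a sequential refresh, which matches the documented meaning of the new config parameters.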

View File

@ -25,7 +25,6 @@ func createHeapProfile(filename string) {
log.Fatal("could not create memory profile: ", err)
}
defer f.Close() // error handling omitted for example
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
}
@ -37,16 +36,16 @@ func createAllocProfile(filename string) {
log.Fatal("could not create alloc profile: ", err)
}
defer f.Close() // error handling omitted for example
runtime.GC() // get up-to-date statistics
if err := pprof.Lookup("allocs").WriteTo(f, 0); err != nil {
log.Fatal("could not write alloc profile: ", err)
}
}
func startMemoryProfile(prefix string) {
t := 0
for {
t := 0
filename := fmt.Sprintf("%s-heap-%03d", prefix, t)
runtime.GC() // get up-to-date statistics (according to docs)
createHeapProfile(filename)
log.Println("wrote memory heap profile:", filename)
filename = fmt.Sprintf("%s-allocs-%03d", prefix, t)

View File

@ -7,15 +7,31 @@
listen_http = 127.0.0.1:7340
# configures the built-in webserver timeout in seconds (default 120s)
# http_timeout = 60
# enable the prefix-lookup endpoint / the global search feature
enable_prefix_lookup = true
# Try to refresh the neighbor status on every request to /neighbors
enable_neighbors_status_refresh = false
asn = 9033
# this ASN is used as a fallback value in the RPKI feature and for route
# filtering evaluation with large BGP communities
asn = 9033
store_backend = postgres
# how many route servers will be refreshed at the same time
# if set to 0 (or, for that matter, 1), the refresh will be
# sequential.
# Default: 1
routes_store_refresh_parallelism = 5
neighbors_store_refresh_parallelism = 10000
# how much time should pass between refreshes (in minutes)
# Default: 5
routes_store_refresh_interval = 5
neighbors_store_refresh_interval = 5
[postgres]
url = "postgres://postgres:postgres@localhost:5432/alice"
min_connections = 2
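For example, with the values above a refresh cycle starts every 5 minutes and queries at most 5 route servers in parallel; the neighbor refresh, with a parallelism of 10000, effectively runs all sources at once. Setting either parallelism value to 0 or 1 falls back to a strictly sequential refresh.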

View File

@ -73,14 +73,16 @@ const (
// A ServerConfig holds the runtime configuration
// for the backend.
type ServerConfig struct {
Listen string `ini:"listen_http"`
HTTPTimeout int `ini:"http_timeout"`
EnablePrefixLookup bool `ini:"enable_prefix_lookup"`
NeighborsStoreRefreshInterval int `ini:"neighbours_store_refresh_interval"`
RoutesStoreRefreshInterval int `ini:"routes_store_refresh_interval"`
StoreBackend string `ini:"store_backend"`
Asn int `ini:"asn"`
EnableNeighborsStatusRefresh bool `ini:"enable_neighbors_status_refresh"`
Listen string `ini:"listen_http"`
HTTPTimeout int `ini:"http_timeout"`
EnablePrefixLookup bool `ini:"enable_prefix_lookup"`
NeighborsStoreRefreshInterval int `ini:"neighbors_store_refresh_interval"`
NeighborsStoreRefreshParallelism int `ini:"neighbors_store_refresh_parallelism"`
RoutesStoreRefreshInterval int `ini:"routes_store_refresh_interval"`
RoutesStoreRefreshParallelism int `ini:"routes_store_refresh_parallelism"`
StoreBackend string `ini:"store_backend"`
Asn int `ini:"asn"`
EnableNeighborsStatusRefresh bool `ini:"enable_neighbors_status_refresh"`
}
// PostgresConfig is the configuration for the database
@ -840,8 +842,10 @@ func LoadConfig(file string) (*Config, error) {
// Map sections
server := ServerConfig{
HTTPTimeout: DefaultHTTPTimeout,
StoreBackend: "memory",
HTTPTimeout: DefaultHTTPTimeout,
StoreBackend: "memory",
RoutesStoreRefreshParallelism: 1,
NeighborsStoreRefreshParallelism: 1,
}
if err := parsedConfig.Section("server").MapTo(&server); err != nil {
return nil, err
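The defaults above are set before the `MapTo` call, so a key missing from the `[server]` section keeps its default while a key present in the file overrides it. A trimmed-down sketch of that behaviour, assuming the go-ini package (gopkg.in/ini.v1) that the `ini:` struct tags and the `MapTo` call suggest:

package main

import (
	"fmt"
	"log"

	"gopkg.in/ini.v1"
)

// Only the two new parallelism fields, for illustration.
type serverConfig struct {
	RoutesStoreRefreshParallelism    int `ini:"routes_store_refresh_parallelism"`
	NeighborsStoreRefreshParallelism int `ini:"neighbors_store_refresh_parallelism"`
}

func main() {
	raw := []byte("[server]\nroutes_store_refresh_parallelism = 5\n")
	parsed, err := ini.Load(raw)
	if err != nil {
		log.Fatal(err)
	}
	// Defaults first, then map the section over them.
	server := serverConfig{
		RoutesStoreRefreshParallelism:    1,
		NeighborsStoreRefreshParallelism: 1,
	}
	if err := parsed.Section("server").MapTo(&server); err != nil {
		log.Fatal(err)
	}
	fmt.Println(server.RoutesStoreRefreshParallelism)    // 5, from the file
	fmt.Println(server.NeighborsStoreRefreshParallelism) // 1, default kept
}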

View File

@ -13,6 +13,18 @@ asn = 9033
# this ASN is used as a fallback value in the RPKI feature and for route
# filtering evaluation with large BGP communities
# how many route servers will be refreshed at the same time
# if set to 0 (or, for that matter, 1), the refresh will be
# sequential.
# Default: 1
routes_store_refresh_parallelism = 5
neighbors_store_refresh_parallelism = 10000
# how much time should pass between refreshes (in minutes)
# Default: 5
routes_store_refresh_interval = 5
neighbors_store_refresh_interval = 5
store_backend = postgres
[postgres]

View File

@ -70,11 +70,16 @@ func NewNeighborsStore(
if refreshInterval == 0 {
refreshInterval = time.Duration(5) * time.Minute
}
refreshParallelism := cfg.Server.NeighborsStoreRefreshParallelism
if refreshParallelism <= 0 {
refreshParallelism = 1
}
log.Println("Neighbors Store refresh interval set to:", refreshInterval)
log.Println("Neighbors refresh interval set to:", refreshInterval)
log.Println("Neighbors refresh parallelism:", refreshParallelism)
// Store refresh information per store
sources := NewSourcesStore(cfg, refreshInterval)
sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism)
// Neighbors will be refreshed on every GetNeighborsAt
// invocation. Why? I (Annika) don't know. I have to ask Patrick.
@ -183,7 +188,7 @@ func (s *NeighborsStore) safeUpdateSource(id string) {
// sources last neighbor refresh is longer ago
// than the configured refresh period.
func (s *NeighborsStore) update() {
for _, id := range s.sources.GetSourceIDs() {
for _, id := range s.sources.GetSourceIDsForRefresh() {
go s.safeUpdateSource(id)
}
}

View File

@ -66,11 +66,16 @@ func NewRoutesStore(
if refreshInterval == 0 {
refreshInterval = time.Duration(5) * time.Minute
}
refreshParallelism := cfg.Server.RoutesStoreRefreshParallelism
if refreshParallelism <= 0 {
refreshParallelism = 1
}
log.Println("Routes refresh interval set to:", refreshInterval)
log.Println("Routes refresh parallelism:", refreshParallelism)
// Store refresh information per store
sources := NewSourcesStore(cfg, refreshInterval)
sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism)
store := &RoutesStore{
backend: backend,
sources: sources,
@ -95,7 +100,7 @@ func (s *RoutesStore) Start() {
// refresh period. This is totally the same as the
// NeighborsStore.update and maybe these functions can be merged (TODO)
func (s *RoutesStore) update() {
for _, id := range s.sources.GetSourceIDs() {
for _, id := range s.sources.GetSourceIDsForRefresh() {
go s.safeUpdateSource(id)
}
}
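As the TODO in the comment above notes, RoutesStore.update and NeighborsStore.update are identical. One way they could be merged, sketched under the assumption that the helper lives in the same store package; the function is hypothetical and not part of this commit:

// updateStore launches a refresh for every source the SourcesStore
// currently allows to be refreshed (bounded by refreshParallelism).
func updateStore(sources *SourcesStore, safeUpdateSource func(id string)) {
	for _, id := range sources.GetSourceIDsForRefresh() {
		go safeUpdateSource(id)
	}
}

Both stores could then call updateStore(s.sources, s.safeUpdateSource) from their update methods.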

View File

@ -3,6 +3,7 @@ package store
import (
"context"
"log"
"sort"
"sync"
"time"
@ -21,7 +22,7 @@ const (
// State is an enum of the above States
type State int
// String() converts a state into a string
// String converts a state into a string
func (s State) String() string {
switch s {
case StateInit:
@ -36,24 +37,45 @@ func (s State) String() string {
return "INVALID"
}
// Status defines a status the store can be in
// Status defines a status the store can be in.
type Status struct {
RefreshInterval time.Duration
RefreshParallelism int
LastRefresh time.Time
LastRefreshDuration time.Duration
LastError interface{}
State State
Initialized bool
SourceID string
lastRefreshStart time.Time
}
// SourceStatusList is a sortable list of source status
type SourceStatusList []*Status
// Len implements the sort interface
func (l SourceStatusList) Len() int {
return len(l)
}
// Less implements the sort interface
func (l SourceStatusList) Less(i, j int) bool {
return l[i].lastRefreshStart.Before(l[j].lastRefreshStart)
}
// Swap implements the sort interface
func (l SourceStatusList) Swap(i, j int) {
l[i], l[j] = l[j], l[i]
}
// SourcesStore provides methods for retrieving
// the current status of a source.
type SourcesStore struct {
refreshInterval time.Duration
status map[string]*Status
sources map[string]*config.SourceConfig
refreshInterval time.Duration
refreshParallelism int
status map[string]*Status
sources map[string]*config.SourceConfig
sync.Mutex
}
@ -61,6 +83,7 @@ type SourcesStore struct {
func NewSourcesStore(
cfg *config.Config,
refreshInterval time.Duration,
refreshParallelism int,
) *SourcesStore {
status := make(map[string]*Status)
sources := make(map[string]*config.SourceConfig)
@ -71,13 +94,15 @@ func NewSourcesStore(
sources[sourceID] = src
status[sourceID] = &Status{
RefreshInterval: refreshInterval,
SourceID: sourceID,
}
}
return &SourcesStore{
status: status,
sources: sources,
refreshInterval: refreshInterval,
status: status,
sources: sources,
refreshInterval: refreshInterval,
refreshParallelism: refreshParallelism,
}
}
@ -111,8 +136,6 @@ func (s *SourcesStore) IsInitialized(sourceID string) (bool, error) {
}
// NextRefresh calculates the next refresh time
// TODO: I doubt the usefulness of these numbers.
//
func (s *SourcesStore) NextRefresh(
ctx context.Context,
) time.Time {
@ -209,6 +232,43 @@ func (s *SourcesStore) GetSourceIDs() []string {
return ids
}
// GetSourceIDsForRefresh will retrieve a list of source IDs,
// which are currently not locked, sorted by least refreshed.
// The number of sources returned is limited through the
// refresh parallelism parameter.
func (s *SourcesStore) GetSourceIDsForRefresh() []string {
s.Lock()
defer s.Unlock()
locked := 0
sources := make(SourceStatusList, 0, len(s.status))
for _, status := range s.status {
sources = append(sources, status)
if status.State == StateBusy {
locked++
}
}
// Sort by refresh start time ascending
sort.Sort(sources)
slots := s.refreshParallelism - locked
if slots <= 0 {
slots = 0
}
ids := make([]string, 0, slots)
i := 0
for _, status := range sources {
if i >= slots {
break
}
ids = append(ids, status.SourceID)
i++
}
return ids
}
// LockSource indicates the start of a refresh
func (s *SourcesStore) LockSource(sourceID string) error {
s.Lock()
@ -236,8 +296,7 @@ func (s *SourcesStore) RefreshSuccess(sourceID string) error {
}
status.State = StateReady
status.LastRefresh = time.Now().UTC()
status.LastRefreshDuration = time.Now().Sub(
status.lastRefreshStart)
status.LastRefreshDuration = time.Since(status.lastRefreshStart)
status.LastError = nil
status.Initialized = true // We now have data
return nil
@ -246,7 +305,7 @@ func (s *SourcesStore) RefreshSuccess(sourceID string) error {
// RefreshError indicates that the refresh has failed
func (s *SourcesStore) RefreshError(
sourceID string,
err interface{},
sourceErr interface{},
) {
s.Lock()
defer s.Unlock()
@ -257,8 +316,7 @@ func (s *SourcesStore) RefreshError(
}
status.State = StateError
status.LastRefresh = time.Now().UTC()
status.LastRefreshDuration = time.Now().Sub(
status.lastRefreshStart)
status.LastError = err
status.LastRefreshDuration = time.Since(status.lastRefreshStart)
status.LastError = sourceErr
return
}
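Putting the new pieces together, a refresh cycle built around GetSourceIDsForRefresh follows a lock / fetch / record pattern. The body of safeUpdateSource is not part of this diff, so the following is only an illustrative sketch within the store package (fetch stands in for the actual source query):

// refreshOnce refreshes at most refreshParallelism sources that are
// currently not busy, least recently refreshed first.
func refreshOnce(s *SourcesStore, fetch func(id string) error) {
	for _, id := range s.GetSourceIDsForRefresh() {
		go func(id string) {
			if err := s.LockSource(id); err != nil {
				return // source became busy in the meantime
			}
			if err := fetch(id); err != nil {
				s.RefreshError(id, err) // marks StateError, keeps the error
				return
			}
			s.RefreshSuccess(id) // marks StateReady, records the duration
		}(id)
	}
}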

View File

@ -1,11 +1,104 @@
package store
import "testing"
import (
"testing"
"time"
)
func TestAddSource(t *testing.T) {
func TestGetSourceIDsForRefreshSequential(t *testing.T) {
s := &SourcesStore{
refreshParallelism: 1,
status: map[string]*Status{
"src1": {
SourceID: "src1",
},
"src2": {
SourceID: "src2",
},
},
}
ids := s.GetSourceIDsForRefresh()
if len(ids) != 1 {
t.Error("expected 1 id")
}
if err := s.LockSource(ids[0]); err != nil {
t.Error(err)
}
lastID := ids[0]
ids = s.GetSourceIDsForRefresh()
if len(ids) != 0 {
t.Error("all concurrent refresh slots should be taken")
}
if err := s.RefreshSuccess(lastID); err != nil {
t.Error(err)
}
ids = s.GetSourceIDsForRefresh()
if len(ids) != 1 {
t.Error("expected 1 id")
}
if ids[0] == lastID {
t.Error("the next source should have been returned")
}
}
func TestGetStatus(t *testing.T) {
func TestGetSourceIDsForRefreshParallel(t *testing.T) {
s := &SourcesStore{
refreshParallelism: 2,
status: map[string]*Status{
"src1": {
SourceID: "src1",
},
"src2": {
SourceID: "src2",
},
"src3": {
SourceID: "src3",
lastRefreshStart: time.Now().UTC(),
},
},
}
ids := s.GetSourceIDsForRefresh()
if len(ids) != 2 {
t.Error("expected 2 ids")
}
for _, id := range ids {
if err := s.LockSource(id); err != nil {
t.Error(err)
}
if id == "src3" {
t.Error("unexpected src3")
}
}
nextIds := s.GetSourceIDsForRefresh()
if len(nextIds) != 0 {
t.Error("all concurrent refresh slots should be taken")
}
for _, id := range ids {
if err := s.RefreshSuccess(id); err != nil {
t.Error(err)
}
}
ids = s.GetSourceIDsForRefresh()
t.Log(ids)
t.Log(s.status["src1"])
if len(ids) != 2 {
t.Error("expected 2 ids")
}
if ids[0] != "src3" {
t.Error("expected src3 to be least refreshed")
}
}
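Assuming a standard Go module layout, both tests can be run in isolation with something like:

go test -run TestGetSourceIDsForRefresh ./...

which exercises the sequential as well as the parallel slot accounting.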