mirror of
https://github.com/bgp/stayrtr.git
synced 2024-05-06 15:54:54 +00:00
330 lines
7.4 KiB
Go
330 lines
7.4 KiB
Go
package cache
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
rtr "github.com/bgp/stayrtr/lib"
|
|
"github.com/bgp/stayrtr/metrics"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type VRPCache struct {
|
|
lastdata *VRPList
|
|
lasthash []byte
|
|
lastchange time.Time
|
|
lastts time.Time
|
|
sendNotifs bool
|
|
useSerial int
|
|
cacheBin string
|
|
|
|
FetchConfig *FetchConfig
|
|
|
|
Server *rtr.Server
|
|
|
|
metricsEvent *metrics.MetricsEvent // this is busted. Fix me.
|
|
|
|
exported VRPList
|
|
mu *sync.RWMutex
|
|
|
|
slurm *SlurmConfig
|
|
|
|
checktime bool
|
|
}
|
|
|
|
func NewVRPCache(server *rtr.Server, checktime bool, sendNotifs bool) *VRPCache {
|
|
c := &VRPCache{
|
|
Server: server,
|
|
FetchConfig: NewFetchConfig(),
|
|
lastdata: &VRPList{},
|
|
sendNotifs: sendNotifs,
|
|
checktime: checktime,
|
|
}
|
|
return c
|
|
}
|
|
|
|
// newSHA256 computes the SHA-256 digest of data.
// The digest is returned as a 32-byte slice rather than a [32]byte array.
func newSHA256(data []byte) []byte {
	digest := sha256.New()
	digest.Write(data) // hash.Hash.Write never returns an error
	return digest.Sum(nil)
}
|
|
|
|
func decodeJSON(data []byte) (*VRPList, error) {
|
|
buf := bytes.NewBuffer(data)
|
|
dec := json.NewDecoder(buf)
|
|
|
|
var vrplistjson VRPList
|
|
err := dec.Decode(&vrplistjson)
|
|
return &vrplistjson, err
|
|
}
|
|
|
|
func isValidPrefixLength(prefix *net.IPNet, maxLength uint8) bool {
|
|
plen, max := net.IPMask.Size(prefix.Mask)
|
|
|
|
if plen == 0 || uint8(plen) > maxLength || maxLength > uint8(max) {
|
|
log.Errorf("%s Maxlength wrong: %d - %d", prefix, plen, maxLength)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// processData will take a slice of prefix.VRPJson and attempt to convert them to a slice of rtr.VRP.
|
|
// Will check the following:
|
|
// 1 - The prefix is a valid prefix
|
|
// 2 - The ASN is a valid ASN
|
|
// 3 - The MaxLength is valid
|
|
// Will return a deduped slice, as well as total VRPs, IPv4 VRPs, and IPv6 VRPs
|
|
func processData(vrplistjson []VRPJson) ([]rtr.VRP, int, int, int) {
|
|
filterDuplicates := make(map[string]bool)
|
|
|
|
var vrplist []rtr.VRP
|
|
var countv4 int
|
|
var countv6 int
|
|
|
|
for _, v := range vrplistjson {
|
|
prefix, err := v.GetPrefix2()
|
|
if err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
asn, err := v.GetASN2()
|
|
if err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
|
|
if !isValidPrefixLength(prefix, v.Length) {
|
|
continue
|
|
}
|
|
|
|
if prefix.IP.To4() != nil {
|
|
countv4++
|
|
} else {
|
|
countv6++
|
|
}
|
|
|
|
key := fmt.Sprintf("%s,%d,%d", prefix, asn, v.Length)
|
|
_, exists := filterDuplicates[key]
|
|
if exists {
|
|
continue
|
|
}
|
|
filterDuplicates[key] = true
|
|
|
|
vrp := rtr.VRP{
|
|
Prefix: *prefix,
|
|
ASN: asn,
|
|
MaxLen: v.Length,
|
|
}
|
|
vrplist = append(vrplist, vrp)
|
|
}
|
|
return vrplist, countv4 + countv6, countv4, countv6
|
|
}
|
|
|
|
// IdenticalFile is the error returned by UpdateFile when the fetched file
// has the same SHA-256 hash as the previously accepted one.
type IdenticalFile struct {
	File string // the file name that was passed to UpdateFile
}

// Error implements the error interface.
func (e IdenticalFile) Error() string {
	return fmt.Sprint("File ", e.File, " is identical to the previous version")
}
|
|
|
|
func (c *VRPCache) Exporter(wr http.ResponseWriter, r *http.Request) {
|
|
c.mu.RLock()
|
|
toExport := c.exported
|
|
c.mu.RUnlock()
|
|
enc := json.NewEncoder(wr)
|
|
enc.Encode(toExport)
|
|
}
|
|
|
|
// Update the state based on the current slurm file and data.
|
|
func (c *VRPCache) UpdateFromNewState() error {
|
|
sessid := c.Server.GetSessionId()
|
|
|
|
if c.checktime {
|
|
buildtime, err := time.Parse(time.RFC3339, c.lastdata.Metadata.Buildtime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
notafter := buildtime.Add(time.Hour * 24)
|
|
if time.Now().UTC().After(notafter) {
|
|
return errors.New(fmt.Sprintf("VRP JSON file is older than 24 hours: %v", buildtime))
|
|
}
|
|
}
|
|
|
|
vrpsjson := c.lastdata.Data
|
|
if c.slurm != nil {
|
|
kept, removed := c.slurm.FilterOnVRPs(vrpsjson)
|
|
asserted := c.slurm.AssertVRPs()
|
|
log.Infof("Slurm filtering: %v kept, %v removed, %v asserted", len(kept), len(removed), len(asserted))
|
|
vrpsjson = append(kept, asserted...)
|
|
}
|
|
|
|
vrps, count, countv4, countv6 := processData(vrpsjson)
|
|
|
|
log.Infof("New update (%v uniques, %v total prefixes).", len(vrps), count)
|
|
|
|
c.Server.AddVRPs(vrps)
|
|
|
|
serial, _ := c.Server.GetCurrentSerial(sessid)
|
|
log.Infof("Updated added, new serial %v", serial)
|
|
if c.sendNotifs {
|
|
log.Debugf("Sending notifications to clients")
|
|
c.Server.NotifyClientsLatest()
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.exported = VRPList{
|
|
Metadata: MetaData{
|
|
Counts: len(vrpsjson),
|
|
Buildtime: c.lastdata.Metadata.Buildtime,
|
|
},
|
|
Data: vrpsjson,
|
|
}
|
|
|
|
c.mu.Unlock()
|
|
|
|
if c.metricsEvent != nil {
|
|
var countv4_dup int
|
|
var countv6_dup int
|
|
for _, vrp := range vrps {
|
|
if vrp.Prefix.IP.To4() != nil {
|
|
countv4_dup++
|
|
} else if vrp.Prefix.IP.To16() != nil {
|
|
countv6_dup++
|
|
}
|
|
}
|
|
c.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, c.lastchange, c.lastts, c.cacheBin)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *VRPCache) UpdateFile(file string) (bool, error) {
|
|
log.Debugf("Refreshing cache from %s", file)
|
|
|
|
c.lastts = time.Now().UTC()
|
|
data, code, lastrefresh, err := c.FetchConfig.FetchFile(file)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if lastrefresh {
|
|
metrics.LastRefresh.WithLabelValues(file).Set(float64(c.lastts.UnixNano() / 1e9))
|
|
}
|
|
if code != -1 {
|
|
metrics.RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc()
|
|
}
|
|
|
|
hsum := newSHA256(data)
|
|
if c.lasthash != nil {
|
|
cres := bytes.Compare(c.lasthash, hsum)
|
|
if cres == 0 {
|
|
return false, IdenticalFile{File: file}
|
|
}
|
|
}
|
|
|
|
log.Infof("new cache file: Updating sha256 hash %x -> %x", c.lasthash, hsum)
|
|
|
|
vrplistjson, err := decodeJSON(data)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
c.lasthash = hsum
|
|
c.lastchange = time.Now().UTC()
|
|
c.lastdata = vrplistjson
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (c *VRPCache) UpdateSlurm(file string) (bool, error) {
|
|
log.Debugf("Refreshing slurm from %v", file)
|
|
data, code, lastrefresh, err := c.FetchConfig.FetchFile(file)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if lastrefresh {
|
|
metrics.LastRefresh.WithLabelValues(file).Set(float64(c.lastts.UnixNano() / 1e9))
|
|
}
|
|
if code != -1 {
|
|
metrics.RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc()
|
|
}
|
|
|
|
buf := bytes.NewBuffer(data)
|
|
|
|
slurm, err := DecodeJSONSlurm(buf)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
c.slurm = slurm
|
|
return true, nil
|
|
}
|
|
|
|
func (c *VRPCache) RoutineUpdate(file string, interval int, slurmFile string) {
|
|
log.Debugf("Starting refresh routine (file: %v, interval: %vs, slurm: %v)", file, interval, slurmFile)
|
|
signals := make(chan os.Signal, 1)
|
|
signal.Notify(signals, syscall.SIGHUP)
|
|
for {
|
|
var delay *time.Timer
|
|
if c.lastchange.IsZero() {
|
|
log.Warn("Initial sync not complete. Refreshing every 30 seconds")
|
|
delay = time.NewTimer(time.Duration(30) * time.Second)
|
|
} else {
|
|
delay = time.NewTimer(time.Duration(interval) * time.Second)
|
|
}
|
|
select {
|
|
case <-delay.C:
|
|
case <-signals:
|
|
log.Debug("Received HUP signal")
|
|
}
|
|
delay.Stop()
|
|
slurmNotPresentOrUpdated := false
|
|
if slurmFile != "" {
|
|
var err error
|
|
slurmNotPresentOrUpdated, err = c.UpdateSlurm(slurmFile)
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case HttpNotModified:
|
|
log.Info(err)
|
|
case IdenticalEtag:
|
|
log.Info(err)
|
|
default:
|
|
log.Errorf("Slurm: %v", err)
|
|
}
|
|
}
|
|
}
|
|
cacheUpdated, err := c.UpdateFile(file)
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case HttpNotModified:
|
|
log.Info(err)
|
|
case IdenticalEtag:
|
|
log.Info(err)
|
|
case IdenticalFile:
|
|
log.Info(err)
|
|
default:
|
|
log.Errorf("Error updating: %v", err)
|
|
}
|
|
}
|
|
|
|
// Only process the first time after there is either a cache or SLURM
|
|
// update.
|
|
if cacheUpdated || slurmNotPresentOrUpdated {
|
|
err := c.UpdateFromNewState()
|
|
if err != nil {
|
|
log.Errorf("Error updating from new state: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|