1
0
mirror of https://github.com/netsampler/goflow2.git synced 2024-05-06 15:54:52 +00:00
Louis c4c4ffb4e4 Refactor template as module (#49)
* Refactor template as module
* Abstraction to allow custom template storage (eg: file, redis, http...)
* Works similarly to Transport and Format
2023-04-15 20:52:32 -07:00

171 lines
3.6 KiB
Go

package utils
import (
"bytes"
"net"
"time"
"github.com/netsampler/goflow2/decoders/sflow"
"github.com/netsampler/goflow2/format"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/transport"
"github.com/prometheus/client_golang/prometheus"
)
type StateSFlow struct {
stopper
Format format.FormatInterface
Transport transport.TransportInterface
Logger Logger
Config *producer.ProducerConfig
configMapped *producer.ProducerConfigMapped
}
func NewStateSFlow() *StateSFlow {
return &StateSFlow{}
}
func (s *StateSFlow) DecodeFlow(msg interface{}) error {
pkt := msg.(BaseMessage)
buf := bytes.NewBuffer(pkt.Payload)
key := pkt.Src.String()
ts := uint64(time.Now().UTC().Unix())
if pkt.SetTime {
ts = uint64(pkt.RecvTime.UTC().Unix())
}
timeTrackStart := time.Now()
msgDec, err := sflow.DecodeMessage(buf)
if err != nil {
switch err.(type) {
case *sflow.ErrorVersion:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_version",
}).
Inc()
case *sflow.ErrorIPVersion:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_ip_version",
}).
Inc()
case *sflow.ErrorDataFormat:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_data_format",
}).
Inc()
default:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_decoding",
}).
Inc()
}
return err
}
switch msgDecConv := msgDec.(type) {
case sflow.Packet:
agentStr := net.IP(msgDecConv.AgentIP).String()
SFlowStats.With(
prometheus.Labels{
"router": key,
"agent": agentStr,
"version": "5",
}).
Inc()
for _, samples := range msgDecConv.Samples {
typeStr := "unknown"
countRec := 0
switch samplesConv := samples.(type) {
case sflow.FlowSample:
typeStr = "FlowSample"
countRec = len(samplesConv.Records)
case sflow.CounterSample:
typeStr = "CounterSample"
if samplesConv.Header.Format == 4 {
typeStr = "Expanded" + typeStr
}
countRec = len(samplesConv.Records)
case sflow.ExpandedFlowSample:
typeStr = "ExpandedFlowSample"
countRec = len(samplesConv.Records)
}
SFlowSampleStatsSum.With(
prometheus.Labels{
"router": key,
"agent": agentStr,
"version": "5",
"type": typeStr,
}).
Inc()
SFlowSampleRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"agent": agentStr,
"version": "5",
"type": typeStr,
}).
Add(float64(countRec))
}
}
var flowMessageSet []*flowmessage.FlowMessage
flowMessageSet, err = producer.ProcessMessageSFlowConfig(msgDec, s.configMapped)
timeTrackStop := time.Now()
DecoderTime.With(
prometheus.Labels{
"name": "sFlow",
}).
Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.TimeFlowStart = ts
fmsg.TimeFlowEnd = ts
if s.Format != nil {
key, data, err := s.Format.Format(fmsg)
if err != nil && s.Logger != nil {
s.Logger.Error(err)
}
if err == nil && s.Transport != nil {
err = s.Transport.Send(key, data)
if err != nil {
s.Logger.Error(err)
}
}
}
}
return nil
}
func (s *StateSFlow) initConfig() {
s.configMapped = producer.NewProducerConfigMapped(s.Config)
}
func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
if err := s.start(); err != nil {
return err
}
s.initConfig()
return UDPStoppableRoutine(s.stopCh, "sFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}