mirror of
https://github.com/netsampler/goflow2.git
synced 2024-05-06 15:54:52 +00:00
* Refactor template as module * Abstraction to allow custom template storage (eg: file, redis, http...) * Works similarly to Transport and Format
171 lines
3.6 KiB
Go
171 lines
3.6 KiB
Go
package utils
|
|
|
|
import (
|
|
"bytes"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/netsampler/goflow2/decoders/sflow"
|
|
"github.com/netsampler/goflow2/format"
|
|
flowmessage "github.com/netsampler/goflow2/pb"
|
|
"github.com/netsampler/goflow2/producer"
|
|
"github.com/netsampler/goflow2/transport"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
type StateSFlow struct {
|
|
stopper
|
|
|
|
Format format.FormatInterface
|
|
Transport transport.TransportInterface
|
|
Logger Logger
|
|
|
|
Config *producer.ProducerConfig
|
|
configMapped *producer.ProducerConfigMapped
|
|
}
|
|
|
|
func NewStateSFlow() *StateSFlow {
|
|
return &StateSFlow{}
|
|
}
|
|
|
|
func (s *StateSFlow) DecodeFlow(msg interface{}) error {
|
|
pkt := msg.(BaseMessage)
|
|
buf := bytes.NewBuffer(pkt.Payload)
|
|
key := pkt.Src.String()
|
|
|
|
ts := uint64(time.Now().UTC().Unix())
|
|
if pkt.SetTime {
|
|
ts = uint64(pkt.RecvTime.UTC().Unix())
|
|
}
|
|
|
|
timeTrackStart := time.Now()
|
|
msgDec, err := sflow.DecodeMessage(buf)
|
|
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case *sflow.ErrorVersion:
|
|
SFlowErrors.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"error": "error_version",
|
|
}).
|
|
Inc()
|
|
case *sflow.ErrorIPVersion:
|
|
SFlowErrors.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"error": "error_ip_version",
|
|
}).
|
|
Inc()
|
|
case *sflow.ErrorDataFormat:
|
|
SFlowErrors.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"error": "error_data_format",
|
|
}).
|
|
Inc()
|
|
default:
|
|
SFlowErrors.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"error": "error_decoding",
|
|
}).
|
|
Inc()
|
|
}
|
|
return err
|
|
}
|
|
|
|
switch msgDecConv := msgDec.(type) {
|
|
case sflow.Packet:
|
|
agentStr := net.IP(msgDecConv.AgentIP).String()
|
|
SFlowStats.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"agent": agentStr,
|
|
"version": "5",
|
|
}).
|
|
Inc()
|
|
|
|
for _, samples := range msgDecConv.Samples {
|
|
typeStr := "unknown"
|
|
countRec := 0
|
|
switch samplesConv := samples.(type) {
|
|
case sflow.FlowSample:
|
|
typeStr = "FlowSample"
|
|
countRec = len(samplesConv.Records)
|
|
case sflow.CounterSample:
|
|
typeStr = "CounterSample"
|
|
if samplesConv.Header.Format == 4 {
|
|
typeStr = "Expanded" + typeStr
|
|
}
|
|
countRec = len(samplesConv.Records)
|
|
case sflow.ExpandedFlowSample:
|
|
typeStr = "ExpandedFlowSample"
|
|
countRec = len(samplesConv.Records)
|
|
}
|
|
SFlowSampleStatsSum.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"agent": agentStr,
|
|
"version": "5",
|
|
"type": typeStr,
|
|
}).
|
|
Inc()
|
|
|
|
SFlowSampleRecordsStatsSum.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"agent": agentStr,
|
|
"version": "5",
|
|
"type": typeStr,
|
|
}).
|
|
Add(float64(countRec))
|
|
}
|
|
|
|
}
|
|
|
|
var flowMessageSet []*flowmessage.FlowMessage
|
|
flowMessageSet, err = producer.ProcessMessageSFlowConfig(msgDec, s.configMapped)
|
|
|
|
timeTrackStop := time.Now()
|
|
DecoderTime.With(
|
|
prometheus.Labels{
|
|
"name": "sFlow",
|
|
}).
|
|
Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
|
|
|
|
for _, fmsg := range flowMessageSet {
|
|
fmsg.TimeReceived = ts
|
|
fmsg.TimeFlowStart = ts
|
|
fmsg.TimeFlowEnd = ts
|
|
|
|
if s.Format != nil {
|
|
key, data, err := s.Format.Format(fmsg)
|
|
|
|
if err != nil && s.Logger != nil {
|
|
s.Logger.Error(err)
|
|
}
|
|
if err == nil && s.Transport != nil {
|
|
err = s.Transport.Send(key, data)
|
|
if err != nil {
|
|
s.Logger.Error(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *StateSFlow) initConfig() {
|
|
s.configMapped = producer.NewProducerConfigMapped(s.Config)
|
|
}
|
|
|
|
func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
|
|
if err := s.start(); err != nil {
|
|
return err
|
|
}
|
|
s.initConfig()
|
|
return UDPStoppableRoutine(s.stopCh, "sFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
|
|
}
|