1
0
mirror of https://github.com/netsampler/goflow2.git synced 2024-05-06 15:54:52 +00:00
2022-06-13 17:36:28 +02:00

108 lines
2.3 KiB
Go

package utils
import (
"bytes"
"time"
"github.com/netsampler/goflow2/decoders/netflowlegacy"
"github.com/netsampler/goflow2/format"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/transport"
"github.com/prometheus/client_golang/prometheus"
)
// StateNFLegacy carries the dependencies used to receive and decode legacy
// NetFlow (v5) packets and to hand the resulting flow messages to a formatter
// and a transport.
type StateNFLegacy struct {
	// stopper is embedded; FlowRoutine uses its start method and stopCh field.
	stopper
	// Format serializes each decoded flow message. When nil, DecodeFlow
	// neither formats nor sends the messages.
	Format format.FormatInterface
	// Transport receives the key/data pair produced by Format. When nil,
	// formatted messages are not sent.
	Transport transport.TransportInterface
	// Logger receives formatting and transport errors reported by DecodeFlow
	// and is passed on to UDPStoppableRoutine by FlowRoutine.
	Logger Logger
}
// DecodeFlow decodes one legacy NetFlow (v5) datagram, updates the Prometheus
// metrics, and forwards every resulting flow message through the configured
// Format and Transport.
//
// msg must hold a BaseMessage; any other dynamic type makes the type
// assertion panic. Errors from decoding the packet or from converting it into
// flow messages are returned to the caller. Errors from formatting or sending
// an individual flow message are only logged (when a Logger is set) and the
// remaining messages of the packet are still processed.
func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
	pkt := msg.(BaseMessage)
	buf := bytes.NewBuffer(pkt.Payload)
	key := pkt.Src.String()

	// Prefer the To4 form of the sampler address whenever one exists.
	samplerAddress := pkt.Src
	if v4 := samplerAddress.To4(); v4 != nil {
		samplerAddress = v4
	}

	// Timestamp in Unix seconds: the packet's RecvTime when SetTime is set,
	// otherwise the current time.
	ts := uint64(time.Now().UTC().Unix())
	if pkt.SetTime {
		ts = uint64(pkt.RecvTime.UTC().Unix())
	}

	timeTrackStart := time.Now()
	msgDec, err := netflowlegacy.DecodeMessage(buf)
	if err != nil {
		// Only version errors are counted; every decode error is returned.
		if _, isVersionErr := err.(*netflowlegacy.ErrorVersion); isVersionErr {
			NetFlowErrors.With(
				prometheus.Labels{
					"router": key,
					"error":  "error_version",
				}).
				Inc()
		}
		return err
	}

	if v5, isV5 := msgDec.(netflowlegacy.PacketNetFlowV5); isV5 {
		NetFlowStats.With(
			prometheus.Labels{
				"router":  key,
				"version": "5",
			}).
			Inc()
		NetFlowSetStatsSum.With(
			prometheus.Labels{
				"router":  key,
				"version": "5",
				"type":    "DataFlowSet",
			}).
			Add(float64(v5.Count))
	}

	var flowMessageSet []*flowmessage.FlowMessage
	flowMessageSet, err = producer.ProcessMessageNetFlowLegacy(msgDec)

	// Record the decode+convert duration in microseconds, even on failure.
	DecoderTime.With(
		prometheus.Labels{
			"name": "NetFlowV5",
		}).
		Observe(float64(time.Since(timeTrackStart).Nanoseconds()) / 1000)

	// Surface conversion failures instead of silently dropping the packet.
	if err != nil {
		return err
	}

	for _, fmsg := range flowMessageSet {
		fmsg.TimeReceived = ts
		fmsg.SamplerAddress = samplerAddress

		if s.Format == nil {
			continue
		}

		// Distinct names avoid shadowing the router key and the outer err.
		msgKey, data, formatErr := s.Format.Format(fmsg)
		if formatErr != nil {
			if s.Logger != nil {
				s.Logger.Error(formatErr)
			}
			continue
		}

		if s.Transport == nil {
			continue
		}

		// Guard the logger here too: it is optional and a nil Logger must
		// not turn a transport failure into a panic.
		if sendErr := s.Transport.Send(msgKey, data); sendErr != nil && s.Logger != nil {
			s.Logger.Error(sendErr)
		}
	}

	return nil
}
// FlowRoutine calls the embedded stopper's start method and, if that
// succeeds, runs UDPStoppableRoutine under the name "NetFlowV5" with
// DecodeFlow as the per-packet handler.
//
// workers, addr, port and reuseport are passed through unchanged to
// UDPStoppableRoutine. The returned error is either the one from start or
// the one from UDPStoppableRoutine.
func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
	startErr := s.start()
	if startErr != nil {
		return startErr
	}

	const routineName = "NetFlowV5"
	return UDPStoppableRoutine(
		s.stopCh,
		routineName,
		s.DecodeFlow,
		workers,
		addr,
		port,
		reuseport,
		s.Logger,
	)
}