1
0
mirror of https://github.com/netsampler/goflow2.git synced 2024-05-06 15:54:52 +00:00
Louis c4c4ffb4e4 Refactor template as module (#49)
* Refactor template as module
* Abstraction to allow custom template storage (eg: file, redis, http...)
* Works similarly to Transport and Format
2023-04-15 20:52:32 -07:00

112 lines
2.4 KiB
Go

package utils
import (
"bytes"
"time"
"github.com/netsampler/goflow2/decoders/netflowlegacy"
"github.com/netsampler/goflow2/format"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/transport"
"github.com/prometheus/client_golang/prometheus"
)
type StateNFLegacy struct {
stopper
Format format.FormatInterface
Transport transport.TransportInterface
Logger Logger
}
func NewStateNFLegacy() *StateNFLegacy {
return &StateNFLegacy{}
}
func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
pkt := msg.(BaseMessage)
buf := bytes.NewBuffer(pkt.Payload)
key := pkt.Src.String()
samplerAddress := pkt.Src
if samplerAddress.To4() != nil {
samplerAddress = samplerAddress.To4()
}
ts := uint64(time.Now().UTC().Unix())
if pkt.SetTime {
ts = uint64(pkt.RecvTime.UTC().Unix())
}
timeTrackStart := time.Now()
msgDec, err := netflowlegacy.DecodeMessage(buf)
if err != nil {
switch err.(type) {
case *netflowlegacy.ErrorVersion:
NetFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_version",
}).
Inc()
}
return err
}
switch msgDecConv := msgDec.(type) {
case netflowlegacy.PacketNetFlowV5:
NetFlowStats.With(
prometheus.Labels{
"router": key,
"version": "5",
}).
Inc()
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "5",
"type": "DataFlowSet",
}).
Add(float64(msgDecConv.Count))
}
var flowMessageSet []*flowmessage.FlowMessage
flowMessageSet, err = producer.ProcessMessageNetFlowLegacy(msgDec)
timeTrackStop := time.Now()
DecoderTime.With(
prometheus.Labels{
"name": "NetFlowV5",
}).
Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.SamplerAddress = samplerAddress
if s.Format != nil {
key, data, err := s.Format.Format(fmsg)
if err != nil && s.Logger != nil {
s.Logger.Error(err)
}
if err == nil && s.Transport != nil {
err = s.Transport.Send(key, data)
if err != nil {
s.Logger.Error(err)
}
}
}
}
return nil
}
func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
if err := s.start(); err != nil {
return err
}
return UDPStoppableRoutine(s.stopCh, "NetFlowV5", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}