mirror of
https://github.com/netsampler/goflow2.git
synced 2024-05-06 15:54:52 +00:00
* Refactor template as module * Abstraction to allow custom template storage (eg: file, redis, http...) * Works similarly to Transport and Format
112 lines
2.4 KiB
Go
112 lines
2.4 KiB
Go
package utils
|
|
|
|
import (
|
|
"bytes"
|
|
"time"
|
|
|
|
"github.com/netsampler/goflow2/decoders/netflowlegacy"
|
|
"github.com/netsampler/goflow2/format"
|
|
flowmessage "github.com/netsampler/goflow2/pb"
|
|
"github.com/netsampler/goflow2/producer"
|
|
"github.com/netsampler/goflow2/transport"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
type StateNFLegacy struct {
|
|
stopper
|
|
|
|
Format format.FormatInterface
|
|
Transport transport.TransportInterface
|
|
Logger Logger
|
|
}
|
|
|
|
func NewStateNFLegacy() *StateNFLegacy {
|
|
return &StateNFLegacy{}
|
|
}
|
|
|
|
func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
|
|
pkt := msg.(BaseMessage)
|
|
buf := bytes.NewBuffer(pkt.Payload)
|
|
key := pkt.Src.String()
|
|
samplerAddress := pkt.Src
|
|
if samplerAddress.To4() != nil {
|
|
samplerAddress = samplerAddress.To4()
|
|
}
|
|
|
|
ts := uint64(time.Now().UTC().Unix())
|
|
if pkt.SetTime {
|
|
ts = uint64(pkt.RecvTime.UTC().Unix())
|
|
}
|
|
|
|
timeTrackStart := time.Now()
|
|
msgDec, err := netflowlegacy.DecodeMessage(buf)
|
|
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case *netflowlegacy.ErrorVersion:
|
|
NetFlowErrors.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"error": "error_version",
|
|
}).
|
|
Inc()
|
|
}
|
|
return err
|
|
}
|
|
|
|
switch msgDecConv := msgDec.(type) {
|
|
case netflowlegacy.PacketNetFlowV5:
|
|
NetFlowStats.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"version": "5",
|
|
}).
|
|
Inc()
|
|
NetFlowSetStatsSum.With(
|
|
prometheus.Labels{
|
|
"router": key,
|
|
"version": "5",
|
|
"type": "DataFlowSet",
|
|
}).
|
|
Add(float64(msgDecConv.Count))
|
|
}
|
|
|
|
var flowMessageSet []*flowmessage.FlowMessage
|
|
flowMessageSet, err = producer.ProcessMessageNetFlowLegacy(msgDec)
|
|
|
|
timeTrackStop := time.Now()
|
|
DecoderTime.With(
|
|
prometheus.Labels{
|
|
"name": "NetFlowV5",
|
|
}).
|
|
Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
|
|
|
|
for _, fmsg := range flowMessageSet {
|
|
fmsg.TimeReceived = ts
|
|
fmsg.SamplerAddress = samplerAddress
|
|
|
|
if s.Format != nil {
|
|
key, data, err := s.Format.Format(fmsg)
|
|
|
|
if err != nil && s.Logger != nil {
|
|
s.Logger.Error(err)
|
|
}
|
|
if err == nil && s.Transport != nil {
|
|
err = s.Transport.Send(key, data)
|
|
if err != nil {
|
|
s.Logger.Error(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
|
|
if err := s.start(); err != nil {
|
|
return err
|
|
}
|
|
return UDPStoppableRoutine(s.stopCh, "NetFlowV5", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
|
|
}
|