package utils import ( "bytes" "net" "time" "github.com/netsampler/goflow2/decoders/sflow" "github.com/netsampler/goflow2/format" flowmessage "github.com/netsampler/goflow2/pb" "github.com/netsampler/goflow2/producer" "github.com/netsampler/goflow2/transport" "github.com/prometheus/client_golang/prometheus" ) type StateSFlow struct { stopper Format format.FormatInterface Transport transport.TransportInterface Logger Logger Config *producer.ProducerConfig configMapped *producer.ProducerConfigMapped } func (s *StateSFlow) DecodeFlow(msg interface{}) error { pkt := msg.(BaseMessage) buf := bytes.NewBuffer(pkt.Payload) key := pkt.Src.String() ts := uint64(time.Now().UTC().Unix()) if pkt.SetTime { ts = uint64(pkt.RecvTime.UTC().Unix()) } timeTrackStart := time.Now() msgDec, err := sflow.DecodeMessage(buf) if err != nil { switch err.(type) { case *sflow.ErrorVersion: SFlowErrors.With( prometheus.Labels{ "router": key, "error": "error_version", }). Inc() case *sflow.ErrorIPVersion: SFlowErrors.With( prometheus.Labels{ "router": key, "error": "error_ip_version", }). Inc() case *sflow.ErrorDataFormat: SFlowErrors.With( prometheus.Labels{ "router": key, "error": "error_data_format", }). Inc() default: SFlowErrors.With( prometheus.Labels{ "router": key, "error": "error_decoding", }). Inc() } return err } switch msgDecConv := msgDec.(type) { case sflow.Packet: agentStr := net.IP(msgDecConv.AgentIP).String() SFlowStats.With( prometheus.Labels{ "router": key, "agent": agentStr, "version": "5", }). Inc() for _, samples := range msgDecConv.Samples { typeStr := "unknown" countRec := 0 switch samplesConv := samples.(type) { case sflow.FlowSample: typeStr = "FlowSample" countRec = len(samplesConv.Records) case sflow.CounterSample: typeStr = "CounterSample" if samplesConv.Header.Format == 4 { typeStr = "Expanded" + typeStr } countRec = len(samplesConv.Records) case sflow.ExpandedFlowSample: typeStr = "ExpandedFlowSample" countRec = len(samplesConv.Records) } SFlowSampleStatsSum.With( prometheus.Labels{ "router": key, "agent": agentStr, "version": "5", "type": typeStr, }). Inc() SFlowSampleRecordsStatsSum.With( prometheus.Labels{ "router": key, "agent": agentStr, "version": "5", "type": typeStr, }). Add(float64(countRec)) } } var flowMessageSet []*flowmessage.FlowMessage flowMessageSet, err = producer.ProcessMessageSFlowConfig(msgDec, s.configMapped) timeTrackStop := time.Now() DecoderTime.With( prometheus.Labels{ "name": "sFlow", }). Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000) for _, fmsg := range flowMessageSet { fmsg.TimeReceived = ts fmsg.TimeFlowStart = ts fmsg.TimeFlowEnd = ts if s.Format != nil { key, data, err := s.Format.Format(fmsg) if err != nil && s.Logger != nil { s.Logger.Error(err) } if err == nil && s.Transport != nil { err = s.Transport.Send(key, data) if err != nil { s.Logger.Error(err) } } } } return nil } func (s *StateSFlow) initConfig() { s.configMapped = producer.NewProducerConfigMapped(s.Config) } func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error { if err := s.start(); err != nil { return err } s.initConfig() return UDPStoppableRoutine(s.stopCh, "sFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger) }