From b047aafe15b905c3f93614a62769895faa2ebe0a Mon Sep 17 00:00:00 2001 From: Louis Date: Mon, 14 Aug 2023 18:39:56 -0700 Subject: [PATCH] goflow2: allows using a single port for multiple protocols (sFlow, NetFlow, IPFIX) (#197) --- cmd/goflow2/main.go | 2 ++ utils/pipe.go | 41 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 5b95d7b..ae291e2 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -231,6 +231,8 @@ func main() { p = utils.NewSFlowPipe(cfgPipe) } else if listenAddrUrl.Scheme == "netflow" { p = utils.NewNetFlowPipe(cfgPipe) + } else if listenAddrUrl.Scheme == "flow" { + p = utils.NewFlowPipe(cfgPipe) } else { l.Errorf("scheme %s does not exist", listenAddrUrl.Scheme) return diff --git a/utils/pipe.go b/utils/pipe.go index d14c14b..1407970 100644 --- a/utils/pipe.go +++ b/utils/pipe.go @@ -186,7 +186,7 @@ func (p *NetFlowPipe) DecodeFlow(msg interface{}) error { return &PipeMessageError{pkt, err} } default: - return &PipeMessageError{pkt, fmt.Errorf("Not a NetFlow packet")} + return &PipeMessageError{pkt, fmt.Errorf("not a NetFlow packet")} } var flowMessageSet []producer.ProducerMessage @@ -222,3 +222,42 @@ func (p *NetFlowPipe) DecodeFlow(msg interface{}) error { func (p *NetFlowPipe) Close() { } + +type AutoFlowPipe struct { + *SFlowPipe + *NetFlowPipe +} + +func NewFlowPipe(cfg *PipeConfig) *AutoFlowPipe { + p := &AutoFlowPipe{ + SFlowPipe: NewSFlowPipe(cfg), + NetFlowPipe: NewNetFlowPipe(cfg), + } + return p +} + +func (p *AutoFlowPipe) Close() { + p.SFlowPipe.Close() + p.NetFlowPipe.Close() +} + +func (p *AutoFlowPipe) DecodeFlow(msg interface{}) error { + pkt, ok := msg.(*Message) + if !ok { + return fmt.Errorf("flow is not *Message") + } + buf := bytes.NewBuffer(pkt.Payload) + + var proto uint32 + if err := utils.BinaryDecoder(buf, &proto); err != nil { + return &PipeMessageError{pkt, err} + } + + protoNetFlow := (proto & 0xFFFF0000) >> 16 + if proto == 5 { + return p.SFlowPipe.DecodeFlow(msg) + } else if protoNetFlow == 5 || protoNetFlow == 9 || protoNetFlow == 10 { + return p.NetFlowPipe.DecodeFlow(msg) + } + return fmt.Errorf("could not identify protocol %d", proto) +}