mirror of
https://github.com/netsampler/goflow2.git
synced 2024-05-06 15:54:52 +00:00
75 lines
2.0 KiB
Go
75 lines
2.0 KiB
Go
package rawproducer
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/netip"
|
|
"time"
|
|
|
|
"github.com/netsampler/goflow2/v2/decoders/netflow"
|
|
"github.com/netsampler/goflow2/v2/decoders/netflowlegacy"
|
|
"github.com/netsampler/goflow2/v2/decoders/sflow"
|
|
"github.com/netsampler/goflow2/v2/producer"
|
|
)
|
|
|
|
// Producer that keeps the same format
|
|
// as the original flow samples.
|
|
// This can be used for debugging (eg: getting NetFlow Option Templates)
|
|
type RawProducer struct {
|
|
}
|
|
|
|
// Raw message
|
|
type RawMessage struct {
|
|
Message interface{} `json:"message"`
|
|
Src netip.AddrPort `json:"src"`
|
|
TimeReceived time.Time `json:"time_received"`
|
|
}
|
|
|
|
func (m RawMessage) MarshalJSON() ([]byte, error) {
|
|
typeStr := "unknown"
|
|
switch m.Message.(type) {
|
|
case *netflowlegacy.PacketNetFlowV5:
|
|
typeStr = "netflowv5"
|
|
case *netflow.NFv9Packet:
|
|
typeStr = "netflowv9"
|
|
case *netflow.IPFIXPacket:
|
|
typeStr = "ipfix"
|
|
case *sflow.Packet:
|
|
typeStr = "sflow"
|
|
}
|
|
|
|
tmpStruct := struct {
|
|
Type string `json:"type"`
|
|
Message interface{} `json:"message"`
|
|
Src *netip.AddrPort `json:"src"`
|
|
TimeReceived *time.Time `json:"time_received"`
|
|
}{
|
|
Type: typeStr,
|
|
Message: m.Message,
|
|
Src: &m.Src,
|
|
TimeReceived: &m.TimeReceived,
|
|
}
|
|
return json.Marshal(tmpStruct)
|
|
}
|
|
|
|
func (m RawMessage) MarshalText() ([]byte, error) {
|
|
var msgContents []byte
|
|
var err error
|
|
if msg, ok := m.Message.(interface {
|
|
MarshalText() ([]byte, error)
|
|
}); ok {
|
|
msgContents, err = msg.MarshalText()
|
|
}
|
|
return []byte(fmt.Sprintf("%s %s: %s", m.TimeReceived.String(), m.Src.String(), string(msgContents))), err
|
|
}
|
|
|
|
func (p *RawProducer) Produce(msg interface{}, args *producer.ProduceArgs) ([]producer.ProducerMessage, error) {
|
|
// should return msg wrapped
|
|
// []*interface{msg,}
|
|
return []producer.ProducerMessage{RawMessage{msg, args.Src, args.TimeReceived}}, nil
|
|
}
|
|
|
|
func (p *RawProducer) Commit(flowMessageSet []producer.ProducerMessage) {}
|
|
|
|
func (p *RawProducer) Close() {}
|