1
0
mirror of https://github.com/netsampler/goflow2.git synced 2024-05-06 15:54:52 +00:00

enricher: add binary marshaller (#212)

This commit is contained in:
Louis
2023-08-31 23:27:47 -07:00
committed by GitHub
parent 8988f2c244
commit 085e34a85f

View File

@ -2,6 +2,7 @@ package main
import ( import (
"bufio" "bufio"
"bytes"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
@ -62,17 +63,27 @@ func MapCountry(db *geoip2.Reader, addr []byte, dest *string) {
*dest = entry.Country.IsoCode *dest = entry.Country.IsoCode
} }
func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *flowmessage.FlowMessageExt) { func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *ProtoProducerMessage) {
if dbAsn != nil { if dbAsn != nil {
MapAsn(dbAsn, msg.SrcAddr, &(msg.SrcAs)) MapAsn(dbAsn, msg.SrcAddr, &(msg.FlowMessageExt.SrcAs))
MapAsn(dbAsn, msg.DstAddr, &(msg.DstAs)) MapAsn(dbAsn, msg.DstAddr, &(msg.FlowMessageExt.DstAs))
} }
if dbCountry != nil { if dbCountry != nil {
MapCountry(dbCountry, msg.SrcAddr, &(msg.SrcCountry)) MapCountry(dbCountry, msg.SrcAddr, &(msg.FlowMessageExt.SrcCountry))
MapCountry(dbCountry, msg.DstAddr, &(msg.DstCountry)) MapCountry(dbCountry, msg.DstAddr, &(msg.FlowMessageExt.DstCountry))
} }
} }
type ProtoProducerMessage struct {
flowmessage.FlowMessageExt
}
func (m *ProtoProducerMessage) MarshalBinary() ([]byte, error) {
buf := bytes.NewBuffer([]byte{})
_, err := protodelim.MarshalTo(buf, m)
return buf.Bytes(), err
}
func main() { func main() {
flag.Parse() flag.Parse()
@ -122,22 +133,22 @@ func main() {
rdr := bufio.NewReader(os.Stdin) rdr := bufio.NewReader(os.Stdin)
msg := &flowmessage.FlowMessageExt{} var msg ProtoProducerMessage
for { for {
if err := protodelim.UnmarshalFrom(rdr, msg); err != nil && errors.Is(err, io.EOF) { if err := protodelim.UnmarshalFrom(rdr, &msg); err != nil && errors.Is(err, io.EOF) {
return return
} else if err != nil { } else if err != nil {
log.Error(err) log.Error(err)
continue continue
} }
MapFlow(dbAsn, dbCountry, msg) MapFlow(dbAsn, dbCountry, &msg)
if *SamplingRate > 0 { if *SamplingRate > 0 {
msg.SamplingRate = uint64(*SamplingRate) msg.SamplingRate = uint64(*SamplingRate)
} }
key, data, err := formatter.Format(msg) key, data, err := formatter.Format(&msg)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
continue continue
@ -148,5 +159,7 @@ func main() {
log.Error(err) log.Error(err)
continue continue
} }
msg.Reset()
} }
} }