diff --git a/cmd/enricher/main.go b/cmd/enricher/main.go index b138382..71bfd1b 100644 --- a/cmd/enricher/main.go +++ b/cmd/enricher/main.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "bytes" "errors" "flag" "fmt" @@ -62,17 +63,27 @@ func MapCountry(db *geoip2.Reader, addr []byte, dest *string) { *dest = entry.Country.IsoCode } -func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *flowmessage.FlowMessageExt) { +func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *ProtoProducerMessage) { if dbAsn != nil { - MapAsn(dbAsn, msg.SrcAddr, &(msg.SrcAs)) - MapAsn(dbAsn, msg.DstAddr, &(msg.DstAs)) + MapAsn(dbAsn, msg.SrcAddr, &(msg.FlowMessageExt.SrcAs)) + MapAsn(dbAsn, msg.DstAddr, &(msg.FlowMessageExt.DstAs)) } if dbCountry != nil { - MapCountry(dbCountry, msg.SrcAddr, &(msg.SrcCountry)) - MapCountry(dbCountry, msg.DstAddr, &(msg.DstCountry)) + MapCountry(dbCountry, msg.SrcAddr, &(msg.FlowMessageExt.SrcCountry)) + MapCountry(dbCountry, msg.DstAddr, &(msg.FlowMessageExt.DstCountry)) } } +type ProtoProducerMessage struct { + flowmessage.FlowMessageExt +} + +func (m *ProtoProducerMessage) MarshalBinary() ([]byte, error) { + buf := bytes.NewBuffer([]byte{}) + _, err := protodelim.MarshalTo(buf, m) + return buf.Bytes(), err +} + func main() { flag.Parse() @@ -122,22 +133,22 @@ func main() { rdr := bufio.NewReader(os.Stdin) - msg := &flowmessage.FlowMessageExt{} + var msg ProtoProducerMessage for { - if err := protodelim.UnmarshalFrom(rdr, msg); err != nil && errors.Is(err, io.EOF) { + if err := protodelim.UnmarshalFrom(rdr, &msg); err != nil && errors.Is(err, io.EOF) { return } else if err != nil { log.Error(err) continue } - MapFlow(dbAsn, dbCountry, msg) + MapFlow(dbAsn, dbCountry, &msg) if *SamplingRate > 0 { msg.SamplingRate = uint64(*SamplingRate) } - key, data, err := formatter.Format(msg) + key, data, err := formatter.Format(&msg) if err != nil { log.Error(err) continue @@ -148,5 +159,7 @@ func main() { log.Error(err) continue } + + msg.Reset() } }