From 21c7fee62ef68c4e598179cc527fa6292fbe06c4 Mon Sep 17 00:00:00 2001 From: lspgn Date: Fri, 27 Aug 2021 10:04:22 -0700 Subject: [PATCH] bugfix: enricher decoding certain protobuf * now requires length prefixed messages * allows custom line separator for text transport output --- README.md | 2 +- cmd/enricher/main.go | 20 ++++++++++++++++++-- transport/file/transport.go | 4 +++- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b562a8a..474c12a 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,7 @@ with a database for Autonomous System Number and Country. Similar output options as GoFlow are provided. ```bash -$ ./goflow2 -format=pb | ./enricher -db.asn path-to/GeoLite2-ASN.mmdb -db.country path-to/GeoLite2-Country.mmdb +$ ./goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true | ./enricher -db.asn path-to/GeoLite2-ASN.mmdb -db.country path-to/GeoLite2-Country.mmdb ``` For a more scalable production setting, Kafka and protobuf are recommended. diff --git a/cmd/enricher/main.go b/cmd/enricher/main.go index 23ef5db..ccb861a 100644 --- a/cmd/enricher/main.go +++ b/cmd/enricher/main.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/binary" "flag" "fmt" "io" @@ -144,13 +145,28 @@ func main() { rdr := bufio.NewReader(os.Stdin) msg := &flowmessage.FlowMessageExt{} + msgLen := make([]byte, binary.MaxVarintLen64) + lenBufSize := len(msgLen) for { - line, err := rdr.ReadBytes('\n') + n, err := rdr.Read(msgLen) if err != nil && err != io.EOF { log.Error(err) continue } - if len(line) == 0 { + + len, vn := proto.DecodeVarint(msgLen[0:n]) + if len == 0 { + continue + } + + line := make([]byte, len) + if vn < lenBufSize { + copy(line[0:lenBufSize-vn], msgLen[vn:lenBufSize]) + } + + n, err = io.ReadFull(rdr, line[lenBufSize-vn:]) + if err != nil && err != io.EOF { + log.Error(err) continue } line = bytes.TrimSuffix(line, []byte("\n")) diff --git a/transport/file/transport.go b/transport/file/transport.go index b3d4e43..5143c8b 100644 --- a/transport/file/transport.go +++ b/transport/file/transport.go @@ -14,6 +14,7 @@ import ( type FileDriver struct { fileDestination string + lineSeparator string w io.Writer file *os.File lock *sync.RWMutex @@ -22,6 +23,7 @@ type FileDriver struct { func (d *FileDriver) Prepare() error { flag.StringVar(&d.fileDestination, "transport.file", "", "File/console output (empty for stdout)") + flag.StringVar(&d.lineSeparator, "transport.file.sep", "\n", "Line separator") // idea: add terminal coloring based on key partitioning (if any) return nil } @@ -76,7 +78,7 @@ func (d *FileDriver) Send(key, data []byte) error { d.lock.RLock() w := d.w d.lock.RUnlock() - _, err := fmt.Fprintln(w, string(data)) + _, err := fmt.Fprint(w, string(data)+d.lineSeparator) return err }