mirror of
https://github.com/netsampler/goflow2.git
synced 2024-05-06 15:54:52 +00:00
Merge pull request #30 from netsampler/bug/enricher-decode
bugfix: enricher decoding certain protobuf
This commit is contained in:
@ -181,7 +181,7 @@ with a database for Autonomous System Number and Country.
|
|||||||
Similar output options as GoFlow are provided.
|
Similar output options as GoFlow are provided.
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$ ./goflow2 -format=pb | ./enricher -db.asn path-to/GeoLite2-ASN.mmdb -db.country path-to/GeoLite2-Country.mmdb
|
$ ./goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true | ./enricher -db.asn path-to/GeoLite2-ASN.mmdb -db.country path-to/GeoLite2-Country.mmdb
|
||||||
```
|
```
|
||||||
|
|
||||||
For a more scalable production setting, Kafka and protobuf are recommended.
|
For a more scalable production setting, Kafka and protobuf are recommended.
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -146,13 +147,28 @@ func main() {
|
|||||||
rdr := bufio.NewReader(os.Stdin)
|
rdr := bufio.NewReader(os.Stdin)
|
||||||
|
|
||||||
msg := &flowmessage.FlowMessageExt{}
|
msg := &flowmessage.FlowMessageExt{}
|
||||||
|
msgLen := make([]byte, binary.MaxVarintLen64)
|
||||||
|
lenBufSize := len(msgLen)
|
||||||
for {
|
for {
|
||||||
line, err := rdr.ReadBytes('\n')
|
n, err := rdr.Read(msgLen)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(line) == 0 {
|
|
||||||
|
len, vn := proto.DecodeVarint(msgLen[0:n])
|
||||||
|
if len == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
line := make([]byte, len)
|
||||||
|
if vn < lenBufSize {
|
||||||
|
copy(line[0:lenBufSize-vn], msgLen[vn:lenBufSize])
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = io.ReadFull(rdr, line[lenBufSize-vn:])
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
log.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
line = bytes.TrimSuffix(line, []byte("\n"))
|
line = bytes.TrimSuffix(line, []byte("\n"))
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
type FileDriver struct {
|
type FileDriver struct {
|
||||||
fileDestination string
|
fileDestination string
|
||||||
|
lineSeparator string
|
||||||
w io.Writer
|
w io.Writer
|
||||||
file *os.File
|
file *os.File
|
||||||
lock *sync.RWMutex
|
lock *sync.RWMutex
|
||||||
@ -22,6 +23,7 @@ type FileDriver struct {
|
|||||||
|
|
||||||
func (d *FileDriver) Prepare() error {
|
func (d *FileDriver) Prepare() error {
|
||||||
flag.StringVar(&d.fileDestination, "transport.file", "", "File/console output (empty for stdout)")
|
flag.StringVar(&d.fileDestination, "transport.file", "", "File/console output (empty for stdout)")
|
||||||
|
flag.StringVar(&d.lineSeparator, "transport.file.sep", "\n", "Line separator")
|
||||||
// idea: add terminal coloring based on key partitioning (if any)
|
// idea: add terminal coloring based on key partitioning (if any)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -76,7 +78,7 @@ func (d *FileDriver) Send(key, data []byte) error {
|
|||||||
d.lock.RLock()
|
d.lock.RLock()
|
||||||
w := d.w
|
w := d.w
|
||||||
d.lock.RUnlock()
|
d.lock.RUnlock()
|
||||||
_, err := fmt.Fprintln(w, string(data))
|
_, err := fmt.Fprint(w, string(data)+d.lineSeparator)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user