1
0
mirror of https://github.com/netsampler/goflow2.git synced 2024-05-06 15:54:52 +00:00
Files
netsampler-goflow2/cmd/enricher/main.go

179 lines
4.2 KiB
Go
Raw Normal View History

2021-05-22 16:12:26 -07:00
package main
import (
"bufio"
2023-08-31 23:27:47 -07:00
"bytes"
2023-08-09 19:47:20 -07:00
"errors"
2021-05-22 16:12:26 -07:00
"flag"
"fmt"
"io"
"log"
"log/slog"
2021-05-22 16:12:26 -07:00
"net"
"os"
"strings"
2023-08-09 19:47:20 -07:00
flowmessage "github.com/netsampler/goflow2/v2/cmd/enricher/pb"
2021-05-22 16:12:26 -07:00
// import various formatters
2023-08-09 19:47:20 -07:00
"github.com/netsampler/goflow2/v2/format"
_ "github.com/netsampler/goflow2/v2/format/binary"
_ "github.com/netsampler/goflow2/v2/format/json"
_ "github.com/netsampler/goflow2/v2/format/text"
2021-05-22 16:12:26 -07:00
// import various transports
2023-08-09 19:47:20 -07:00
"github.com/netsampler/goflow2/v2/transport"
_ "github.com/netsampler/goflow2/v2/transport/file"
_ "github.com/netsampler/goflow2/v2/transport/kafka"
2021-05-22 16:12:26 -07:00
2023-08-09 19:47:20 -07:00
"github.com/oschwald/geoip2-golang"
"google.golang.org/protobuf/encoding/protodelim"
2021-05-22 16:12:26 -07:00
)
var (
version = ""
buildinfos = ""
AppVersion = "Enricher " + version + " " + buildinfos
DbAsn = flag.String("db.asn", "", "IP->ASN database")
DbCountry = flag.String("db.country", "", "IP->Country database")
LogLevel = flag.String("loglevel", "info", "Log level")
LogFmt = flag.String("logfmt", "normal", "Log formatter")
2021-08-10 22:32:43 -07:00
SamplingRate = flag.Int("samplingrate", 0, "Set sampling rate (values > 0)")
2021-05-22 16:12:26 -07:00
Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", ")))
Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", ")))
Version = flag.Bool("v", false, "Print version")
)
func MapAsn(db *geoip2.Reader, addr []byte, dest *uint32) {
entry, err := db.ASN(net.IP(addr))
if err != nil {
return
}
*dest = uint32(entry.AutonomousSystemNumber)
}
func MapCountry(db *geoip2.Reader, addr []byte, dest *string) {
entry, err := db.Country(net.IP(addr))
if err != nil {
return
}
*dest = entry.Country.IsoCode
}
2023-08-31 23:27:47 -07:00
func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *ProtoProducerMessage) {
2021-05-22 16:12:26 -07:00
if dbAsn != nil {
2023-08-31 23:27:47 -07:00
MapAsn(dbAsn, msg.SrcAddr, &(msg.FlowMessageExt.SrcAs))
MapAsn(dbAsn, msg.DstAddr, &(msg.FlowMessageExt.DstAs))
2021-05-22 16:12:26 -07:00
}
if dbCountry != nil {
2023-08-31 23:27:47 -07:00
MapCountry(dbCountry, msg.SrcAddr, &(msg.FlowMessageExt.SrcCountry))
MapCountry(dbCountry, msg.DstAddr, &(msg.FlowMessageExt.DstCountry))
2021-05-22 16:12:26 -07:00
}
}
2023-08-31 23:27:47 -07:00
type ProtoProducerMessage struct {
flowmessage.FlowMessageExt
}
func (m *ProtoProducerMessage) MarshalBinary() ([]byte, error) {
buf := bytes.NewBuffer([]byte{})
_, err := protodelim.MarshalTo(buf, m)
return buf.Bytes(), err
}
2021-05-22 16:12:26 -07:00
func main() {
flag.Parse()
if *Version {
fmt.Println(AppVersion)
os.Exit(0)
}
var loglevel slog.Level
if err := loglevel.UnmarshalText([]byte(*LogLevel)); err != nil {
log.Fatal("error parsing log level")
}
lo := slog.HandlerOptions{
Level: loglevel,
}
logger := slog.New(slog.NewTextHandler(os.Stderr, &lo))
switch *LogFmt {
case "json":
logger = slog.New(slog.NewJSONHandler(os.Stderr, &lo))
}
slog.SetDefault(logger)
2021-05-22 16:12:26 -07:00
var dbAsn, dbCountry *geoip2.Reader
var err error
if *DbAsn != "" {
dbAsn, err = geoip2.Open(*DbAsn)
if err != nil {
slog.Error("error opening asn db", slog.String("error", err.Error()))
os.Exit(1)
2021-05-22 16:12:26 -07:00
}
defer dbAsn.Close()
}
if *DbCountry != "" {
dbCountry, err = geoip2.Open(*DbCountry)
if err != nil {
slog.Error("error opening country db", slog.String("error", err.Error()))
os.Exit(1)
2021-05-22 16:12:26 -07:00
}
defer dbCountry.Close()
}
2023-08-09 19:47:20 -07:00
formatter, err := format.FindFormat(*Format)
2021-05-22 16:12:26 -07:00
if err != nil {
log.Fatal(err)
}
2023-08-09 19:47:20 -07:00
transporter, err := transport.FindTransport(*Transport)
2021-05-22 16:12:26 -07:00
if err != nil {
slog.Error("error transporter", slog.String("error", err.Error()))
os.Exit(1)
2021-05-22 16:12:26 -07:00
}
2023-08-09 19:47:20 -07:00
defer transporter.Close()
2021-05-22 16:12:26 -07:00
logger.Info("starting enricher")
2021-05-22 16:12:26 -07:00
rdr := bufio.NewReader(os.Stdin)
2023-08-31 23:27:47 -07:00
var msg ProtoProducerMessage
2021-05-22 16:12:26 -07:00
for {
2023-08-31 23:27:47 -07:00
if err := protodelim.UnmarshalFrom(rdr, &msg); err != nil && errors.Is(err, io.EOF) {
2023-08-09 19:47:20 -07:00
return
} else if err != nil {
slog.Error("error unmarshalling message", slog.String("error", err.Error()))
2021-05-22 16:12:26 -07:00
continue
}
2023-08-31 23:27:47 -07:00
MapFlow(dbAsn, dbCountry, &msg)
2021-05-22 16:12:26 -07:00
2021-08-10 22:32:43 -07:00
if *SamplingRate > 0 {
msg.SamplingRate = uint64(*SamplingRate)
}
2023-08-31 23:27:47 -07:00
key, data, err := formatter.Format(&msg)
2021-05-22 16:12:26 -07:00
if err != nil {
slog.Error("error formatting message", slog.String("error", err.Error()))
2021-05-22 16:12:26 -07:00
continue
}
err = transporter.Send(key, data)
if err != nil {
slog.Error("error sending message", slog.String("error", err.Error()))
2021-05-22 16:12:26 -07:00
continue
}
2023-08-31 23:27:47 -07:00
msg.Reset()
2021-05-22 16:12:26 -07:00
}
}