1
0
mirror of https://github.com/netsampler/goflow2.git synced 2024-05-06 15:54:52 +00:00

155 lines
3.9 KiB
Go
Raw Normal View History

2021-05-22 16:12:26 -07:00
package main
import (
"context"
"flag"
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
// import various formatters
"github.com/netsampler/goflow2/format"
_ "github.com/netsampler/goflow2/format/json"
_ "github.com/netsampler/goflow2/format/protobuf"
// import various transports
"github.com/netsampler/goflow2/transport"
_ "github.com/netsampler/goflow2/transport/file"
_ "github.com/netsampler/goflow2/transport/kafka"
"github.com/netsampler/goflow2/utils"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
)
var (
version = ""
buildinfos = ""
AppVersion = "GoFlow2 " + version + " " + buildinfos
ReusePort = flag.Bool("reuseport", false, "Enable so_reuseport")
ListenAddresses = flag.String("listen", "sflow://:6343,netflow://:2055", "listen addresses")
Workers = flag.Int("workers", 1, "Number of workers per collector")
LogLevel = flag.String("loglevel", "info", "Log level")
LogFmt = flag.String("logfmt", "normal", "Log formatter")
Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", ")))
Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", ")))
//FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf")
MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address")
MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path")
TemplatePath = flag.String("templates.path", "/templates", "NetFlow/IPFIX templates list")
Version = flag.Bool("v", false, "Print version")
)
func httpServer( /*state *utils.StateNetFlow*/ ) {
http.Handle(*MetricsPath, promhttp.Handler())
//http.HandleFunc(*TemplatePath, state.ServeHTTPTemplates)
log.Fatal(http.ListenAndServe(*MetricsAddr, nil))
}
func main() {
flag.Parse()
if *Version {
fmt.Println(AppVersion)
os.Exit(0)
}
lvl, _ := log.ParseLevel(*LogLevel)
log.SetLevel(lvl)
ctx := context.Background()
formatter, err := format.FindFormat(ctx, *Format)
if err != nil {
log.Fatal(err)
}
transporter, err := transport.FindTransport(ctx, *Transport)
if err != nil {
log.Fatal(err)
}
defer transporter.Close(ctx)
switch *LogFmt {
case "json":
log.SetFormatter(&log.JSONFormatter{})
}
log.Info("Starting GoFlow2")
go httpServer()
//go httpServer(sNF)
wg := &sync.WaitGroup{}
for _, listenAddress := range strings.Split(*ListenAddresses, ",") {
wg.Add(1)
go func(listenAddress string) {
defer wg.Done()
listenAddrUrl, err := url.Parse(listenAddress)
if err != nil {
log.Fatal(err)
}
hostname := listenAddrUrl.Hostname()
port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64)
if err != nil {
log.Errorf("Port %s could not be converted to integer", listenAddrUrl.Port())
return
}
logFields := log.Fields{
"scheme": listenAddrUrl.Scheme,
"hostname": hostname,
"port": port,
}
log.WithFields(logFields).Info("Starting collection")
if listenAddrUrl.Scheme == "sflow" {
sSFlow := &utils.StateSFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}
err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "netflow" {
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}
err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "nfl" {
sNFL := &utils.StateNFLegacy{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}
err = sNFL.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else {
log.Errorf("scheme %s does not exist", listenAddrUrl.Scheme)
return
}
if err != nil {
log.WithFields(logFields).Fatal(err)
}
}(listenAddress)
}
wg.Wait()
}