diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index ae291e2..8ad9915 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -193,6 +193,36 @@ func main() { numSockets = 1 } + var numWorkers int + if listenAddrUrl.Query().Has("workers") { + if numWorkersTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("workers"), 10, 64); err != nil { + log.Fatal(err) + } else { + numWorkers = int(numWorkersTmp) + } + } + if numWorkers == 0 { + numWorkers = numSockets * 2 + } + + var isBlocking bool + if listenAddrUrl.Query().Has("blocking") { + if isBlocking, err = strconv.ParseBool(listenAddrUrl.Query().Get("blocking")); err != nil { + log.Fatal(err) + } + } + + var queueSize int + if listenAddrUrl.Query().Has("queue_size") { + if queueSizeTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("queue_size"), 10, 64); err != nil { + log.Fatal(err) + } else { + queueSize = int(queueSizeTmp) + } + } else if !isBlocking { + queueSize = 1000000 + } + hostname := listenAddrUrl.Hostname() port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64) if err != nil { @@ -201,17 +231,23 @@ func main() { } logFields := log.Fields{ - "scheme": listenAddrUrl.Scheme, - "hostname": hostname, - "port": port, - "count": numSockets, + "scheme": listenAddrUrl.Scheme, + "hostname": hostname, + "port": port, + "count": numSockets, + "workers": numWorkers, + "blocking": isBlocking, + "queue_size": queueSize, } l := log.WithFields(logFields) l.Info("starting collection") cfg := &utils.UDPReceiverConfig{ - Sockets: numSockets, + Sockets: numSockets, + Workers: numWorkers, + QueueSize: queueSize, + Blocking: isBlocking, } recv, err := utils.NewUDPReceiver(cfg) if err != nil {