diff --git a/transport/kafka/kafka.go b/transport/kafka/kafka.go index 76f8fa2..f87ce8f 100644 --- a/transport/kafka/kafka.go +++ b/transport/kafka/kafka.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "strings" + "time" sarama "github.com/Shopify/sarama" "github.com/netsampler/goflow2/transport" @@ -18,11 +19,14 @@ import ( ) type KafkaDriver struct { - kafkaTLS bool - kafkaSASL bool - kafkaTopic string - kafkaSrv string - kafkaBrk string + kafkaTLS bool + kafkaSASL bool + kafkaTopic string + kafkaSrv string + kafkaBrk string + kafkaMaxMsgBytes int + kafkaFlushBytes int + kafkaFlushFrequency time.Duration kafkaLogErrors bool @@ -41,6 +45,9 @@ func (d *KafkaDriver) Prepare() error { flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to") flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)") flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas") + flag.IntVar(&d.kafkaMaxMsgBytes, "transport.kafka.maxmsgbytes", 1000000, "Kafka max message bytes") + flag.IntVar(&d.kafkaFlushBytes, "transport.kafka.flushbytes", int(sarama.MaxRequestSize), "Kafka flush bytes") + flag.DurationVar(&d.kafkaFlushFrequency, "transport.kafka.flushfreq", time.Second*5, "Kafka flush frequency") flag.BoolVar(&d.kafkaLogErrors, "transport.kafka.log.err", false, "Log Kafka errors") flag.BoolVar(&d.kafkaHashing, "transport.kafka.hashing", false, "Enable partition hashing") @@ -61,6 +68,9 @@ func (d *KafkaDriver) Init(context.Context) error { kafkaConfig.Version = kafkaConfigVersion kafkaConfig.Producer.Return.Successes = false kafkaConfig.Producer.Return.Errors = d.kafkaLogErrors + kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes + kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes + kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency if d.kafkaTLS { rootCAs, err := x509.SystemCertPool() if err != nil {