package kafka
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"flag"
"fmt"
"os"
"strings"
"time"
sarama "github.com/Shopify/sarama"
"github.com/netsampler/goflow2/transport"
"github.com/netsampler/goflow2/utils"
log "github.com/sirupsen/logrus"
)
// KafkaDriver produces flow messages to a Kafka cluster through a sarama
// async producer. All kafka* fields are populated from the command-line
// flags registered in Prepare and read once in Init.
type KafkaDriver struct {
	kafkaTLS bool // enable TLS (system cert pool) for broker connections

	kafkaSASL  string // SASL mechanism name; see saslAlgorithmsList ("none" disables SASL)
	kafkaSCRAM string // NOTE(review): declared but never used in this file — confirm whether dead or consumed elsewhere

	kafkaTopic          string        // topic every message is produced to
	kafkaSrv            string        // DNS SRV record for broker discovery; takes precedence over kafkaBrk
	kafkaBrk            string        // comma-separated broker list, used when kafkaSrv is empty
	kafkaMaxMsgBytes    int           // sarama Producer.MaxMessageBytes
	kafkaFlushBytes     int           // sarama Producer.Flush.Bytes
	kafkaFlushFrequency time.Duration // sarama Producer.Flush.Frequency

	kafkaLogErrors bool // when true, a goroutine drains and logs producer errors

	kafkaHashing bool   // when true, partition by message key (hash partitioner)
	kafkaVersion string // Kafka protocol version string parsed by sarama

	kafkaCompressionCodec string // codec name; must be a key of compressionCodecs (empty = sarama default)

	producer sarama.AsyncProducer
	q        chan bool // closed in Close to stop the error-logging goroutine
}
// KafkaSASLAlgorithm names a SASL authentication mechanism accepted by
// the transport.kafka.sasl flag.
type KafkaSASLAlgorithm string

// Supported values for the transport.kafka.sasl flag.
const (
	KAFKA_SASL_NONE         KafkaSASLAlgorithm = "none"
	KAFKA_SASL_PLAIN        KafkaSASLAlgorithm = "plain"
	KAFKA_SASL_SCRAM_SHA256 KafkaSASLAlgorithm = "scram-sha256"
	KAFKA_SASL_SCRAM_SHA512 KafkaSASLAlgorithm = "scram-sha512"
)
var (
	// compressionCodecs maps a lower-cased codec name (as produced by
	// sarama.CompressionCodec.String) to its sarama constant; used to
	// validate the transport.kafka.compression flag in Init.
	compressionCodecs = map[string]sarama.CompressionCodec{
		strings.ToLower(sarama.CompressionNone.String()):   sarama.CompressionNone,
		strings.ToLower(sarama.CompressionGZIP.String()):   sarama.CompressionGZIP,
		strings.ToLower(sarama.CompressionSnappy.String()): sarama.CompressionSnappy,
		strings.ToLower(sarama.CompressionLZ4.String()):    sarama.CompressionLZ4,
		strings.ToLower(sarama.CompressionZSTD.String()):   sarama.CompressionZSTD,
	}
	// saslAlgorithms is the set of SASL mechanisms accepted by Init.
	// KAFKA_SASL_NONE is deliberately absent: "none" means SASL disabled.
	saslAlgorithms = map[KafkaSASLAlgorithm]bool{
		KAFKA_SASL_PLAIN:        true,
		KAFKA_SASL_SCRAM_SHA256: true,
		KAFKA_SASL_SCRAM_SHA512: true,
	}
	// saslAlgorithmsList is the human-readable list shown in flag help.
	saslAlgorithmsList = []string{
		string(KAFKA_SASL_NONE),
		string(KAFKA_SASL_PLAIN),
		string(KAFKA_SASL_SCRAM_SHA256),
		string(KAFKA_SASL_SCRAM_SHA512),
	}
)
func ( d * KafkaDriver ) Prepare ( ) error {
flag . BoolVar ( & d . kafkaTLS , "transport.kafka.tls" , false , "Use TLS to connect to Kafka" )
2022-10-08 15:02:22 -07:00
flag . StringVar ( & d . kafkaSASL , "transport.kafka.sasl" , "none" ,
fmt . Sprintf (
"Use SASL to connect to Kafka, available settings: %s (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)" ,
strings . Join ( saslAlgorithmsList , ", " ) ) )
2021-05-22 16:12:26 -07:00
flag . StringVar ( & d . kafkaTopic , "transport.kafka.topic" , "flow-messages" , "Kafka topic to produce to" )
flag . StringVar ( & d . kafkaSrv , "transport.kafka.srv" , "" , "SRV record containing a list of Kafka brokers (or use brokers)" )
flag . StringVar ( & d . kafkaBrk , "transport.kafka.brokers" , "127.0.0.1:9092,[::1]:9092" , "Kafka brokers list separated by commas" )
2021-10-31 10:23:03 -07:00
flag . IntVar ( & d . kafkaMaxMsgBytes , "transport.kafka.maxmsgbytes" , 1000000 , "Kafka max message bytes" )
flag . IntVar ( & d . kafkaFlushBytes , "transport.kafka.flushbytes" , int ( sarama . MaxRequestSize ) , "Kafka flush bytes" )
flag . DurationVar ( & d . kafkaFlushFrequency , "transport.kafka.flushfreq" , time . Second * 5 , "Kafka flush frequency" )
2021-05-22 16:12:26 -07:00
flag . BoolVar ( & d . kafkaLogErrors , "transport.kafka.log.err" , false , "Log Kafka errors" )
flag . BoolVar ( & d . kafkaHashing , "transport.kafka.hashing" , false , "Enable partition hashing" )
//flag.StringVar(&d.kafkaKeying, "transport.kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas")
flag . StringVar ( & d . kafkaVersion , "transport.kafka.version" , "2.8.0" , "Kafka version" )
2022-10-08 14:17:18 -07:00
flag . StringVar ( & d . kafkaCompressionCodec , "transport.kafka.compression" , "" , "Kafka default compression" )
2021-05-22 16:12:26 -07:00
return nil
}
func ( d * KafkaDriver ) Init ( context . Context ) error {
kafkaConfigVersion , err := sarama . ParseKafkaVersion ( d . kafkaVersion )
if err != nil {
return err
}
kafkaConfig := sarama . NewConfig ( )
kafkaConfig . Version = kafkaConfigVersion
kafkaConfig . Producer . Return . Successes = false
kafkaConfig . Producer . Return . Errors = d . kafkaLogErrors
2021-10-31 10:23:03 -07:00
kafkaConfig . Producer . MaxMessageBytes = d . kafkaMaxMsgBytes
kafkaConfig . Producer . Flush . Bytes = d . kafkaFlushBytes
kafkaConfig . Producer . Flush . Frequency = d . kafkaFlushFrequency
2022-10-08 14:17:18 -07:00
if d . kafkaCompressionCodec != "" {
/ *
2022-10-08 15:02:22 -07:00
// when upgrading sarama, replace with:
// note: if the library adds more codecs, they will be supported natively
var cc * sarama . CompressionCodec
2022-10-08 14:17:18 -07:00
2022-10-08 15:02:22 -07:00
if err := cc . UnmarshalText ( [ ] byte ( d . kafkaCompressionCodec ) ) ; err != nil {
return err
}
kafkaConfig . Producer . Compression = * cc
2022-10-08 14:17:18 -07:00
* /
if cc , ok := compressionCodecs [ strings . ToLower ( d . kafkaCompressionCodec ) ] ; ! ok {
return errors . New ( "compression codec does not exist" )
} else {
kafkaConfig . Producer . Compression = cc
}
}
2022-10-08 15:02:22 -07:00
2021-05-22 16:12:26 -07:00
if d . kafkaTLS {
rootCAs , err := x509 . SystemCertPool ( )
if err != nil {
return errors . New ( fmt . Sprintf ( "Error initializing TLS: %v" , err ) )
}
kafkaConfig . Net . TLS . Enable = true
kafkaConfig . Net . TLS . Config = & tls . Config { RootCAs : rootCAs }
}
if d . kafkaHashing {
kafkaConfig . Producer . Partitioner = sarama . NewHashPartitioner
}
2022-10-08 15:02:22 -07:00
kafkaSASL := KafkaSASLAlgorithm ( d . kafkaSASL )
if d . kafkaSASL != "" && kafkaSASL != KAFKA_SASL_NONE {
_ , ok := saslAlgorithms [ KafkaSASLAlgorithm ( strings . ToLower ( d . kafkaSASL ) ) ]
if ! ok {
return errors . New ( "SASL algorithm does not exist" )
2021-05-22 16:12:26 -07:00
}
2022-10-08 15:02:22 -07:00
2021-05-22 16:12:26 -07:00
kafkaConfig . Net . SASL . Enable = true
kafkaConfig . Net . SASL . User = os . Getenv ( "KAFKA_SASL_USER" )
kafkaConfig . Net . SASL . Password = os . Getenv ( "KAFKA_SASL_PASS" )
if kafkaConfig . Net . SASL . User == "" && kafkaConfig . Net . SASL . Password == "" {
return errors . New ( "Kafka SASL config from environment was unsuccessful. KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set." )
2022-10-08 15:02:22 -07:00
}
if kafkaSASL == KAFKA_SASL_SCRAM_SHA256 || kafkaSASL == KAFKA_SASL_SCRAM_SHA512 {
kafkaConfig . Net . SASL . Handshake = true
if kafkaSASL == KAFKA_SASL_SCRAM_SHA512 {
kafkaConfig . Net . SASL . SCRAMClientGeneratorFunc = func ( ) sarama . SCRAMClient {
return & XDGSCRAMClient { HashGeneratorFcn : SHA512 }
}
kafkaConfig . Net . SASL . Mechanism = sarama . SASLTypeSCRAMSHA512
} else if kafkaSASL == KAFKA_SASL_SCRAM_SHA256 {
kafkaConfig . Net . SASL . SCRAMClientGeneratorFunc = func ( ) sarama . SCRAMClient {
return & XDGSCRAMClient { HashGeneratorFcn : SHA256 }
}
kafkaConfig . Net . SASL . Mechanism = sarama . SASLTypeSCRAMSHA256
}
2021-05-22 16:12:26 -07:00
}
}
2022-01-26 17:24:35 +01:00
var addrs [ ] string
2021-05-22 16:12:26 -07:00
if d . kafkaSrv != "" {
addrs , _ = utils . GetServiceAddresses ( d . kafkaSrv )
} else {
addrs = strings . Split ( d . kafkaBrk , "," )
}
kafkaProducer , err := sarama . NewAsyncProducer ( addrs , kafkaConfig )
if err != nil {
return err
}
d . producer = kafkaProducer
d . q = make ( chan bool )
if d . kafkaLogErrors {
go func ( ) {
for {
select {
case msg := <- kafkaProducer . Errors ( ) :
//if log != nil {
log . Error ( msg )
//}
case <- d . q :
return
}
}
} ( )
}
return err
}
// Send enqueues one message on the async producer for the configured
// topic. The key feeds the hash partitioner when transport.kafka.hashing
// is enabled. The call does not wait for broker acknowledgement and
// always returns nil; delivery errors surface on the producer's error
// channel instead.
func (d *KafkaDriver) Send(key, data []byte) error {
	msg := &sarama.ProducerMessage{
		Topic: d.kafkaTopic,
		Key:   sarama.ByteEncoder(key),
		Value: sarama.ByteEncoder(data),
	}
	d.producer.Input() <- msg
	return nil
}
// Close shuts the async producer down, then signals the error-logging
// goroutine started by Init to exit by closing d.q.
//
// BUG FIX: the original discarded the error returned by producer.Close()
// (e.g. in-flight messages that could not be flushed); it is now
// propagated to the caller.
func (d *KafkaDriver) Close(context.Context) error {
	err := d.producer.Close()
	close(d.q)
	return err
}
func init ( ) {
d := & KafkaDriver { }
transport . RegisterTransportDriver ( "kafka" , d )
}