package kafka

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"flag"
	"fmt"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/netsampler/goflow2/v2/transport"

	sarama "github.com/Shopify/sarama"
)
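
// KafkaDriver produces flow messages to Kafka through sarama's
// AsyncProducer. It registers itself under the name "kafka" in init(), and
// is configured entirely via the flags declared in Prepare plus the
// KAFKA_SASL_USER and KAFKA_SASL_PASS environment variables.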
type KafkaDriver struct {
	kafkaTLS            bool
	kafkaSASL           string
	kafkaTopic          string
	kafkaSrv            string
	kafkaBrk            string
	kafkaMaxMsgBytes    int
	kafkaFlushBytes     int
	kafkaFlushFrequency time.Duration

	kafkaHashing          bool
	kafkaVersion          string
	kafkaCompressionCodec string

	producer sarama.AsyncProducer

	q      chan bool
	errors chan error
}

// KafkaTransportError wraps errors coming from the Kafka producer so that
// callers can match them against transport.ErrorTransport with errors.Is.
type KafkaTransportError struct {
	Err error
}

func (e *KafkaTransportError) Error() string {
	return fmt.Sprintf("kafka transport %s", e.Err.Error())
}

func (e *KafkaTransportError) Unwrap() []error {
	return []error{transport.ErrorTransport, e.Err}
}
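
// A minimal sketch of how a consumer of Errors() might match these wrapped
// errors (the loop and logger are assumptions, not part of this package):
//
//	for err := range driver.Errors() {
//	    if errors.Is(err, transport.ErrorTransport) {
//	        log.Printf("kafka transport error: %v", err)
//	    }
//	}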

type KafkaSASLAlgorithm string

const (
	KAFKA_SASL_NONE         KafkaSASLAlgorithm = "none"
	KAFKA_SASL_PLAIN        KafkaSASLAlgorithm = "plain"
	KAFKA_SASL_SCRAM_SHA256 KafkaSASLAlgorithm = "scram-sha256"
	KAFKA_SASL_SCRAM_SHA512 KafkaSASLAlgorithm = "scram-sha512"
)

var (
	compressionCodecs = map[string]sarama.CompressionCodec{
		strings.ToLower(sarama.CompressionNone.String()):   sarama.CompressionNone,
		strings.ToLower(sarama.CompressionGZIP.String()):   sarama.CompressionGZIP,
		strings.ToLower(sarama.CompressionSnappy.String()): sarama.CompressionSnappy,
		strings.ToLower(sarama.CompressionLZ4.String()):    sarama.CompressionLZ4,
		strings.ToLower(sarama.CompressionZSTD.String()):   sarama.CompressionZSTD,
	}
	saslAlgorithms = map[KafkaSASLAlgorithm]bool{
		KAFKA_SASL_PLAIN:        true,
		KAFKA_SASL_SCRAM_SHA256: true,
		KAFKA_SASL_SCRAM_SHA512: true,
	}
	saslAlgorithmsList = []string{
		string(KAFKA_SASL_NONE),
		string(KAFKA_SASL_PLAIN),
		string(KAFKA_SASL_SCRAM_SHA256),
		string(KAFKA_SASL_SCRAM_SHA512),
	}
)
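
// The compressionCodecs keys are sarama's own codec names lowered, so
// -transport.kafka.compression accepts "none", "gzip", "snappy", "lz4"
// and "zstd" (assuming those are the strings this sarama version returns).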

// Prepare registers the driver's command-line flags; it must run before
// flag.Parse().
func (d *KafkaDriver) Prepare() error {
	flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")
	flag.StringVar(&d.kafkaSASL, "transport.kafka.sasl", "none",
		fmt.Sprintf(
			"Use SASL to connect to Kafka, available settings: %s (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)",
			strings.Join(saslAlgorithmsList, ", ")))
	flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to")
	flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)")
	flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
	flag.IntVar(&d.kafkaMaxMsgBytes, "transport.kafka.maxmsgbytes", 1000000, "Kafka max message bytes")
	flag.IntVar(&d.kafkaFlushBytes, "transport.kafka.flushbytes", int(sarama.MaxRequestSize), "Kafka flush bytes")
	flag.DurationVar(&d.kafkaFlushFrequency, "transport.kafka.flushfreq", time.Second*5, "Kafka flush frequency")
	flag.BoolVar(&d.kafkaHashing, "transport.kafka.hashing", false, "Enable partition hashing")
	flag.StringVar(&d.kafkaVersion, "transport.kafka.version", "2.8.0", "Kafka version")
	flag.StringVar(&d.kafkaCompressionCodec, "transport.kafka.compression", "", "Kafka default compression")
	return nil
}
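
// A minimal sketch of how these flags compose on the command line (the
// binary name and the -transport selector are assumptions; the
// transport.kafka.* names match the declarations above):
//
//	goflow2 -transport=kafka \
//	    -transport.kafka.brokers=kafka1:9092,kafka2:9092 \
//	    -transport.kafka.topic=flow-messages \
//	    -transport.kafka.compression=zstd \
//	    -transport.kafka.hashing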

// Errors exposes producer errors forwarded by the goroutine started in Init.
func (d *KafkaDriver) Errors() <-chan error {
	return d.errors
}

func (d *KafkaDriver) Init() error {
	kafkaConfigVersion, err := sarama.ParseKafkaVersion(d.kafkaVersion)
	if err != nil {
		return err
	}

	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Version = kafkaConfigVersion
	kafkaConfig.Producer.Return.Successes = false
	kafkaConfig.Producer.Return.Errors = true
	kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes
	kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes
	kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency
	// round-robin by default; replaced by the hash partitioner below
	// when -transport.kafka.hashing is set
	kafkaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner

	if d.kafkaCompressionCodec != "" {
		/*
			// when upgrading sarama, replace with the following
			// (note: if the library adds more codecs, they will be supported natively):
			var cc sarama.CompressionCodec
			if err := cc.UnmarshalText([]byte(d.kafkaCompressionCodec)); err != nil {
				return err
			}
			kafkaConfig.Producer.Compression = cc
		*/
		cc, ok := compressionCodecs[strings.ToLower(d.kafkaCompressionCodec)]
		if !ok {
			return errors.New("compression codec does not exist")
		}
		kafkaConfig.Producer.Compression = cc
	}

	if d.kafkaTLS {
		rootCAs, err := x509.SystemCertPool()
		if err != nil {
			return fmt.Errorf("error initializing TLS: %v", err)
		}
		kafkaConfig.Net.TLS.Enable = true
		kafkaConfig.Net.TLS.Config = &tls.Config{
			RootCAs:    rootCAs,
			MinVersion: tls.VersionTLS12,
		}
	}

	if d.kafkaHashing {
		kafkaConfig.Producer.Partitioner = sarama.NewHashPartitioner
	}

	// normalize case once so validation and mechanism selection agree
	kafkaSASL := KafkaSASLAlgorithm(strings.ToLower(d.kafkaSASL))
	if d.kafkaSASL != "" && kafkaSASL != KAFKA_SASL_NONE {
		if _, ok := saslAlgorithms[kafkaSASL]; !ok {
			return errors.New("SASL algorithm does not exist")
		}

		kafkaConfig.Net.SASL.Enable = true
		kafkaConfig.Net.SASL.User = os.Getenv("KAFKA_SASL_USER")
		kafkaConfig.Net.SASL.Password = os.Getenv("KAFKA_SASL_PASS")
		if kafkaConfig.Net.SASL.User == "" && kafkaConfig.Net.SASL.Password == "" {
			return errors.New("Kafka SASL config from environment was unsuccessful: KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set")
		}

		if kafkaSASL == KAFKA_SASL_SCRAM_SHA256 || kafkaSASL == KAFKA_SASL_SCRAM_SHA512 {
			kafkaConfig.Net.SASL.Handshake = true
			if kafkaSASL == KAFKA_SASL_SCRAM_SHA512 {
				kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
					return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
				}
				kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
			} else {
				kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
					return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
				}
				kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
			}
		}
	}
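
	// Credentials above come exclusively from the environment; a typical
	// deployment would export placeholders like these before starting:
	//
	//	KAFKA_SASL_USER=goflow2 KAFKA_SASL_PASS=changeme \
	//	    goflow2 -transport.kafka.sasl=scram-sha512 -transport.kafka.tls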

	var addrs []string
	if d.kafkaSrv != "" {
		// resolve brokers via DNS SRV rather than a static list
		if addrs, err = GetServiceAddresses(d.kafkaSrv); err != nil {
			return err
		}
	} else {
		addrs = strings.Split(d.kafkaBrk, ",")
	}

	kafkaProducer, err := sarama.NewAsyncProducer(addrs, kafkaConfig)
	if err != nil {
		return err
	}
	d.producer = kafkaProducer

	d.q = make(chan bool)

	go func() {
		for {
			select {
			case msg := <-kafkaProducer.Errors():
				var err error
				if msg != nil {
					err = &KafkaTransportError{msg}
				}
				// non-blocking send: drop the error if nobody is reading
				select {
				case d.errors <- err:
				default:
				}
				// a nil message means the producer's error channel closed
				if msg == nil {
					return
				}
			case <-d.q:
				return
			}
		}
	}()

	return nil
}
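
// Send enqueues one message on the async producer and returns immediately;
// delivery failures surface later on the Errors() channel. The key decides
// the partition only when -transport.kafka.hashing is set (hash
// partitioner); otherwise partitions are assigned round-robin.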
func (d *KafkaDriver) Send(key, data []byte) error {
	d.producer.Input() <- &sarama.ProducerMessage{
		Topic: d.kafkaTopic,
		Key:   sarama.ByteEncoder(key),
		Value: sarama.ByteEncoder(data),
	}
	return nil
}
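
// Close shuts the producer down first, which flushes any buffered messages,
// then stops the error-forwarding goroutine via the quit channel.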
func (d *KafkaDriver) Close() error {
	err := d.producer.Close()
	close(d.q)
	return err
}

// GetServiceAddresses resolves a DNS SRV record into a host:port broker list.
// todo: deprecate?
func GetServiceAddresses(srv string) (addrs []string, err error) {
	_, srvs, err := net.LookupSRV("", "", srv)
	if err != nil {
		return nil, fmt.Errorf("service discovery: %v", err)
	}
	for _, srv := range srvs {
		addrs = append(addrs, net.JoinHostPort(srv.Target, strconv.Itoa(int(srv.Port))))
	}
	return addrs, nil
}
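
// For example (the record name is hypothetical), resolving
// "_kafka._tcp.example.internal" with two SRV answers could yield
// []string{"broker1.example.internal:9092", "broker2.example.internal:9092"}.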

func init() {
	d := &KafkaDriver{
		errors: make(chan error),
	}
	transport.RegisterTransportDriver("kafka", d)
}