From 5ba02103ebfe5b9590759a14879025eb427a9ef9 Mon Sep 17 00:00:00 2001 From: Louis Date: Wed, 6 Dec 2023 13:53:51 +0100 Subject: [PATCH] bug: fix kafka transport default partitioning (#246) --- producer/proto/messages.go | 6 +++--- transport/kafka/kafka.go | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/producer/proto/messages.go b/producer/proto/messages.go index 7b8fb2c..d187683 100644 --- a/producer/proto/messages.go +++ b/producer/proto/messages.go @@ -39,9 +39,6 @@ func (m *ProtoProducerMessage) MarshalText() ([]byte, error) { } func (m *ProtoProducerMessage) baseKey(h hash.Hash) { - if m.formatter == nil || len(m.formatter.key) == 0 { - return - } vfm := reflect.ValueOf(m) vfm = reflect.Indirect(vfm) @@ -72,6 +69,9 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) { } func (m *ProtoProducerMessage) Key() []byte { + if m.formatter == nil || len(m.formatter.key) == 0 { + return nil + } h := fnv.New32() m.baseKey(h) return h.Sum(nil) diff --git a/transport/kafka/kafka.go b/transport/kafka/kafka.go index decc565..407a72c 100644 --- a/transport/kafka/kafka.go +++ b/transport/kafka/kafka.go @@ -120,6 +120,7 @@ func (d *KafkaDriver) Init() error { kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency + kafkaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner if d.kafkaCompressionCodec != "" { /*