diff --git a/producer/proto/messages.go b/producer/proto/messages.go index 7b8fb2c..d187683 100644 --- a/producer/proto/messages.go +++ b/producer/proto/messages.go @@ -39,9 +39,6 @@ func (m *ProtoProducerMessage) MarshalText() ([]byte, error) { } func (m *ProtoProducerMessage) baseKey(h hash.Hash) { - if m.formatter == nil || len(m.formatter.key) == 0 { - return - } vfm := reflect.ValueOf(m) vfm = reflect.Indirect(vfm) @@ -72,6 +69,9 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) { } func (m *ProtoProducerMessage) Key() []byte { + if m.formatter == nil || len(m.formatter.key) == 0 { + return nil + } h := fnv.New32() m.baseKey(h) return h.Sum(nil) diff --git a/transport/kafka/kafka.go b/transport/kafka/kafka.go index decc565..407a72c 100644 --- a/transport/kafka/kafka.go +++ b/transport/kafka/kafka.go @@ -120,6 +120,7 @@ func (d *KafkaDriver) Init() error { kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency + kafkaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner if d.kafkaCompressionCodec != "" { /*