From 66658cfbbc0d803d7e58aa79ee8a79347694f96a Mon Sep 17 00:00:00 2001
From: Zorn Hsu
Date: Mon, 16 Jun 2025 15:48:01 -0700
Subject: [PATCH 1/2] feat(producer): rack-aware partitioner

Signed-off-by: Zorn Hsu
---
 async_producer.go             | 15 +++++++++++++++
 config.go                     |  4 ++++
 .../kafka-console-producer.go |  6 ++++++
 3 files changed, 25 insertions(+)

diff --git a/async_producer.go b/async_producer.go
index e34eed544..9aa5850b8 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -555,6 +555,21 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
 		return err
 	}
 
+	// If rack-aware partitioning is enabled, select only partitions whose leader is in the client's rack, falling back to all partitions if none match.
+	if tp.parent.conf.Producer.PartitionerRackAware {
+		var availablePartitionsInRack []int32
+		var clientRack = tp.parent.client.Config().RackID
+		for _, p := range partitions {
+			leaderBroker, err := tp.parent.client.Leader(msg.Topic, p)
+			if err == nil && leaderBroker.Rack() == clientRack {
+				availablePartitionsInRack = append(availablePartitionsInRack, p)
+			}
+		}
+		if len(availablePartitionsInRack) > 0 {
+			partitions = availablePartitionsInRack
+		}
+	}
+
 	numPartitions := int32(len(partitions))
 
 	if numPartitions == 0 {
diff --git a/config.go b/config.go
index 6a198dc89..63c21d5bc 100644
--- a/config.go
+++ b/config.go
@@ -196,6 +196,8 @@ type Config struct {
 		// (defaults to hashing the message key). Similar to the `partitioner.class`
 		// setting for the JVM producer.
 		Partitioner PartitionerConstructor
+		// If enabled, only partitions whose leader broker is in the same rack as this client (RackID) are considered, falling back to all partitions when none match. This also affects custom partitioners.
+		PartitionerRackAware bool
 		// If enabled, the producer will ensure that exactly one copy of each message is
 		// written.
 		Idempotent bool
@@ -546,6 +548,8 @@ func NewConfig() *Config {
 	c.Producer.Return.Errors = true
 	c.Producer.CompressionLevel = CompressionLevelDefault
 
+	c.Producer.PartitionerRackAware = false
+
 	c.Producer.Transaction.Timeout = 1 * time.Minute
 	c.Producer.Transaction.Retry.Max = 50
 	c.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
diff --git a/tools/kafka-console-producer/kafka-console-producer.go b/tools/kafka-console-producer/kafka-console-producer.go
index 43e15ab9f..e2ed7d9c5 100644
--- a/tools/kafka-console-producer/kafka-console-producer.go
+++ b/tools/kafka-console-producer/kafka-console-producer.go
@@ -22,6 +22,7 @@ var (
 	value       = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
 	partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
 	partition   = flag.Int("partition", -1, "The partition to produce to.")
+	rackID      = flag.String("rackid", "", "Produce to leaders with the same client.rack")
 	verbose     = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
 	showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
 	silent      = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
@@ -83,6 +84,11 @@ func main() {
 		printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
 	}
 
+	if *rackID != "" {
+		config.Producer.PartitionerRackAware = true
+		config.RackID = *rackID
+	}
+
 	message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
 
 	if *key != "" {
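For anyone trying the series out, here is a minimal sketch of how an application would opt in, assuming this patch is applied on top of current sarama (github.com/IBM/sarama). The broker address, topic, and rack name are placeholders, and `Producer.PartitionerRackAware` is the field introduced above, not part of any released sarama version.

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.RackID = "rack-a"                    // placeholder: the rack this client runs in
	cfg.Producer.PartitionerRackAware = true // option added by this patch
	cfg.Producer.Return.Successes = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg) // placeholder broker
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.AsyncClose()

	// With the option enabled, this message goes to a partition whose leader
	// is in "rack-a" when one exists, and to any available partition otherwise.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "my-topic",
		Value: sarama.StringEncoder("hello"),
	}

	select {
	case msg := <-producer.Successes():
		log.Printf("delivered to partition %d at offset %d", msg.Partition, msg.Offset)
	case err := <-producer.Errors():
		log.Println("delivery failed:", err)
	}
}
```

The console tool hunk above wires the same two settings together: passing `-rackid` sets `config.RackID` and turns on `config.Producer.PartitionerRackAware`.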
From 5a210446332a2c819b858647a74bc2e5dd637c87 Mon Sep 17 00:00:00 2001
From: Zorn Hsu
Date: Tue, 17 Jun 2025 15:20:42 -0700
Subject: [PATCH 2/2] address feedback

Signed-off-by: Zorn Hsu
---
 async_producer.go | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/async_producer.go b/async_producer.go
index 9aa5850b8..8a65bc581 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -557,16 +557,17 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
 
 	// If rack-aware partitioning is enabled, select only partitions whose leader is in the client's rack, falling back to all partitions if none match.
 	if tp.parent.conf.Producer.PartitionerRackAware {
-		var availablePartitionsInRack []int32
-		var clientRack = tp.parent.client.Config().RackID
+		clientRack := tp.parent.client.Config().RackID
+
+		var partitionsInRack []int32
 		for _, p := range partitions {
-			leaderBroker, err := tp.parent.client.Leader(msg.Topic, p)
-			if err == nil && leaderBroker.Rack() == clientRack {
-				availablePartitionsInRack = append(availablePartitionsInRack, p)
+			leader, err := tp.parent.client.Leader(msg.Topic, p)
+			if err == nil && leader.Rack() == clientRack {
+				partitionsInRack = append(partitionsInRack, p)
 			}
 		}
-		if len(availablePartitionsInRack) > 0 {
-			partitions = availablePartitionsInRack
+		if len(partitionsInRack) > 0 {
+			partitions = partitionsInRack
 		}
 	}
 
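Since the selection rule is the heart of the change, here is a standalone sketch of the same logic with the client stubbed out, to make the fallback easy to see in isolation. `rackAwareFilter` and `leaderRack` are illustrative names invented for this sketch; they are not part of the patch or of sarama, and `leaderRack` stands in for `client.Leader(topic, p).Rack()`.

```go
package main

import "fmt"

// rackAwareFilter mirrors the selection added in async_producer.go: keep only
// partitions whose leader is in clientRack, falling back to the full list when
// no leader matches or every lookup fails.
func rackAwareFilter(partitions []int32, leaderRack func(int32) (string, error), clientRack string) []int32 {
	var partitionsInRack []int32
	for _, p := range partitions {
		rack, err := leaderRack(p)
		if err == nil && rack == clientRack {
			partitionsInRack = append(partitionsInRack, p)
		}
	}
	if len(partitionsInRack) > 0 {
		return partitionsInRack
	}
	return partitions
}

func main() {
	// Stub for the leader lookup: partitions 0 and 2 have their leader in rack-a.
	racks := map[int32]string{0: "rack-a", 1: "rack-b", 2: "rack-a"}
	leaderRack := func(p int32) (string, error) { return racks[p], nil }

	fmt.Println(rackAwareFilter([]int32{0, 1, 2}, leaderRack, "rack-a")) // [0 2]
	fmt.Println(rackAwareFilter([]int32{0, 1, 2}, leaderRack, "rack-z")) // [0 1 2] (fallback)
}
```

Because of the fallback, enabling the option never empties the candidate set: when no leader shares the client's rack, or every leader lookup fails, the partitioner still sees the full partition list, exactly as it does today.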