Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,22 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
return err
}

// Select only partitions with leaders in this rack if configured so, falling back if none are available.
if tp.parent.conf.Producer.PartitionerRackAware {
clientRack := tp.parent.client.Config().RackID

var partitionsInRack []int32
for _, p := range partitions {
leader, err := tp.parent.client.Leader(msg.Topic, p)
if err == nil && leader.Rack() == clientRack {
partitionsInRack = append(partitionsInRack, p)
}
}
if len(partitionsInRack) > 0 {
partitions = partitionsInRack
}
}

numPartitions := int32(len(partitions))

if numPartitions == 0 {
Expand Down
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ type Config struct {
// (defaults to hashing the message key). Similar to the `partitioner.class`
// setting for the JVM producer.
Partitioner PartitionerConstructor
// Controls whether the partitioner is rack-aware. This also affects custom partitioners.
PartitionerRackAware bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Do we have any Java-side Kafka config value to inform this config name? Does Java’s Kafka even support this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the Java version is partitioner.rack.aware, where the Java PR to support this is pending review.

// If enabled, the producer will ensure that exactly one copy of each message is
// written.
Idempotent bool
Expand Down Expand Up @@ -546,6 +548,8 @@ func NewConfig() *Config {
c.Producer.Return.Errors = true
c.Producer.CompressionLevel = CompressionLevelDefault

c.Producer.PartitionerRackAware = false

c.Producer.Transaction.Timeout = 1 * time.Minute
c.Producer.Transaction.Retry.Max = 50
c.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
Expand Down
6 changes: 6 additions & 0 deletions tools/kafka-console-producer/kafka-console-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
value = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
partition = flag.Int("partition", -1, "The partition to produce to.")
rackID = flag.String("rackid", "", "Produce to leaders with the same client.rack")
verbose = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
silent = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
Expand Down Expand Up @@ -83,6 +84,11 @@ func main() {
printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
}

if *rackID != "" {
config.Producer.PartitionerRackAware = true
config.RackID = *rackID
}

message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}

if *key != "" {
Expand Down
Loading