Open
Description
Expected behavior
When the consumer receives messages, the memory reserved by memLimit.ForceReserveMemory should be fully released by memLimit.ReleaseMemory, ensuring the memory counter remains balanced and does not exceed the configured limit.
Actual behavior
In consumer_partition.go, the logic is inconsistent:
- At consumer_partition.go#L1181-L1436, memLimit.ForceReserveMemory is called with the accumulated size of all messages received so far in the batch.
- At consumer_partition.go#L1587-L1708, memLimit.ReleaseMemory is called with only the raw/original size of the single message being dispatched.
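This leads to a mismatch. For example, if a batch of 3 messages (each 1 KiB) is received:
- memLimit.ForceReserveMemory reserves 6 KiB in total (1 KiB + 2 KiB + 3 KiB across the three calls).
- Later, memLimit.ReleaseMemory releases only 3 KiB (1 KiB per dispatched message).
The remaining 3 KiB is never released, so the memory usage counter drifts upward with every batch and can grow beyond the actual limit.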
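The arithmetic above can be replayed with a minimal sketch. The `counter` type and `leakedBytes` function are hypothetical stand-ins written for this illustration, not the client's real memory limit controller; they only mimic the reserve/release pattern described in this issue.

```go
package main

import "fmt"

// counter is a hypothetical stand-in for the client's memory limit
// controller. It only tracks the number of bytes currently counted as in use.
type counter struct{ used int64 }

func (c *counter) forceReserve(n int64) { c.used += n }
func (c *counter) release(n int64)      { c.used -= n }

// leakedBytes replays the accounting described in this issue:
// reserve the running total on every received message, then release
// each message's own size when it is dispatched. It returns the bytes
// that remain counted as in use after the whole batch is dispatched.
func leakedBytes(sizes []int64) int64 {
	c := &counter{}
	var bytesReceived int64
	for _, s := range sizes {
		bytesReceived += s
		c.forceReserve(bytesReceived) // cumulative size, as in the reported code
	}
	for _, s := range sizes {
		c.release(s) // per-message size, as in dispatcher()
	}
	return c.used
}

func main() {
	batch := []int64{1024, 1024, 1024} // three 1 KiB messages
	fmt.Println(leakedBytes(batch))    // prints 3072
}
```

For a batch of n equally sized messages of s bytes, this pattern reserves s·n(n+1)/2 and releases s·n, so the unreleased remainder grows quadratically with the batch size.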
Steps to reproduce
- Run a Pulsar Go consumer with memLimit configured.
- Consume batched messages where the aggregated size differs from the raw size (e.g., multiple small messages).
- Observe that memory usage grows inconsistently and eventually exceeds the expected limit.
System configuration
Pulsar Go Client version: v0.14.0, v0.15.1, v0.16.0 (and possibly others)
Suggested fix
Ensure that memLimit.ReleaseMemory is called with the same size used by memLimit.ForceReserveMemory.
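A minimal sketch of what the balanced accounting could look like, assuming the fix is to reserve each message's own size. The `memTracker` and `message` types, and the `receive`, `dispatch`, and `roundTrip` helpers, are hypothetical and exist only for this illustration; the method names merely echo the ones quoted in this issue and this is not the client's actual implementation.

```go
package main

import "fmt"

// message is a hypothetical message type; size() echoes msg.size() in the issue.
type message struct{ payload []byte }

func (m message) size() int { return len(m.payload) }

// memTracker is a hypothetical stand-in for the memory limit controller.
type memTracker struct{ used int64 }

func (t *memTracker) ForceReserveMemory(n int64) { t.used += n }
func (t *memTracker) ReleaseMemory(n int64)      { t.used -= n }

// receive reserves memory per message rather than per running total.
// bytesReceived is still accumulated, but it is no longer passed to the tracker.
func receive(t *memTracker, msgs []message) (bytesReceived int) {
	for _, msg := range msgs {
		bytesReceived += msg.size()
		t.ForceReserveMemory(int64(msg.size()))
	}
	return bytesReceived
}

// dispatch releases exactly what receive reserved for each message.
func dispatch(t *memTracker, msgs []message) {
	for _, msg := range msgs {
		t.ReleaseMemory(int64(msg.size()))
	}
}

// roundTrip returns the peak usage after receiving a batch and the
// final usage after dispatching every message in it.
func roundTrip(sizes []int) (peak, final int64) {
	t := &memTracker{}
	msgs := make([]message, 0, len(sizes))
	for _, s := range sizes {
		msgs = append(msgs, message{payload: make([]byte, s)})
	}
	receive(t, msgs)
	peak = t.used
	dispatch(t, msgs)
	return peak, t.used
}

func main() {
	peak, final := roundTrip([]int{1024, 1024, 1024})
	fmt.Println(peak, final) // prints 3072 0
}
```

With per-message reservation, the peak equals the true batch size and the counter returns to zero once every message has been dispatched.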
Relevant code
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
	...
	var (
		bytesReceived   int
		skippedMessages int32
	)
	for i := 0; i < numMsgs; i++ {
		...
		messages = append(messages, msg)
		bytesReceived += msg.size()
		if pc.options.autoReceiverQueueSize {
			pc.client.memLimit.ForceReserveMemory(int64(bytesReceived)) // should be int64(msg.size())
			pc.incomingMessages.Add(int32(1))
			pc.markScaleIfNeed()
		}
	}
	...
}

func (pc *partitionConsumer) dispatcher() {
	...
	for {
		...
		select {
		...
		// if the messageCh is nil or the messageCh is full this will not be selected
		case messageCh <- nextMessage:
			// allow this message to be garbage collected
			messages[0] = nil
			messages = messages[1:]
			// for the zeroQueueConsumer, the permits controlled by itself
			if pc.options.receiverQueueSize > 0 {
				pc.availablePermits.inc()
			}
			if pc.options.autoReceiverQueueSize {
				pc.incomingMessages.Dec()
				pc.client.memLimit.ReleaseMemory(int64(nextMessageSize)) // this is right
				pc.expectMoreIncomingMessages()
			}
		...
		}
		...
	}
	...
}