Skip to content

[Bug] Pulsar Go Client consumer memory limit does not work as expected #1427

@danielchang-Z

Description

@danielchang-Z

Expected behavior

When the consumer receives messages, the memory reserved by memLimit.ForceReserveMemory should be fully released by memLimit.ReleaseMemory, ensuring the memory counter remains balanced and does not exceed the configured limit.

Actual behavior

In consumer_partition.go, the logic is inconsistent:

This leads to a mismatch. For example, if a batch of 3 messages (each 1 KiB) is received:

  • memLimit.ForceReserveMemory reserves 6 KiB.
  • Later, memLimit.ReleaseMemory releases only 3 KiB.

As a result, the memory usage counter can grow beyond the actual limit.

Steps to reproduce

  1. Run a Pulsar Go consumer with memLimit configured.
  2. Consume batched messages where the aggregated size differs from the raw size (e.g., multiple small messages).
  3. Observe that memory usage grows inconsistently and eventually exceeds the expected limit.

System configuration

Pulsar Go Client version: v0.14.0, v0.15.1, v0.16.0 (and possibly others)

Suggested fix

Ensure that memLimit.ReleaseMemory is called with the same size used by memLimit.ForceReserveMemory.

Relevant code

func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
    ...
	var (
		bytesReceived   int
		skippedMessages int32
	)
	for i := 0; i < numMsgs; i++ {
        ...
		messages = append(messages, msg)
		bytesReceived += msg.size()
		if pc.options.autoReceiverQueueSize {
			pc.client.memLimit.ForceReserveMemory(int64(bytesReceived)) // should be int64(msg.size())
			pc.incomingMessages.Add(int32(1))
			pc.markScaleIfNeed()
		}
    }
    ...
}
func (pc *partitionConsumer) dispatcher() {
	...
	for {
		...
		select {
		...
		// if the messageCh is nil or the messageCh is full this will not be selected
		case messageCh <- nextMessage:
			// allow this message to be garbage collected
			messages[0] = nil
			messages = messages[1:]

			// for the zeroQueueConsumer, the permits controlled by itself
			if pc.options.receiverQueueSize > 0 {
				pc.availablePermits.inc()
			}

			if pc.options.autoReceiverQueueSize {
				pc.incomingMessages.Dec()
				pc.client.memLimit.ReleaseMemory(int64(nextMessageSize)) // this is right
				pc.expectMoreIncomingMessages()
			}
		...
		}
		...
	}
	...
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions