[CELEBORN-2274] Fix replicate channels not resumed when transitioning from PUSH_AND_REPLICATE_PAUSED to PUSH_PAUSED #3616

Open
sl3635 wants to merge 1 commit into apache:main from sl3635:CELEBORN-2274

sl3635 commented Mar 5, 2026

What changes were proposed in this pull request?

Fix a bug in MemoryManager.switchServingState() where replicate channels permanently lose autoRead=true after a memory pressure event.

When the serving state transitions from PUSH_AND_REPLICATE_PAUSED to PUSH_PAUSED, resumeReplicate() was only called inside the !tryResumeByPinnedMemory() guard. If tryResumeByPinnedMemory() returned true, the entire block was skipped and replicate channels were never resumed.

The fix moves resumeReplicate() outside the tryResumeByPinnedMemory() guard so it is always called when stepping down from PUSH_AND_REPLICATE_PAUSED to PUSH_PAUSED. This is a state machine invariant: PUSH_PAUSED means only push is paused; replicate must always be resumed.
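To make the guard placement concrete, here is a minimal toy model of the step-down transition. It is a sketch, not the actual MemoryManager code: the class, the two boolean flags, and the `stepDownBeforeFix` / `stepDownAfterFix` methods are hypothetical, and the ordering of the two calls after the fix is an assumption. Only `tryResumeByPinnedMemory()` and `resumeReplicate()` are names taken from the description above.

```java
// Toy model only. Everything except the two method names quoted in the
// PR description is hypothetical and simplified.
final class GuardPlacementSketch {
    // autoRead flags standing in for the push and replicate channel groups.
    // Both start false, as in PUSH_AND_REPLICATE_PAUSED.
    boolean pushAutoRead;
    boolean replicateAutoRead;

    // Stand-in for tryResumeByPinnedMemory(): when pinned memory is low,
    // it resumes push only (the PUSH_PAUSED case) and reports true.
    boolean tryResumeByPinnedMemory(boolean pinnedMemoryLow) {
        if (pinnedMemoryLow) {
            pushAutoRead = true;
            return true;
        }
        return false;
    }

    void resumeReplicate() {
        replicateAutoRead = true;
    }

    // Before the fix: resumeReplicate() sits inside the guard, so it is
    // skipped whenever the pinned-memory path returns true.
    void stepDownBeforeFix(boolean pinnedMemoryLow) {
        if (!tryResumeByPinnedMemory(pinnedMemoryLow)) {
            resumeReplicate();
        }
    }

    // After the fix: resumeReplicate() runs on every step-down,
    // independent of the pinned-memory check (call order is assumed).
    void stepDownAfterFix(boolean pinnedMemoryLow) {
        resumeReplicate();
        tryResumeByPinnedMemory(pinnedMemoryLow);
    }

    public static void main(String[] args) {
        GuardPlacementSketch before = new GuardPlacementSketch();
        before.stepDownBeforeFix(true);
        System.out.println("before fix: replicateAutoRead=" + before.replicateAutoRead);

        GuardPlacementSketch after = new GuardPlacementSketch();
        after.stepDownAfterFix(true);
        System.out.println("after fix: replicateAutoRead=" + after.replicateAutoRead);
    }
}
```

In this model the two variants differ only when pinned memory is low; with high pinned memory both resume replicate and leave push paused.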

Why are the changes needed?

Once replicate channels are stuck with autoRead=false, Netty I/O threads stop reading from all replicate connections. Remote workers writing to the affected worker see their TCP send buffers fill up (zero window), causing pending writes to accumulate in ChannelOutboundBuffer. Each pending write holds a reference to a direct memory ByteBuf, causing direct memory to grow indefinitely on the remote workers.
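The backpressure effect can be illustrated with a small simulation that does not use Netty at all. It is a sketch under stated assumptions: a bounded queue stands in for the TCP buffers, an unbounded queue stands in for the sender's ChannelOutboundBuffer, and the slot count and payload size are made-up numbers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy simulation only: no real sockets or Netty classes are involved.
final class BackpressureSketch {
    // Hypothetical capacity of the bounded buffer between sender and receiver.
    static final int SOCKET_BUFFER_SLOTS = 4;

    // Returns how many writes are still pending on the sender after
    // `writes` write attempts against a receiver with the given autoRead flag.
    static int pendingAfter(int writes, boolean receiverAutoRead) {
        Deque<byte[]> socketBuffer = new ArrayDeque<>();  // bounded, models TCP buffers
        Deque<byte[]> pendingWrites = new ArrayDeque<>(); // unbounded, models the outbound buffer
        for (int i = 0; i < writes; i++) {
            pendingWrites.addLast(new byte[16]);
            // Flush as much as the bounded buffer will accept.
            while (!pendingWrites.isEmpty() && socketBuffer.size() < SOCKET_BUFFER_SLOTS) {
                socketBuffer.addLast(pendingWrites.removeFirst());
            }
            // The receiver drains the buffer only while autoRead is on.
            if (receiverAutoRead) {
                socketBuffer.clear();
            }
        }
        return pendingWrites.size();
    }

    public static void main(String[] args) {
        System.out.println("autoRead=true  pending=" + pendingAfter(100, true));
        System.out.println("autoRead=false pending=" + pendingAfter(100, false));
    }
}
```

With the receiver reading, nothing accumulates. With the receiver stopped, every write beyond the bounded buffer's capacity stays queued on the sender, and the queue grows linearly with the number of writes.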

The failure sequence:

  1. Worker hits memory pressure → state = PUSH_AND_REPLICATE_PAUSED → all channels paused
  2. Pinned memory is low → tryResumeByPinnedMemory() returns true → resumeByPinnedMemory(PUSH_PAUSED) resumes push only, replicate not resumed
  3. Memory drops to push-only range → state = PUSH_PAUSED, but resumeReplicate() is never called
  4. Replicate channels permanently stuck with autoRead=false, causing unbounded direct memory growth on remote workers

Does this PR resolve a correctness bug?

Yes.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a new unit test, "Test MemoryManager resume replicate by pinned memory", in MemoryManagerSuite that reproduces the exact failure scenario:

  1. Enter PUSH_AND_REPLICATE_PAUSED with low pinned memory (channels resumed by pinned memory path)
  2. Raise pinned memory so both push and replicate get paused
  3. Drop memory to PUSH_PAUSED range with low pinned memory
  4. Assert replicate listener is resumed — this assertion fails without the fix
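The four steps above can be sketched against a toy state machine with recording listeners. This is not the test in MemoryManagerSuite: the thresholds, the `update` method, the `RecordingListener` class, and the `fixApplied` switch are all hypothetical, and the behaviour of the pinned-memory path in each state is assumed from the description in this PR.

```java
// Toy state machine only. Thresholds and method names are hypothetical.
final class ResumeReplicateScenarioSketch {
    enum State { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

    // Records the last pause/resume notification for one module.
    static final class RecordingListener {
        boolean paused;
        void onPause() { paused = true; }
        void onResume() { paused = false; }
    }

    // Made-up ratios, chosen only to separate the three states.
    static final double PAUSE_PUSH_RATIO = 0.85;
    static final double PAUSE_REPLICATE_RATIO = 0.95;
    static final double PINNED_RESUME_RATIO = 0.30;

    final RecordingListener push = new RecordingListener();
    final RecordingListener replicate = new RecordingListener();
    final boolean fixApplied;
    State state = State.NONE_PAUSED;

    ResumeReplicateScenarioSketch(boolean fixApplied) {
        this.fixApplied = fixApplied;
    }

    void update(double memoryRatio, double pinnedRatio) {
        State last = state;
        if (memoryRatio >= PAUSE_REPLICATE_RATIO) {
            state = State.PUSH_AND_REPLICATE_PAUSED;
        } else if (memoryRatio >= PAUSE_PUSH_RATIO) {
            state = State.PUSH_PAUSED;
        } else {
            state = State.NONE_PAUSED;
        }
        boolean pinnedLow = pinnedRatio < PINNED_RESUME_RATIO;
        switch (state) {
            case PUSH_AND_REPLICATE_PAUSED:
                // Assumed: the pinned-memory path resumes both modules here.
                if (pinnedLow) { push.onResume(); replicate.onResume(); }
                else { push.onPause(); replicate.onPause(); }
                break;
            case PUSH_PAUSED:
                // With the fix, replicate is resumed on every step-down;
                // without it, only when the pinned-memory path is not taken.
                if (last == State.PUSH_AND_REPLICATE_PAUSED && (fixApplied || !pinnedLow)) {
                    replicate.onResume();
                }
                if (pinnedLow) { push.onResume(); } else { push.onPause(); }
                break;
            default:
                push.onResume();
                replicate.onResume();
        }
    }

    // Runs steps 1 to 3 and reports whether replicate ends up resumed (step 4).
    static boolean replicateResumedAfterScenario(boolean fixApplied) {
        ResumeReplicateScenarioSketch m = new ResumeReplicateScenarioSketch(fixApplied);
        m.update(0.96, 0.10); // 1. PUSH_AND_REPLICATE_PAUSED, low pinned memory
        m.update(0.96, 0.50); // 2. pinned memory raised: both modules paused
        m.update(0.90, 0.10); // 3. PUSH_PAUSED range, low pinned memory
        return !m.replicate.paused;
    }

    public static void main(String[] args) {
        System.out.println("with fix:    " + replicateResumedAfterScenario(true));
        System.out.println("without fix: " + replicateResumedAfterScenario(false));
    }
}
```

The `fixApplied` switch exists only so the same scenario can show both outcomes side by side; in this model the step 4 check passes with the fix and fails without it.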

SteNicholas left a comment

Thanks for the fix. LGTM.
