Skip to content

Conversation

@raphael
Copy link
Member

@raphael raphael commented Feb 17, 2025

This PR solves a potential deadlock with maps and improves the stability and reliability of the worker pool by making several key improvements.

Worker & Job Cleanup:

  • Introduces explicit worker cleanup coordination using a cleanup map to prevent races
  • Adds dedicated cleanup for stale pending jobs
  • Makes worker cleanup distributed and idempotent across nodes
  • Improves stream cleanup for inactive workers

Concurrency & Race Conditions:

  • Fixes race conditions in SetAndWait by using per-waiter channels
  • Improves goroutine lifecycle management by using logger-based contexts
  • Makes node and worker stream handling more robust

Architecture:

  • Renames maps for better clarity (e.g. jobsMap -> jobMap)
  • Consolidates worker cleanup logic
  • Adds initialization events for worker/node streams
  • Makes job requeuing more reliable with retries

Ensure two nodes can't process stale workers concurrently
Instead of each worker having their own cache,
have the parent node hold the cache to avoid
duplicate caching.
The original code used future timestamps in workerKeepAliveMap to prevent
concurrent cleanup operations. This made stale workers appear active and
could permanently prevent cleanup if a node crashed during the process.

Fixed by:
- Added dedicated cleanupMap to track workers being cleaned up
- Implemented proper concurrency handling using SetIfNotExists/TestAndSet
- Added retry logic with exponential backoff for requeuing jobs
- Ensured cleanup map is properly closed during node shutdown
- Updated worker.go to handle new processRequeuedJobs retry parameter

The fix ensures stale workers and their jobs are reliably cleaned up even
in case of node failures or concurrent cleanup attempts.
- Use background context for worker goroutines to prevent premature termination
- Preserve logging context while making worker lifecycle independent of caller
- Rename maps for better clarity (e.g. jobsMap -> jobMap)
- Improve node stream management with nodeStreams map
- Clean up error handling and logging patterns

This fixes an issue where workers could be leaked when the caller's context
was cancelled before proper cleanup could occur.
Improves the Node component's ability to detect and reassign jobs from stale
or deleted workers by:
1. Adding explicit orphaned job detection for workers missing keep-alive entries
2. Centralizing worker cleanup logic to ensure consistent job reassignment
3. Simplifying worker state validation to catch edge cases in distributed scenarios

This ensures that no jobs are lost when workers become unavailable, maintaining
eventual consistency of job assignments across the worker pool.
Enhances worker cleanup mechanism by handling stale cleanup locks and
adding cleanup verification. Key changes:

* Add detection and cleanup of stale worker cleanup locks
* Clean up jobs from jobMap after successful requeue
* Improve logging around worker cleanup and job requeuing
* Upgrade requeue log level to Info for better operational visibility

This improves reliability of the distributed job system by preventing
orphaned jobs and stale locks from accumulating over time.
Add cleanup of stale consumers during sink initialization to prevent accumulation
of stale consumers in Redis. Previously stale consumers were only cleaned up
periodically, which could lead to a buildup if sinks did not shut down cleanly.

Also refactor the stale consumer cleanup logic to:
1. Extract common cleanup code into deleteStreamStaleConsumers
2. Improve error handling and logging
3. Properly clean up all related data structures (Redis consumer group,
   keep-alive map, and consumers map)
Improve the worker cleanup implementation by:
1. Split cleanupWorker into smaller, focused functions:
   - acquireCleanupLock: handles cleanup lock management
   - requeueWorkerJobs: handles job requeuing
   - cleanupWorker: orchestrates the cleanup process

2. Simplify cleanupInactiveWorkers:
   - Use activeWorkers() to get list of active workers
   - Combine jobMap and workerMap checks into a single loop
   - Skip workers being actively cleaned up

3. Rename isActive to isWithinTTL to better reflect its purpose
   - Function checks if a timestamp is within TTL duration
   - Used consistently across node and worker cleanup
This commit adds a new background process to clean up stale entries in the
pending jobs map. Previously, stale entries were only cleaned up when
attempting to dispatch a job with the same key. Now, a dedicated goroutine
runs at the ackGracePeriod frequency to proactively remove expired entries.

Additional changes:
- Fix jobPendingMap comment to clarify it's indexed by job key not worker ID
- Add debug logs for worker shutdown in handleEvents and keepAlive
- Refactor timestamp validation to use isWithinTTL helper across the codebase
- Improve error handling in cleanupStalePendingJobs using TestAndDelete

The periodic cleanup helps prevent memory leaks from abandoned dispatch
attempts and makes the job dispatch system more reliable.
Remove the requeueJob helper function and directly use dispatchJob for
requeueing jobs during worker cleanup and rebalancing.
Non-read notifications on the ichan channel were blocking writes and causing deadlocks during Set operations. This commit removes ichan and replaces it with a waiter-based mechanism using dedicated per-call channels, ensuring notifications are delivered without blocking.
@raphael raphael merged commit 2598358 into main Feb 19, 2025
4 checks passed
@raphael raphael deleted the fix/worker_streams_leak branch February 19, 2025 04:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants