Issue Type
🐛 Bug Report
Description
Running FuzzyDeduplicationWorkflow.run() as documented fails during MinHashStage initialization with:
ValueError: Failed to look up actor with name 'curator_deduplication_id_generator'.
This could because 1. You are trying to look up a named actor you didn't create.
2. The named actor died. 3. You did not use a namespace matching the namespace of the actor.
Root cause: nemo_curator/stages/deduplication/id_generator.py:create_id_generator_actor() calls ray.shutdown() in its finally block (around line 124), which destroys the detached ID generator actor before the workflow's pipeline executor can access it.
Environment
- Container: nvcr.io/nvidia/nemo-curator:25.09
- Ray version: 2.9.3 (bundled in container)
- GPU: NVIDIA GPU (tested on a single-GPU setup)
- Python: 3.12 (bundled in container)
- Workflow: FuzzyDeduplicationWorkflow
Steps to Reproduce
- Launch the NeMo Curator container:
docker run --gpus all --shm-size=2g -it --rm \
  -v $(pwd):/workspace \
  nvcr.io/nvidia/nemo-curator:25.09
- Follow the official Fuzzy Deduplication documentation:
from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow
fuzzy_workflow = FuzzyDeduplicationWorkflow(
    input_path="/path/to/input/data",
    cache_path="/path/to/cache",
    output_path="/path/to/output",
    text_field="text",
    perform_removal=False,
    input_blocksize="1GiB",
    seed=42,
    char_ngrams=24,
    num_bands=20,
    minhashes_per_band=13,
    use_64_bit_hash=False,
    bands_per_iteration=5,
)
fuzzy_workflow.run()
- Observe the crash during MinHashStage initialization
Expected Behavior
Workflow should complete successfully, producing:
- MinHash signatures in cache directory
- LSH bucket assignments
- Duplicate ID files in output directory
- ID generator mapping JSON
Actual Behavior
Workflow crashes with:
(MinHashStage pid=77675) ValueError:
Failed to get ID generator actor. Start an ID generator actor via `create_id_generator_actor` if calling this stage directly.
If using the FuzzyDedup API this should be started automatically.
ray::MinHashStage-0:MinHashStage.__init__() (pid=77675, ip=172.17.0.2, actor_id=..., repr=<nemo_curator.backends.experimental.ray_actor_pool.adapter.MinHashStage object at 0x...>)
File "/opt/Curator/nemo_curator/backends/experimental/ray_actor_pool/adapter.py", line 40, in __init__
self.stage.setup(worker_metadata)
File "/opt/Curator/nemo_curator/stages/deduplication/fuzzy/minhash.py", line 275, in setup
raise ValueError(err_msg) from e
ValueError:
Failed to get ID generator actor. Start an ID generator actor via `create_id_generator_actor` if calling this stage directly.
If using the FuzzyDedup API this should be started automatically.
Root Cause Analysis
Problematic Code Location
File: /opt/Curator/nemo_curator/stages/deduplication/id_generator.py
Function: create_id_generator_actor() (lines approximately 95-124)
def create_id_generator_actor(
    filepath: str | None = None,
    storage_options: dict[str, Any] | None = None,
) -> None:
    """
    Create and register a detached ID generator actor.
    This actor persists across multiple pipeline runs and provides
    unique IDs for document deduplication.
    """
    from nemo_curator.backends.utils import register_loguru_serializer
    register_loguru_serializer()
    ray.init(ignore_reinit_error=True)
    try:
        if filepath is None:
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote()
        else:
            storage_options = storage_options or {}
            with fsspec.open(filepath, mode="r", **storage_options) as f:
                data = json.load(f)
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote(start_id=data["next_id"], batch_registry=data["batch_registry"])
    except Exception as e:
        logger.error(f"Error creating id generator actor: {e}")
        raise
    finally:
        ray.shutdown()  # ⚠️ THIS IS THE BUG - kills the entire Ray cluster
Why This Breaks
The execution flow is:
- FuzzyDeduplicationWorkflow.run() calls create_id_generator_actor()
- The helper function:
  - Calls ray.init()
  - Creates the detached actor (intended to survive driver termination)
  - Calls ray.shutdown() in its finally block → the entire Ray cluster is destroyed, including the "detached" actor
- The workflow continues and starts a new Ray session for the pipeline executor
- MinHashStage.setup() tries to retrieve the actor via get_id_generator_actor()
- Actor not found → crash
Technical Detail
Detached actors (lifetime="detached") are designed to survive driver process termination, but they do NOT survive cluster shutdown. When ray.shutdown() is called, the entire Ray cluster (including all actors, detached or not) is destroyed.
The ray.shutdown() call was likely intended to clean up resources between helper calls, but it inadvertently destroys the actor that the workflow depends on.
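To make the failure mode concrete, here is a minimal standalone sketch with plain Ray (not Curator code; the actor name "demo_id_generator" and the Dummy class are made up for illustration) showing that a named detached actor on a locally started cluster does not survive ray.shutdown() followed by a fresh ray.init():
import ray

@ray.remote
class Dummy:
    pass

ray.init(ignore_reinit_error=True)
Dummy.options(
    name="demo_id_generator",
    namespace="demo_id_generator",
    lifetime="detached",
).remote()
ray.shutdown()  # tears down the local cluster, and the detached actor with it

ray.init(ignore_reinit_error=True)  # new session, like the pipeline executor's
try:
    ray.get_actor("demo_id_generator", namespace="demo_id_generator")
except ValueError as e:
    # Same "Failed to look up actor with name ..." error that MinHashStage hits
    print(e)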
Proposed Fix
Option 1: Remove ray.shutdown() (Recommended)
Simply remove the ray.shutdown() call and let the workflow manage the Ray lifecycle:
def create_id_generator_actor(
    filepath: str | None = None,
    storage_options: dict[str, Any] | None = None,
) -> None:
    from nemo_curator.backends.utils import register_loguru_serializer
    register_loguru_serializer()
    ray.init(ignore_reinit_error=True)
    try:
        if filepath is None:
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote()
        else:
            storage_options = storage_options or {}
            with fsspec.open(filepath, mode="r", **storage_options) as f:
                data = json.load(f)
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote(start_id=data["next_id"], batch_registry=data["batch_registry"])
    except Exception as e:
        logger.error(f"Error creating id generator actor: {e}")
        raise
    # No ray.shutdown() - let the workflow handle Ray lifecycle management
Option 2: Make kill_id_generator_actor More Robust
Additionally, improve error handling in the cleanup function:
def kill_id_generator_actor() -> None:
    """Kill the ID generator actor if it exists."""
    try:
        if ray.is_initialized():
            actor = get_id_generator_actor()
            ray.kill(actor, no_restart=True)
    except (ValueError, ray.exceptions.RayActorError):
        # Actor already cleaned up, Ray shut down, or actor never created
        pass
This prevents the workflow from crashing during cleanup if Ray has already shut down or the actor doesn't exist.
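For example (illustrative usage only, assuming the patched function above), the cleanup path becomes safe to call in any state:
kill_id_generator_actor()  # actor exists → killed
kill_id_generator_actor()  # actor already gone → ValueError is swallowed, no crash
ray.shutdown()
kill_id_generator_actor()  # Ray not initialized → guarded by ray.is_initialized(), no-op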
Workaround for Users
Until this is fixed upstream, users can apply this monkey-patch before running the workflow:
import ray
from nemo_curator.stages.deduplication.id_generator import (
    IdGenerator,
    CURATOR_ID_GENERATOR_ACTOR_NAME,
)
import nemo_curator.stages.deduplication.fuzzy.workflow as workflow_module
from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow

def patched_create_id_generator_actor(filepath=None, storage_options=None):
    """Patched version that doesn't shutdown Ray."""
    from nemo_curator.backends.utils import register_loguru_serializer
    import fsspec
    import json
    from loguru import logger

    register_loguru_serializer()
    ray.init(ignore_reinit_error=True)
    try:
        if filepath is None:
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote()
        else:
            storage_options = storage_options or {}
            with fsspec.open(filepath, mode="r", **storage_options) as f:
                data = json.load(f)
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote(start_id=data["next_id"], batch_registry=data["batch_registry"])
    except Exception as e:
        logger.error(f"Error creating id generator actor: {e}")
        raise

def patched_kill_id_generator_actor():
    """Patched version that handles Ray shutdown gracefully."""
    try:
        if ray.is_initialized():
            actor = ray.get_actor(
                CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME
            )
            ray.kill(actor, no_restart=True)
    except (ValueError, ray.exceptions.RayActorError):
        pass

# Apply patches
workflow_module.create_id_generator_actor = patched_create_id_generator_actor
workflow_module.kill_id_generator_actor = patched_kill_id_generator_actor

# Now run the workflow normally
fuzzy_workflow = FuzzyDeduplicationWorkflow(...)
fuzzy_workflow.run()
Impact
- Severity: High
- User Impact: Breaks the documented fuzzy deduplication workflow
- Affected Versions: 25.09 (potentially earlier versions)
- Affected Components: FuzzyDeduplicationWorkflow, LSH-based fuzzy deduplication
- Workaround Available: Yes (monkey-patch required)
Additional Context
Why This Bug Exists
The design intent was likely:
- Create a helper function that sets up the actor and cleans up after itself
- Use ray.shutdown() to avoid resource leaks
However, the implementation doesn't account for:
- Detached actors need the Ray cluster to remain alive
- The workflow starts its own Ray session after the helper returns
- Helper cleanup destroys resources the workflow depends on
Recommended Architecture
The workflow should manage Ray lifecycle at the top level:
- Initialize Ray once at workflow start
- Create detached actors that persist for the workflow duration
- Clean up actors and shut down Ray only after all stages complete
Helper functions should not manage Ray lifecycle independently.
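As a rough sketch of that structure (the run_all_stages name is a placeholder, not an actual Curator API; the two helper functions are the ones discussed above):
import ray

def run_workflow() -> None:
    ray.init(ignore_reinit_error=True)  # single top-level Ray session
    try:
        create_id_generator_actor()  # detached actor lives for the whole run
        run_all_stages()             # placeholder for the MinHash/LSH/removal pipelines
    finally:
        kill_id_generator_actor()    # robust cleanup (see Option 2 above)
        ray.shutdown()               # the only shutdown, after all stages complete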
Testing Notes
After applying the fix, the expected successful output is:
INFO - Minhash pipeline completed in ~12 seconds
INFO - LSH pipeline completed in ~68 seconds
INFO - No potential duplicates found in the dataset. Skipping connected components pipeline.
INFO - Fuzzy deduplication pipeline completed in ~80 seconds
Expected output directories:
- cache_path/MinHashStage/ - MinHash signature parquet files
- cache_path/LSHStage/ - LSH bucket assignments
- output_path/FuzzyDuplicateIds/ - Duplicate ID parquet files
- output_path/fuzzy_id_generator.json - ID generator state mapping
References
- Documentation: NeMo Curator GPU Deduplication - Fuzzy Duplicate Removal
- Container Image: nvcr.io/nvidia/nemo-curator:25.09
- Related Files:
/opt/Curator/nemo_curator/stages/deduplication/id_generator.py
/opt/Curator/nemo_curator/stages/deduplication/fuzzy/workflow.py
/opt/Curator/nemo_curator/stages/deduplication/fuzzy/minhash.py
Willingness to Contribute
I'm happy to:
- Provide additional testing or debugging information
- Submit a pull request with the proposed fix
- Test the fix on additional GPU configurations
- Help update documentation
Thank you for maintaining NeMo Curator! This bug prevents users from following the documented fuzzy deduplication workflow, so a fix would be greatly appreciated.