create_id_generator_actor calls ray.shutdown(), breaking FuzzyDeduplicationWorkflow #1216

@msiraga

Issue Type

🐛 Bug Report


Description

Running FuzzyDeduplicationWorkflow.run() as documented fails during MinHashStage initialization with:

ValueError: Failed to look up actor with name 'curator_deduplication_id_generator'. 
This could because 1. You are trying to look up a named actor you didn't create. 
2. The named actor died. 3. You did not use a namespace matching the namespace of the actor.

Root cause: nemo_curator/stages/deduplication/id_generator.py:create_id_generator_actor() calls ray.shutdown() in its finally block (around line 124), which destroys the detached ID generator actor before the workflow's pipeline executor can access it.


Environment

  • Container: nvcr.io/nvidia/nemo-curator:25.09
  • Ray version: 2.9.3 (bundled in container)
  • GPU: NVIDIA GPU (tested on single GPU setup)
  • Python: 3.12 (bundled in container)
  • Workflow: FuzzyDeduplicationWorkflow

Steps to Reproduce

  1. Launch the NeMo Curator container:
docker run --gpus all --shm-size=2g -it --rm \
  -v $(pwd):/workspace \
  nvcr.io/nvidia/nemo-curator:25.09
  2. Follow the official Fuzzy Deduplication documentation:
from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow

fuzzy_workflow = FuzzyDeduplicationWorkflow(
    input_path="/path/to/input/data",
    cache_path="/path/to/cache",
    output_path="/path/to/output",
    text_field="text",
    perform_removal=False,
    input_blocksize="1GiB",
    seed=42,
    char_ngrams=24,
    num_bands=20,
    minhashes_per_band=13,
    use_64_bit_hash=False,
    bands_per_iteration=5,
)

fuzzy_workflow.run()
  3. Observe the crash during MinHashStage initialization

Expected Behavior

Workflow should complete successfully, producing:

  • MinHash signatures in cache directory
  • LSH bucket assignments
  • Duplicate ID files in output directory
  • ID generator mapping JSON

Actual Behavior

Workflow crashes with:

(MinHashStage pid=77675) ValueError: 
            Failed to get ID generator actor. Start an ID generator actor via `create_id_generator_actor` if calling this stage directly.
            If using the FuzzyDedup API this should be started automatically.

ray::MinHashStage-0:MinHashStage.__init__() (pid=77675, ip=172.17.0.2, actor_id=..., repr=<nemo_curator.backends.experimental.ray_actor_pool.adapter.MinHashStage object at 0x...>)
  File "/opt/Curator/nemo_curator/backends/experimental/ray_actor_pool/adapter.py", line 40, in __init__
    self.stage.setup(worker_metadata)
  File "/opt/Curator/nemo_curator/stages/deduplication/fuzzy/minhash.py", line 275, in setup
    raise ValueError(err_msg) from e
ValueError: 
            Failed to get ID generator actor. Start an ID generator actor via `create_id_generator_actor` if calling this stage directly.
            If using the FuzzyDedup API this should be started automatically.

Root Cause Analysis

Problematic Code Location

File: /opt/Curator/nemo_curator/stages/deduplication/id_generator.py
Function: create_id_generator_actor() (lines approximately 95-124)

def create_id_generator_actor(
    filepath: str | None = None,
    storage_options: dict[str, Any] | None = None,
) -> None:
    """
    Create and register a detached ID generator actor.
    
    This actor persists across multiple pipeline runs and provides
    unique IDs for document deduplication.
    """
    from nemo_curator.backends.utils import register_loguru_serializer

    register_loguru_serializer()
    ray.init(ignore_reinit_error=True)
    
    try:
        if filepath is None:
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote()
        else:
            storage_options = storage_options or {}
            with fsspec.open(filepath, mode="r", **storage_options) as f:
                data = json.load(f)
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote(start_id=data["next_id"], batch_registry=data["batch_registry"])
    except Exception as e:
        logger.error(f"Error creating id generator actor: {e}")
        raise
    finally:
        ray.shutdown()  # ⚠️ THIS IS THE BUG - kills the entire Ray cluster

Why This Breaks

The execution flow is:

  1. FuzzyDeduplicationWorkflow.run() calls create_id_generator_actor()
  2. Helper function:
    • Calls ray.init()
    • Creates detached actor (intended to survive driver termination)
    • Calls ray.shutdown() in finally block → entire Ray cluster destroyed, including the "detached" actor
  3. Workflow continues and starts a new Ray session for the pipeline executor
  4. MinHashStage.setup() tries to retrieve the actor via get_id_generator_actor()
  5. Actor not found → crash

Technical Detail

Detached actors (lifetime="detached") are designed to survive driver process termination, but they do NOT survive cluster shutdown. When ray.shutdown() is called, the entire Ray cluster (including all actors, detached or not) is destroyed.

The ray.shutdown() call was likely intended to clean up resources between helper calls, but it inadvertently destroys the actor that the workflow depends on.
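
To see the behavior in isolation, here is a minimal sketch using a toy Demo actor (nothing from NeMo Curator is involved; it assumes ray.init() starts a local Ray instance rather than attaching to a pre-existing cluster, which is the situation inside the container):

import ray


@ray.remote
class Demo:
    def ping(self) -> str:
        return "pong"


ray.init(ignore_reinit_error=True)
actor = Demo.options(name="demo", namespace="demo", lifetime="detached").remote()
ray.get(actor.ping.remote())  # actor is alive and registered under its name
ray.shutdown()  # tears down the local Ray instance, and the "detached" actor with it

ray.init(ignore_reinit_error=True)  # fresh Ray instance, like the pipeline executor's
try:
    ray.get_actor("demo", namespace="demo")
except ValueError as exc:
    print(exc)  # Failed to look up actor with name 'demo' ...

The second lookup fails for the same reason MinHashStage.setup() cannot find curator_deduplication_id_generator.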


Proposed Fix

Option 1: Remove ray.shutdown() (Recommended)

Simply remove the ray.shutdown() call and let the workflow manage the Ray lifecycle:

def create_id_generator_actor(
    filepath: str | None = None,
    storage_options: dict[str, Any] | None = None,
) -> None:
    from nemo_curator.backends.utils import register_loguru_serializer

    register_loguru_serializer()
    ray.init(ignore_reinit_error=True)
    
    try:
        if filepath is None:
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote()
        else:
            storage_options = storage_options or {}
            with fsspec.open(filepath, mode="r", **storage_options) as f:
                data = json.load(f)
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote(start_id=data["next_id"], batch_registry=data["batch_registry"])
    except Exception as e:
        logger.error(f"Error creating id generator actor: {e}")
        raise
    # No ray.shutdown() - let the workflow handle Ray lifecycle management

Option 2: Make kill_id_generator_actor More Robust

Additionally, improve error handling in the cleanup function:

def kill_id_generator_actor() -> None:
    """Kill the ID generator actor if it exists."""
    try:
        if ray.is_initialized():
            actor = get_id_generator_actor()
            ray.kill(actor, no_restart=True)
    except (ValueError, ray.exceptions.RayActorError):
        # Actor already cleaned up, Ray shut down, or actor never created
        pass

This prevents the workflow from crashing during cleanup if Ray has already shut down or the actor doesn't exist.


Workaround for Users

Until this is fixed upstream, users can apply this monkey-patch before running the workflow:

import ray
from nemo_curator.stages.deduplication.id_generator import (
    IdGenerator,
    CURATOR_ID_GENERATOR_ACTOR_NAME,
)
import nemo_curator.stages.deduplication.fuzzy.workflow as workflow_module


def patched_create_id_generator_actor(filepath=None, storage_options=None):
    """Patched version that doesn't shutdown Ray."""
    from nemo_curator.backends.utils import register_loguru_serializer
    import fsspec
    import json
    from loguru import logger
    
    register_loguru_serializer()
    ray.init(ignore_reinit_error=True)
    
    try:
        if filepath is None:
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote()
        else:
            storage_options = storage_options or {}
            with fsspec.open(filepath, mode="r", **storage_options) as f:
                data = json.load(f)
            _ = IdGenerator.options(
                name=CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
                lifetime="detached"
            ).remote(start_id=data["next_id"], batch_registry=data["batch_registry"])
    except Exception as e:
        logger.error(f"Error creating id generator actor: {e}")
        raise


def patched_kill_id_generator_actor():
    """Patched version that handles Ray shutdown gracefully."""
    try:
        if ray.is_initialized():
            actor = ray.get_actor(
                CURATOR_ID_GENERATOR_ACTOR_NAME,
                namespace=CURATOR_ID_GENERATOR_ACTOR_NAME
            )
            ray.kill(actor, no_restart=True)
    except (ValueError, ray.exceptions.RayActorError):
        pass


# Apply patches
workflow_module.create_id_generator_actor = patched_create_id_generator_actor
workflow_module.kill_id_generator_actor = patched_kill_id_generator_actor

# Now run the workflow normally
fuzzy_workflow = FuzzyDeduplicationWorkflow(...)
fuzzy_workflow.run()
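
To verify the patched helper in isolation (a hypothetical check, best done in a separate session rather than immediately before fuzzy_workflow.run(), since the workflow creates its own actor during the run):

# Isolated check of the patched helper; ray and CURATOR_ID_GENERATOR_ACTOR_NAME
# are already imported above.
patched_create_id_generator_actor()
id_gen = ray.get_actor(
    CURATOR_ID_GENERATOR_ACTOR_NAME,
    namespace=CURATOR_ID_GENERATOR_ACTOR_NAME,
)
print("ID generator actor is reachable:", id_gen)
patched_kill_id_generator_actor()  # clean up so a later run can create a fresh actor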

Impact

  • Severity: High
  • User Impact: Breaks the documented fuzzy deduplication workflow
  • Affected Versions: 25.09 (potentially earlier versions)
  • Affected Components: FuzzyDeduplicationWorkflow, LSH-based fuzzy deduplication
  • Workaround Available: Yes (monkey-patch required)

Additional Context

Why This Bug Exists

The design intent was likely:

  1. Create a helper function that sets up the actor and cleans up after itself
  2. Use ray.shutdown() to avoid resource leaks

However, the implementation doesn't account for:

  1. Detached actors need the Ray cluster to remain alive
  2. The workflow starts its own Ray session after the helper returns
  3. Helper cleanup destroys resources the workflow depends on

Recommended Architecture

The workflow should manage Ray lifecycle at the top level:

  • Initialize Ray once at workflow start
  • Create detached actors that persist for the workflow duration
  • Clean up actors and shut down Ray only after all stages complete

Helper functions should not manage Ray lifecycle independently.
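
A minimal sketch of that shape (hypothetical orchestration code, not the current FuzzyDeduplicationWorkflow implementation; it assumes create_id_generator_actor no longer calls ray.shutdown(), that kill_id_generator_actor lives next to it in id_generator.py, and that run_stages stands in for the workflow's internal pipeline execution):

import ray
from nemo_curator.stages.deduplication.id_generator import (
    create_id_generator_actor,
    kill_id_generator_actor,
)


def run_with_managed_ray(run_stages) -> None:
    """Hypothetical top-level driver: Ray is initialized once, the detached
    ID generator actor lives for the whole run, and teardown happens last."""
    ray.init(ignore_reinit_error=True)
    try:
        create_id_generator_actor()  # assumes the finally-block ray.shutdown() is removed
        run_stages()                 # all pipeline stages see the same live actor
    finally:
        kill_id_generator_actor()    # remove the detached actor first
        ray.shutdown()               # only then tear down Ray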


Testing Notes

After applying the fix:

Expected successful output:

INFO - Minhash pipeline completed in ~12 seconds
INFO - LSH pipeline completed in ~68 seconds
INFO - No potential duplicates found in the dataset. Skipping connected components pipeline.
INFO - Fuzzy deduplication pipeline completed in ~80 seconds

Expected output directories:

  • cache_path/MinHashStage/ - MinHash signature parquet files
  • cache_path/LSHStage/ - LSH bucket assignments
  • output_path/FuzzyDuplicateIds/ - Duplicate ID parquet files
  • output_path/fuzzy_id_generator.json - ID generator state mapping
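
A small check of that layout after a run (paths are the same cache_path/output_path values passed to the workflow; FuzzyDuplicateIds may legitimately be absent when no duplicates are found, as in the log above):

from pathlib import Path

cache_path = Path("/path/to/cache")    # same values passed to FuzzyDeduplicationWorkflow
output_path = Path("/path/to/output")

expected = [
    cache_path / "MinHashStage",
    cache_path / "LSHStage",
    output_path / "FuzzyDuplicateIds",
    output_path / "fuzzy_id_generator.json",
]
for path in expected:
    print(f"{path}: {'present' if path.exists() else 'missing'}")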

References

  • Documentation: NeMo Curator GPU Deduplication - Fuzzy Duplicate Removal
  • Container Image: nvcr.io/nvidia/nemo-curator:25.09
  • Related Files:
    • /opt/Curator/nemo_curator/stages/deduplication/id_generator.py
    • /opt/Curator/nemo_curator/stages/deduplication/fuzzy/workflow.py
    • /opt/Curator/nemo_curator/stages/deduplication/fuzzy/minhash.py

Willingness to Contribute

I'm happy to:

  • Provide additional testing or debugging information
  • Submit a pull request with the proposed fix
  • Test the fix on additional GPU configurations
  • Help update documentation

Thank you for maintaining NeMo Curator! This bug prevents users from following the documented fuzzy deduplication workflow, so a fix would be greatly appreciated.
