Skip to content

Conversation

@ilongin
Copy link
Contributor

@ilongin ilongin commented Nov 9, 2025

Implements checkpoint cleanup with TTL-based garbage collection:

  • cleanup_checkpoints() method in Catalog to remove old checkpoints
  • _remove_checkpoint() helper to clean checkpoint and its UDF tables
  • list_checkpoints() enhancement to support time-based filtering
  • get_descendant_job_ids() to find child jobs for dependency checking
  • remove_checkpoint() in metastore to delete checkpoint records
  • CLI integration in garbage_collect command
  • Comprehensive tests for cleanup scenarios

This builds on top of the core checkpoint functionality to provide automatic cleanup of outdated checkpoints and their associated tables.

Related to #1453

Summary by Sourcery

Add TTL-based garbage collection for UDF checkpoints to prevent storage bloat while preserving dependencies.

New Features:

  • Implement Catalog.cleanup_checkpoints to automatically remove outdated checkpoints and their UDF tables based on a configurable TTL with branch pruning logic.
  • Add Catalog._remove_checkpoint helper to delete individual checkpoints and associated job-specific UDF tables.
  • Extend metastore with get_descendant_job_ids, remove_checkpoint, and time-based list_checkpoints filters (created_before and created_after).
  • Integrate checkpoint cleanup into the CLI garbage_collect command.

Tests:

  • Add functional tests for default TTL cleanup, custom TTL, no-op when no old checkpoints exist, preservation of checkpoints with active descendants, partial job cleanup within a single job, full branch pruning of outdated lineages, and descendant job ID retrieval.

Implements checkpoint cleanup with TTL-based garbage collection:
- cleanup_checkpoints() method in Catalog to remove old checkpoints
- _remove_checkpoint() helper to clean checkpoint and its UDF tables
- list_checkpoints() enhancement to support time-based filtering
- get_descendant_job_ids() to find child jobs for dependency checking
- remove_checkpoint() in metastore to delete checkpoint records
- CLI integration in garbage_collect command
- Comprehensive tests for cleanup scenarios

This builds on top of the core checkpoint functionality to provide
automatic cleanup of outdated checkpoints and their associated tables.

Related to #1453
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Nov 9, 2025

Reviewer's Guide

This PR introduces TTL-based garbage collection for outdated UDF checkpoints by extending the Catalog and Metastore with time-based filtering, descendant job detection, and removal routines; integrates the cleanup into the CLI; and adds comprehensive tests covering various cleanup scenarios and metastore methods.

Sequence diagram for TTL-based checkpoint cleanup via CLI

sequenceDiagram
    actor User
    participant CLI
    participant Catalog
    participant Metastore
    participant WarehouseDB

    User->>CLI: Run garbage_collect command
    CLI->>Catalog: cleanup_checkpoints()
    Catalog->>Metastore: list_checkpoints(created_before=ttl_threshold)
    loop For each outdated checkpoint
        Catalog->>Metastore: get_descendant_job_ids(ch.job_id)
        Catalog->>Metastore: list_checkpoints(desc_id, created_after=ttl_threshold)
        alt No active descendants
            Catalog->>Metastore: remove_checkpoint(checkpoint)
            Catalog->>WarehouseDB: list_tables(prefix=table_prefix)
            Catalog->>WarehouseDB: cleanup_tables(matching_tables)
        end
    end
Loading

Class diagram for updated Metastore and Catalog checkpoint cleanup

classDiagram
    class AbstractMetastore {
        +get_descendant_job_ids(job_id, conn)
        +list_checkpoints(job_id=None, created_after=None, created_before=None, conn=None)
        +remove_checkpoint(checkpoint, conn=None)
    }
    class AbstractDBMetastore {
        +get_descendant_job_ids(job_id, conn)
        +list_checkpoints(job_id=None, created_after=None, created_before=None, conn=None)
        +remove_checkpoint(checkpoint, conn=None)
    }
    AbstractDBMetastore --|> AbstractMetastore
    class Catalog {
        +_remove_checkpoint(checkpoint)
        +cleanup_checkpoints(ttl_seconds=None)
    }
    Catalog --> AbstractMetastore : metastore
    Catalog --> WarehouseDB : warehouse
    class WarehouseDB {
        +list_tables(prefix)
        +cleanup_tables(tables)
    }
Loading

File-Level Changes

Change Details Files
Add checkpoint cleanup routines in Catalog
  • Implemented _remove_checkpoint to delete checkpoint records and associated UDF tables
  • Added cleanup_checkpoints with TTL threshold, branch pruning, and descendant-check caching
src/datachain/catalog/catalog.py
Enhance Metastore with time-based filtering and descendant detection
  • Added abstract get_descendant_job_ids and implemented recursive CTE in DB metastore
  • Extended list_checkpoints to accept created_before/created_after filters
  • Added abstract and concrete remove_checkpoint to delete checkpoint entries
src/datachain/data_storage/metastore.py
Integrate checkpoint cleanup into CLI
  • Updated garbage_collect command to invoke catalog.cleanup_checkpoints
  • Added user feedback when cleaning outdated checkpoints
src/datachain/cli/commands/misc.py
Add tests for cleanup and descendant job IDs
  • Created functional tests for cleanup_checkpoints under various scenarios (default/custom TTL, no old checkpoints, active descendants, partial cleanup, branch pruning)
  • Added parameterized tests for get_descendant_job_ids with different hierarchy depths
tests/func/test_checkpoints.py
tests/func/test_metastore.py

Possibly linked issues

  • UDF Checkpoints cleanup #1453: The PR implements the automatic cleanup of outdated UDF checkpoints and their associated UDF tables based on a time-to-live (TTL) setting, directly addressing the issue's requirement.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@ilongin ilongin linked an issue Nov 9, 2025 that may be closed by this pull request
@cloudflare-workers-and-pages
Copy link

Deploying datachain-documentation with  Cloudflare Pages  Cloudflare Pages

Latest commit: 91f7da5
Status: ✅  Deploy successful!
Preview URL: https://f4332ecc.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-1453-checkpoint-udf.datachain-documentation.pages.dev

View logs

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `src/datachain/catalog/catalog.py:2061` </location>
<code_context>
+        # Remove job-specific tables for this checkpoint
+        # Table patterns: udf_{job_id}_{hash}_{suffix}
+        # where suffix can be: input, output, output_partial, processed
+        job_id_sanitized = checkpoint.job_id.replace("-", "")
+        table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_"
+        matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
</code_context>

<issue_to_address>
**suggestion:** Sanitizing job_id by removing hyphens may not be sufficient for table name safety.

Consider validating job_id to exclude all invalid characters and reserved words to ensure table names remain safe and compliant.

Suggested implementation:

```python
        # Remove job-specific tables for this checkpoint
        # Table patterns: udf_{job_id}_{hash}_{suffix}
        # where suffix can be: input, output, output_partial, processed
        job_id_sanitized = self._sanitize_job_id(checkpoint.job_id)
        table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_"
        matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)

```

```python
    def _sanitize_job_id(self, job_id: str) -> str:
        """
        Sanitize job_id for safe table naming:
        - Remove invalid characters (only allow alphanumeric and underscores)
        - Ensure it does not match reserved words (simple check)
        """
        import re

        # Only allow alphanumeric and underscores
        sanitized = re.sub(r'[^a-zA-Z0-9_]', '', job_id)

        # List of reserved words (example, expand as needed)
        reserved_words = {"select", "table", "from", "where", "insert", "update", "delete"}
        if sanitized.lower() in reserved_words:
            sanitized = f"job_{sanitized}"

        return sanitized

    def cleanup_checkpoints(self, ttl_seconds: int | None = None) -> None:
        """
        Clean up outdated checkpoints and their associated UDF tables.

```
</issue_to_address>

### Comment 2
<location> `src/datachain/cli/commands/misc.py:21-22` </location>
<code_context>
         print(f"Garbage collecting {len(temp_tables)} temporary tables.")
         catalog.cleanup_tables(temp_tables)

+    print("Cleaning up outdated checkpoints.")
+    catalog.cleanup_checkpoints()
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Consider handling and reporting errors from cleanup_checkpoints.

Wrap catalog.cleanup_checkpoints() in a try/except block to prevent abrupt termination and provide user-friendly error reporting.

```suggestion
    print("Cleaning up outdated checkpoints.")
    try:
        catalog.cleanup_checkpoints()
    except Exception as e:
        print(f"Error cleaning up checkpoints: {e}")
```
</issue_to_address>

### Comment 3
<location> `tests/func/test_checkpoints.py:111-120` </location>
<code_context>
+def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
</code_context>

<issue_to_address>
**suggestion (testing):** Test for boundary TTL values is missing.

Please add a test case where the checkpoint age matches the TTL to verify correct handling at the boundary and avoid off-by-one errors.

Suggested implementation:

```python
def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints respects custom TTL parameter."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create some checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")


def test_cleanup_checkpoints_boundary_ttl(test_session, monkeypatch, nums_dataset):
    """Test cleanup_checkpoints at the boundary where checkpoint age == TTL."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create a checkpoint
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")

    job_id = chain.job_id
    checkpoints = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints) > 0

    # Set the checkpoint timestamp to exactly TTL seconds ago
    ttl_seconds = 3600
    boundary_time = datetime.now(timezone.utc) - timedelta(seconds=ttl_seconds)
    for cp in checkpoints:
        metastore.update_checkpoint_timestamp(cp.id, boundary_time)

    # Run cleanup
    catalog.cleanup_checkpoints(ttl_seconds=ttl_seconds)

    # Check if checkpoint is retained (not deleted) at boundary
    checkpoints_after = list(metastore.list_checkpoints(job_id))
    # If the semantics are "older than TTL" is deleted, "equal to TTL" is retained
    assert len(checkpoints_after) > 0, "Checkpoint at boundary TTL should not be deleted"

```

- Ensure that `metastore.update_checkpoint_timestamp` exists and works as expected. If not, you may need to implement a helper to set the checkpoint timestamp for testing.
- Adjust the assertion if your cleanup logic is supposed to delete checkpoints at exactly TTL (change the assertion to `assert len(checkpoints_after) == 0`).
- If your checkpoint model or metastore API differs, adapt the checkpoint timestamp update logic accordingly.
</issue_to_address>

### Comment 4
<location> `tests/func/test_checkpoints.py:171-180` </location>
<code_context>
+def test_cleanup_checkpoints_preserves_with_active_descendants(
</code_context>

<issue_to_address>
**suggestion (testing):** Missing test for multiple descendant branches.

Add a test case with a parent job and multiple child jobs, mixing active and inactive checkpoints, to verify correct checkpoint preservation in branching scenarios.
</issue_to_address>

### Comment 5
<location> `tests/func/test_checkpoints.py:272-281` </location>
<code_context>
+def test_cleanup_checkpoints_branch_pruning(test_session, nums_dataset):
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding a test for partial pruning in a mixed lineage.

Please add a test case where only some jobs in a lineage are outdated, ensuring that cleanup_checkpoints correctly prunes only fully outdated branches and preserves those with active checkpoints.
</issue_to_address>

### Comment 6
<location> `tests/func/test_checkpoints.py:91-96` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 7
<location> `tests/func/test_checkpoints.py:131-136` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 8
<location> `tests/func/test_checkpoints.py:202-207` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 9
<location> `tests/func/test_checkpoints.py:246-251` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 10
<location> `tests/func/test_checkpoints.py:306-313` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 11
<location> `tests/func/test_checkpoints.py:308-313` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 12
<location> `tests/func/test_checkpoints.py:319-321` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 13
<location> `tests/func/test_metastore.py:963-973` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 14
<location> `src/datachain/catalog/catalog.py:2063-2065` </location>
<code_context>
    def _remove_checkpoint(self, checkpoint: Checkpoint) -> None:
        """
        Remove a checkpoint and its associated job-specific UDF tables.

        Since tables are now job-scoped, this removes only the tables
        belonging to this specific checkpoint's job.

        Args:
            checkpoint: The checkpoint object to remove.
        """
        # Remove the checkpoint from metastore first
        self.metastore.remove_checkpoint(checkpoint)

        # Remove job-specific tables for this checkpoint
        # Table patterns: udf_{job_id}_{hash}_{suffix}
        # where suffix can be: input, output, output_partial, processed
        job_id_sanitized = checkpoint.job_id.replace("-", "")
        table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_"
        matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)

        if matching_tables:
            self.warehouse.cleanup_tables(matching_tables)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))

```suggestion
        if matching_tables := self.warehouse.db.list_tables(prefix=table_prefix):
```
</issue_to_address>

### Comment 15
<location> `tests/func/test_checkpoints.py:103` </location>
<code_context>
def test_cleanup_checkpoints_with_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints removes old checkpoints and their UDF tables."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore
    warehouse = catalog.warehouse

    # Create some checkpoints by running a chain with map (which creates UDF tables)
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    job_id = test_session.get_or_create_job().id

    checkpoints_before = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints_before) == 4
    assert all(c.partial is False for c in checkpoints_before)

    # Verify UDF tables exist by checking all tables with udf_ prefix
    # Note: Due to checkpoint skipping, some jobs may reuse parent tables
    all_udf_tables_before = warehouse.db.list_tables(prefix="udf_")

    # At least some UDF tables should exist from the operations
    assert len(all_udf_tables_before) > 0

    # Modify checkpoint created_at to be older than TTL (4 hours by default)
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)
    for checkpoint in checkpoints_before:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Run cleanup_checkpoints with default TTL (4 hours)
    catalog.cleanup_checkpoints()

    # Verify checkpoints were removed
    checkpoints_after = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints_after) == 0

    # Verify job-specific UDF tables were removed
    job_id_sanitized = job_id.replace("-", "")
    udf_tables_after = warehouse.db.list_tables(prefix=f"udf_{job_id_sanitized}_")
    assert len(udf_tables_after) == 0

</code_context>

<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))

```suggestion
    assert not checkpoints_after
```
</issue_to_address>

### Comment 16
<location> `tests/func/test_checkpoints.py:143` </location>
<code_context>
def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints respects custom TTL parameter."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create some checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    job_id = test_session.get_or_create_job().id

    checkpoints = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints) == 2
    assert all(c.partial is False for c in checkpoints)

    # Modify all checkpoints to be 2 hours old
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=2)
    for checkpoint in checkpoints:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Run cleanup with custom TTL of 1 hour (3600 seconds)
    # Checkpoints are 2 hours old, so they should be removed
    catalog.cleanup_checkpoints(ttl_seconds=3600)

    # Verify checkpoints were removed
    assert len(list(metastore.list_checkpoints(job_id))) == 0

</code_context>

<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))

```suggestion
    assert not list(metastore.list_checkpoints(job_id))
```
</issue_to_address>

### Comment 17
<location> `tests/func/test_checkpoints.py:211` </location>
<code_context>
def test_cleanup_checkpoints_preserves_with_active_descendants(
    test_session, nums_dataset
):
    """
    Test that outdated parent checkpoints are preserved when descendants have
    active checkpoints.
    """
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create parent job with checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    parent_job_id = test_session.get_or_create_job().id

    # Create child job (will have parent_job_id set) with more recent checkpoints
    reset_session_job_state()
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    child_job_id = test_session.get_or_create_job().id

    # Verify parent job is set correctly
    child_job = metastore.get_job(child_job_id)
    assert child_job.parent_job_id == parent_job_id

    # Make parent checkpoints old (outdated)
    parent_checkpoints = list(metastore.list_checkpoints(parent_job_id))
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)
    for checkpoint in parent_checkpoints:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Child checkpoints remain recent (within TTL)
    child_checkpoints = list(metastore.list_checkpoints(child_job_id))
    assert len(child_checkpoints) > 0

    # Run cleanup with default TTL (4 hours)
    catalog.cleanup_checkpoints()

    # Verify parent checkpoints were NOT removed (child still needs them)
    parent_after = list(metastore.list_checkpoints(parent_job_id))
    assert len(parent_after) == len(parent_checkpoints)

    # Child checkpoints should still be there
    child_after = list(metastore.list_checkpoints(child_job_id))
    assert len(child_after) == len(child_checkpoints)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))

```suggestion
    assert child_checkpoints
```
</issue_to_address>

### Comment 18
<location> `tests/func/test_checkpoints.py:321` </location>
<code_context>
def test_cleanup_checkpoints_branch_pruning(test_session, nums_dataset):
    """
    Test that entire outdated job lineages are cleaned in one pass (branch pruning).
    """
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create a lineage: root -> child -> grandchild
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    root_job_id = test_session.get_or_create_job().id

    reset_session_job_state()
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    child_job_id = test_session.get_or_create_job().id

    reset_session_job_state()
    chain.map(quadrupled=lambda num: num * 4, output=int).save("nums_quadrupled")
    grandchild_job_id = test_session.get_or_create_job().id

    # Verify lineage
    child_job = metastore.get_job(child_job_id)
    grandchild_job = metastore.get_job(grandchild_job_id)
    assert child_job.parent_job_id == root_job_id
    assert grandchild_job.parent_job_id == child_job_id

    # Make ALL checkpoints outdated (older than TTL)
    all_job_ids = [root_job_id, child_job_id, grandchild_job_id]
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)

    for job_id in all_job_ids:
        checkpoints = list(metastore.list_checkpoints(job_id))
        for checkpoint in checkpoints:
            metastore.db.execute(
                metastore._checkpoints.update()
                .where(ch.c.id == checkpoint.id)
                .values(created_at=old_time)
            )

    # Run cleanup once
    catalog.cleanup_checkpoints()

    # Verify ALL jobs were cleaned in single pass (branch pruning)
    for job_id in all_job_ids:
        remaining = list(metastore.list_checkpoints(job_id))
        assert len(remaining) == 0, f"Job {job_id} should have been cleaned"

</code_context>

<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))

```suggestion
        assert not remaining, f"Job {job_id} should have been cleaned"
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

# Remove job-specific tables for this checkpoint
# Table patterns: udf_{job_id}_{hash}_{suffix}
# where suffix can be: input, output, output_partial, processed
job_id_sanitized = checkpoint.job_id.replace("-", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Sanitizing job_id by removing hyphens may not be sufficient for table name safety.

Consider validating job_id to exclude all invalid characters and reserved words to ensure table names remain safe and compliant.

Suggested implementation:

        # Remove job-specific tables for this checkpoint
        # Table patterns: udf_{job_id}_{hash}_{suffix}
        # where suffix can be: input, output, output_partial, processed
        job_id_sanitized = self._sanitize_job_id(checkpoint.job_id)
        table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_"
        matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
    def _sanitize_job_id(self, job_id: str) -> str:
        """
        Sanitize job_id for safe table naming:
        - Remove invalid characters (only allow alphanumeric and underscores)
        - Ensure it does not match reserved words (simple check)
        """
        import re

        # Only allow alphanumeric and underscores
        sanitized = re.sub(r'[^a-zA-Z0-9_]', '', job_id)

        # List of reserved words (example, expand as needed)
        reserved_words = {"select", "table", "from", "where", "insert", "update", "delete"}
        if sanitized.lower() in reserved_words:
            sanitized = f"job_{sanitized}"

        return sanitized

    def cleanup_checkpoints(self, ttl_seconds: int | None = None) -> None:
        """
        Clean up outdated checkpoints and their associated UDF tables.

Comment on lines +21 to +22
print("Cleaning up outdated checkpoints.")
catalog.cleanup_checkpoints()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Consider handling and reporting errors from cleanup_checkpoints.

Wrap catalog.cleanup_checkpoints() in a try/except block to prevent abrupt termination and provide user-friendly error reporting.

Suggested change
print("Cleaning up outdated checkpoints.")
catalog.cleanup_checkpoints()
print("Cleaning up outdated checkpoints.")
try:
catalog.cleanup_checkpoints()
except Exception as e:
print(f"Error cleaning up checkpoints: {e}")

Comment on lines +111 to +120
def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
"""Test that cleanup_checkpoints respects custom TTL parameter."""
from datetime import datetime, timedelta, timezone

catalog = test_session.catalog
metastore = catalog.metastore

# Create some checkpoints
reset_session_job_state()
chain = dc.read_dataset("nums", session=test_session)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Test for boundary TTL values is missing.

Please add a test case where the checkpoint age matches the TTL to verify correct handling at the boundary and avoid off-by-one errors.

Suggested implementation:

def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints respects custom TTL parameter."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create some checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")


def test_cleanup_checkpoints_boundary_ttl(test_session, monkeypatch, nums_dataset):
    """Test cleanup_checkpoints at the boundary where checkpoint age == TTL."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create a checkpoint
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")

    job_id = chain.job_id
    checkpoints = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints) > 0

    # Set the checkpoint timestamp to exactly TTL seconds ago
    ttl_seconds = 3600
    boundary_time = datetime.now(timezone.utc) - timedelta(seconds=ttl_seconds)
    for cp in checkpoints:
        metastore.update_checkpoint_timestamp(cp.id, boundary_time)

    # Run cleanup
    catalog.cleanup_checkpoints(ttl_seconds=ttl_seconds)

    # Check if checkpoint is retained (not deleted) at boundary
    checkpoints_after = list(metastore.list_checkpoints(job_id))
    # If the semantics are "older than TTL" is deleted, "equal to TTL" is retained
    assert len(checkpoints_after) > 0, "Checkpoint at boundary TTL should not be deleted"
  • Ensure that metastore.update_checkpoint_timestamp exists and works as expected. If not, you may need to implement a helper to set the checkpoint timestamp for testing.
  • Adjust the assertion if your cleanup logic is supposed to delete checkpoints at exactly TTL (change the assertion to assert len(checkpoints_after) == 0).
  • If your checkpoint model or metastore API differs, adapt the checkpoint timestamp update logic accordingly.

Comment on lines +171 to +180
def test_cleanup_checkpoints_preserves_with_active_descendants(
test_session, nums_dataset
):
"""
Test that outdated parent checkpoints are preserved when descendants have
active checkpoints.
"""
from datetime import datetime, timedelta, timezone

catalog = test_session.catalog
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Missing test for multiple descendant branches.

Add a test case with a parent job and multiple child jobs, mixing active and inactive checkpoints, to verify correct checkpoint preservation in branching scenarios.

Comment on lines +272 to +281
def test_cleanup_checkpoints_branch_pruning(test_session, nums_dataset):
"""
Test that entire outdated job lineages are cleaned in one pass (branch pruning).
"""
from datetime import datetime, timedelta, timezone

catalog = test_session.catalog
metastore = catalog.metastore

# Create a lineage: root -> child -> grandchild
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding a test for partial pruning in a mixed lineage.

Please add a test case where only some jobs in a lineage are outdated, ensuring that cleanup_checkpoints correctly prunes only fully outdated branches and preserves those with active checkpoints.

Comment on lines +2063 to +2065
matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)

if matching_tables:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Use named expression to simplify assignment and conditional (use-named-expression)

Suggested change
matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
if matching_tables:
if matching_tables := self.warehouse.db.list_tables(prefix=table_prefix):


# Verify checkpoints were removed
checkpoints_after = list(metastore.list_checkpoints(job_id))
assert len(checkpoints_after) == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Simplify sequence length comparison (simplify-len-comparison)

Suggested change
assert len(checkpoints_after) == 0
assert not checkpoints_after

catalog.cleanup_checkpoints(ttl_seconds=3600)

# Verify checkpoints were removed
assert len(list(metastore.list_checkpoints(job_id))) == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Simplify sequence length comparison (simplify-len-comparison)

Suggested change
assert len(list(metastore.list_checkpoints(job_id))) == 0
assert not list(metastore.list_checkpoints(job_id))


# Child checkpoints remain recent (within TTL)
child_checkpoints = list(metastore.list_checkpoints(child_job_id))
assert len(child_checkpoints) > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Simplify sequence length comparison (simplify-len-comparison)

Suggested change
assert len(child_checkpoints) > 0
assert child_checkpoints

# Verify ALL jobs were cleaned in single pass (branch pruning)
for job_id in all_job_ids:
remaining = list(metastore.list_checkpoints(job_id))
assert len(remaining) == 0, f"Job {job_id} should have been cleaned"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Simplify sequence length comparison (simplify-len-comparison)

Suggested change
assert len(remaining) == 0, f"Job {job_id} should have been cleaned"
assert not remaining, f"Job {job_id} should have been cleaned"

@codecov
Copy link

codecov bot commented Nov 9, 2025

Codecov Report

❌ Patch coverage is 95.34884% with 2 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (ilongin/1392-udf-checkpoints@5f6f183). Learn more about missing BASE report.

Files with missing lines Patch % Lines
src/datachain/catalog/catalog.py 89.47% 1 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@                       Coverage Diff                       @@
##             ilongin/1392-udf-checkpoints    #1454   +/-   ##
===============================================================
  Coverage                                ?   87.93%           
===============================================================
  Files                                   ?      160           
  Lines                                   ?    15638           
  Branches                                ?     2272           
===============================================================
  Hits                                    ?    13752           
  Misses                                  ?     1359           
  Partials                                ?      527           
Flag Coverage Δ
datachain 87.89% <95.34%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
src/datachain/cli/commands/misc.py 64.70% <100.00%> (ø)
src/datachain/data_storage/metastore.py 93.57% <100.00%> (ø)
src/datachain/catalog/catalog.py 83.75% <89.47%> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@ilongin ilongin marked this pull request as draft November 9, 2025 22:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

UDF Checkpoints cleanup

2 participants