UDF Checkpoints cleanup #1454
base: ilongin/1392-udf-checkpoints
Conversation
Implements checkpoint cleanup with TTL-based garbage collection:

- `cleanup_checkpoints()` method in Catalog to remove old checkpoints
- `_remove_checkpoint()` helper to clean up a checkpoint and its UDF tables
- `list_checkpoints()` enhancement to support time-based filtering
- `get_descendant_job_ids()` to find child jobs for dependency checking
- `remove_checkpoint()` in metastore to delete checkpoint records
- CLI integration in the `garbage_collect` command
- Comprehensive tests for cleanup scenarios

This builds on top of the core checkpoint functionality to provide automatic cleanup of outdated checkpoints and their associated tables.

Related to #1453
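For orientation, here is a minimal sketch of how these pieces could fit together. The method names come from the list above; the exact signatures, the 4-hour default, and the control flow are assumptions rather than the PR's final implementation.

```python
# Sketch only: ties together the methods listed above; not the PR's exact code.
from datetime import datetime, timedelta, timezone

DEFAULT_TTL_SECONDS = 4 * 60 * 60  # the tests below assume a 4-hour default TTL


def cleanup_checkpoints(catalog, ttl_seconds=None):
    """Remove checkpoints older than the TTL unless a descendant job still needs them."""
    ttl = DEFAULT_TTL_SECONDS if ttl_seconds is None else ttl_seconds
    threshold = datetime.now(timezone.utc) - timedelta(seconds=ttl)

    for ch in catalog.metastore.list_checkpoints(created_before=threshold):
        # A checkpoint is kept while any descendant job has a checkpoint newer
        # than the threshold (that lineage may still be resumed).
        descendants = catalog.metastore.get_descendant_job_ids(ch.job_id)
        has_active_descendant = any(
            list(catalog.metastore.list_checkpoints(d, created_after=threshold))
            for d in descendants
        )
        if not has_active_descendant:
            # Drops the metastore record and the job-scoped UDF tables.
            catalog._remove_checkpoint(ch)
```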
Reviewer's Guide

This PR introduces TTL-based garbage collection for outdated UDF checkpoints by extending the Catalog and Metastore with time-based filtering, descendant job detection, and removal routines; integrates the cleanup into the CLI; and adds comprehensive tests covering various cleanup scenarios and metastore methods.

Sequence diagram for TTL-based checkpoint cleanup via CLI

```mermaid
sequenceDiagram
    actor User
    participant CLI
    participant Catalog
    participant Metastore
    participant WarehouseDB
    User->>CLI: Run garbage_collect command
    CLI->>Catalog: cleanup_checkpoints()
    Catalog->>Metastore: list_checkpoints(created_before=ttl_threshold)
    loop For each outdated checkpoint
        Catalog->>Metastore: get_descendant_job_ids(ch.job_id)
        Catalog->>Metastore: list_checkpoints(desc_id, created_after=ttl_threshold)
        alt No active descendants
            Catalog->>Metastore: remove_checkpoint(checkpoint)
            Catalog->>WarehouseDB: list_tables(prefix=table_prefix)
            Catalog->>WarehouseDB: cleanup_tables(matching_tables)
        end
    end
```
Class diagram for updated Metastore and Catalog checkpoint cleanup

```mermaid
classDiagram
    class AbstractMetastore {
        +get_descendant_job_ids(job_id, conn)
        +list_checkpoints(job_id=None, created_after=None, created_before=None, conn=None)
        +remove_checkpoint(checkpoint, conn=None)
    }
    class AbstractDBMetastore {
        +get_descendant_job_ids(job_id, conn)
        +list_checkpoints(job_id=None, created_after=None, created_before=None, conn=None)
        +remove_checkpoint(checkpoint, conn=None)
    }
    AbstractDBMetastore --|> AbstractMetastore
    class Catalog {
        +_remove_checkpoint(checkpoint)
        +cleanup_checkpoints(ttl_seconds=None)
    }
    Catalog --> AbstractMetastore : metastore
    Catalog --> WarehouseDB : warehouse
    class WarehouseDB {
        +list_tables(prefix)
        +cleanup_tables(tables)
    }
```
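To make the time-based filtering concrete, here is a minimal sketch of what the new `list_checkpoints()` filters could look like in the DB-backed metastore. The `_checkpoints` table and `created_at` column appear in the tests below; the row-parsing helper and the `conn` handling are assumptions.

```python
# Sketch, not the PR's implementation: time-based filters for list_checkpoints().
import sqlalchemy as sa


def list_checkpoints(self, job_id=None, created_after=None, created_before=None, conn=None):
    ch = self._checkpoints  # checkpoints table, as used in the tests below
    query = sa.select(ch)
    if job_id is not None:
        query = query.where(ch.c.job_id == job_id)
    if created_after is not None:
        query = query.where(ch.c.created_at > created_after)
    if created_before is not None:
        query = query.where(ch.c.created_at < created_before)
    # _row_to_checkpoint is a hypothetical helper that builds Checkpoint objects
    return [self._row_to_checkpoint(row) for row in self.db.execute(query, conn=conn)]
```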
Deploying datachain-documentation with Cloudflare Pages

| | |
| --- | --- |
| Latest commit: | 91f7da5 |
| Status: | ✅ Deploy successful! |
| Preview URL: | https://f4332ecc.datachain-documentation.pages.dev |
| Branch Preview URL: | https://ilongin-1453-checkpoint-udf.datachain-documentation.pages.dev |
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/datachain/catalog/catalog.py:2061` </location>
<code_context>
+ # Remove job-specific tables for this checkpoint
+ # Table patterns: udf_{job_id}_{hash}_{suffix}
+ # where suffix can be: input, output, output_partial, processed
+ job_id_sanitized = checkpoint.job_id.replace("-", "")
+ table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_"
+ matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
</code_context>
<issue_to_address>
**suggestion:** Sanitizing job_id by removing hyphens may not be sufficient for table name safety.
Consider validating job_id to exclude all invalid characters and reserved words to ensure table names remain safe and compliant.
Suggested implementation:
```python
# Remove job-specific tables for this checkpoint
# Table patterns: udf_{job_id}_{hash}_{suffix}
# where suffix can be: input, output, output_partial, processed
job_id_sanitized = self._sanitize_job_id(checkpoint.job_id)
table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_"
matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
```
```python
def _sanitize_job_id(self, job_id: str) -> str:
    """
    Sanitize job_id for safe table naming:
    - Remove invalid characters (only allow alphanumeric and underscores)
    - Ensure it does not match reserved words (simple check)
    """
    import re

    # Only allow alphanumeric and underscores
    sanitized = re.sub(r'[^a-zA-Z0-9_]', '', job_id)
    # List of reserved words (example, expand as needed)
    reserved_words = {"select", "table", "from", "where", "insert", "update", "delete"}
    if sanitized.lower() in reserved_words:
        sanitized = f"job_{sanitized}"
    return sanitized


def cleanup_checkpoints(self, ttl_seconds: int | None = None) -> None:
    """
    Clean up outdated checkpoints and their associated UDF tables.
```
</issue_to_address>
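If the suggested helper were adopted, its behaviour would be roughly as follows (the job IDs here are made up for illustration):

```python
# Illustrates the suggested _sanitize_job_id; the job IDs are hypothetical.
catalog._sanitize_job_id("3f2b9c1e-aa12-4d7e-9b0c-0f1e2d3c4b5a")
# -> "3f2b9c1eaa124d7e9b0c0f1e2d3c4b5a"  (hyphens stripped, same result as .replace("-", ""))
catalog._sanitize_job_id("select")
# -> "job_select"  (reserved word gets the "job_" prefix)
```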
### Comment 2
<location> `src/datachain/cli/commands/misc.py:21-22` </location>
<code_context>
print(f"Garbage collecting {len(temp_tables)} temporary tables.")
catalog.cleanup_tables(temp_tables)
+ print("Cleaning up outdated checkpoints.")
+ catalog.cleanup_checkpoints()
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider handling and reporting errors from cleanup_checkpoints.
Wrap catalog.cleanup_checkpoints() in a try/except block to prevent abrupt termination and provide user-friendly error reporting.
```suggestion
print("Cleaning up outdated checkpoints.")
try:
catalog.cleanup_checkpoints()
except Exception as e:
print(f"Error cleaning up checkpoints: {e}")
```
</issue_to_address>
### Comment 3
<location> `tests/func/test_checkpoints.py:111-120` </location>
<code_context>
+def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
</code_context>
<issue_to_address>
**suggestion (testing):** Test for boundary TTL values is missing.
Please add a test case where the checkpoint age matches the TTL to verify correct handling at the boundary and avoid off-by-one errors.
Suggested implementation:
```python
def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints respects custom TTL parameter."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create some checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")


def test_cleanup_checkpoints_boundary_ttl(test_session, monkeypatch, nums_dataset):
    """Test cleanup_checkpoints at the boundary where checkpoint age == TTL."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create a checkpoint
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    job_id = chain.job_id
    checkpoints = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints) > 0

    # Set the checkpoint timestamp to exactly TTL seconds ago
    ttl_seconds = 3600
    boundary_time = datetime.now(timezone.utc) - timedelta(seconds=ttl_seconds)
    for cp in checkpoints:
        metastore.update_checkpoint_timestamp(cp.id, boundary_time)

    # Run cleanup
    catalog.cleanup_checkpoints(ttl_seconds=ttl_seconds)

    # Check if checkpoint is retained (not deleted) at boundary
    checkpoints_after = list(metastore.list_checkpoints(job_id))
    # If the semantics are "older than TTL" is deleted, "equal to TTL" is retained
    assert len(checkpoints_after) > 0, "Checkpoint at boundary TTL should not be deleted"
```
- Ensure that `metastore.update_checkpoint_timestamp` exists and works as expected. If not, you may need to implement a helper to set the checkpoint timestamp for testing.
- Adjust the assertion if your cleanup logic is supposed to delete checkpoints at exactly TTL (change the assertion to `assert len(checkpoints_after) == 0`).
- If your checkpoint model or metastore API differs, adapt the checkpoint timestamp update logic accordingly.
</issue_to_address>
### Comment 4
<location> `tests/func/test_checkpoints.py:171-180` </location>
<code_context>
+def test_cleanup_checkpoints_preserves_with_active_descendants(
</code_context>
<issue_to_address>
**suggestion (testing):** Missing test for multiple descendant branches.
Add a test case with a parent job and multiple child jobs, mixing active and inactive checkpoints, to verify correct checkpoint preservation in branching scenarios.
</issue_to_address>
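A possible shape for such a test, reusing the fixtures from the existing tests. `_set_checkpoint_age` is a hypothetical helper (a version is sketched under Comment 6 below), and whether two sibling child jobs can be produced through the session API alone is an assumption that may need metastore-level job setup.

```python
def test_cleanup_checkpoints_with_mixed_descendant_branches(test_session, nums_dataset):
    """Outdated parent survives while at least one branch still has active checkpoints."""
    catalog = test_session.catalog
    metastore = catalog.metastore

    # Parent job
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    parent_job_id = test_session.get_or_create_job().id

    # First child branch: stays active (recent checkpoints)
    reset_session_job_state()
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    active_child_id = test_session.get_or_create_job().id

    # Second child branch: aged out together with the parent
    # NOTE: creating true sibling jobs may require resetting to the parent job
    # explicitly; assumed possible here for the sake of the sketch.
    reset_session_job_state()
    chain.map(quadrupled=lambda num: num * 4, output=int).save("nums_quadrupled")
    stale_child_id = test_session.get_or_create_job().id

    _set_checkpoint_age(metastore, parent_job_id, hours=5)
    _set_checkpoint_age(metastore, stale_child_id, hours=5)

    catalog.cleanup_checkpoints()

    # Parent is preserved because one branch still has active checkpoints
    assert list(metastore.list_checkpoints(parent_job_id))
    assert list(metastore.list_checkpoints(active_child_id))
```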
### Comment 5
<location> `tests/func/test_checkpoints.py:272-281` </location>
<code_context>
+def test_cleanup_checkpoints_branch_pruning(test_session, nums_dataset):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for partial pruning in a mixed lineage.
Please add a test case where only some jobs in a lineage are outdated, ensuring that cleanup_checkpoints correctly prunes only fully outdated branches and preserves those with active checkpoints.
</issue_to_address>
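One way the mixed-lineage case could be written, with the same hedges as above: `_set_checkpoint_age` is a hypothetical helper, and the expected outcome assumes the "prune only fully outdated branches" semantics described in this PR.

```python
def test_cleanup_checkpoints_partial_pruning(test_session, nums_dataset):
    """Only the fully outdated tail of a lineage is pruned; recent ancestors stay."""
    catalog = test_session.catalog
    metastore = catalog.metastore

    # Lineage: root -> child -> grandchild (same setup as the branch-pruning test)
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    root_job_id = test_session.get_or_create_job().id

    reset_session_job_state()
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    child_job_id = test_session.get_or_create_job().id

    reset_session_job_state()
    chain.map(quadrupled=lambda num: num * 4, output=int).save("nums_quadrupled")
    grandchild_job_id = test_session.get_or_create_job().id

    # Root stays recent; child and grandchild are aged past the default TTL
    _set_checkpoint_age(metastore, child_job_id, hours=5)
    _set_checkpoint_age(metastore, grandchild_job_id, hours=5)

    catalog.cleanup_checkpoints()

    # The outdated sub-branch is pruned; the still-recent root is untouched
    assert list(metastore.list_checkpoints(root_job_id))
    assert not list(metastore.list_checkpoints(child_job_id))
    assert not list(metastore.list_checkpoints(grandchild_job_id))
```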
### Comment 6
<location> `tests/func/test_checkpoints.py:91-96` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
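One way to act on this across the new tests is to move the repeated `created_at` rewrite into a small helper, so the test bodies stay free of inline loops. The name `_set_checkpoint_age` and its placement are suggestions; the metastore calls mirror the loops shown in the test code below.

```python
from datetime import datetime, timedelta, timezone


def _set_checkpoint_age(metastore, job_id, *, hours):
    """Backdate every checkpoint of a job so it falls outside a given TTL."""
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    for checkpoint in metastore.list_checkpoints(job_id):
        metastore.db.execute(
            ch.update().where(ch.c.id == checkpoint.id).values(created_at=old_time)
        )
```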
### Comment 7
<location> `tests/func/test_checkpoints.py:131-136` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
</issue_to_address>
### Comment 8
<location> `tests/func/test_checkpoints.py:202-207` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
</issue_to_address>
### Comment 9
<location> `tests/func/test_checkpoints.py:246-251` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
</issue_to_address>
### Comment 10
<location> `tests/func/test_checkpoints.py:306-313` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
</issue_to_address>
### Comment 11
<location> `tests/func/test_checkpoints.py:308-313` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
</issue_to_address>
### Comment 12
<location> `tests/func/test_checkpoints.py:319-321` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
</issue_to_address>
### Comment 13
<location> `tests/func/test_metastore.py:963-973` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
</issue_to_address>
### Comment 14
<location> `src/datachain/catalog/catalog.py:2063-2065` </location>
<code_context>
def _remove_checkpoint(self, checkpoint: Checkpoint) -> None:
    """
    Remove a checkpoint and its associated job-specific UDF tables.

    Since tables are now job-scoped, this removes only the tables
    belonging to this specific checkpoint's job.

    Args:
        checkpoint: The checkpoint object to remove.
    """
    # Remove the checkpoint from metastore first
    self.metastore.remove_checkpoint(checkpoint)

    # Remove job-specific tables for this checkpoint
    # Table patterns: udf_{job_id}_{hash}_{suffix}
    # where suffix can be: input, output, output_partial, processed
    job_id_sanitized = checkpoint.job_id.replace("-", "")
    table_prefix = f"udf_{job_id_sanitized}_{checkpoint.hash}_"
    matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
    if matching_tables:
        self.warehouse.cleanup_tables(matching_tables)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
```suggestion
if matching_tables := self.warehouse.db.list_tables(prefix=table_prefix):
```
</issue_to_address>
### Comment 15
<location> `tests/func/test_checkpoints.py:103` </location>
<code_context>
def test_cleanup_checkpoints_with_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints removes old checkpoints and their UDF tables."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore
    warehouse = catalog.warehouse

    # Create some checkpoints by running a chain with map (which creates UDF tables)
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    job_id = test_session.get_or_create_job().id

    checkpoints_before = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints_before) == 4
    assert all(c.partial is False for c in checkpoints_before)

    # Verify UDF tables exist by checking all tables with udf_ prefix
    # Note: Due to checkpoint skipping, some jobs may reuse parent tables
    all_udf_tables_before = warehouse.db.list_tables(prefix="udf_")
    # At least some UDF tables should exist from the operations
    assert len(all_udf_tables_before) > 0

    # Modify checkpoint created_at to be older than TTL (4 hours by default)
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)
    for checkpoint in checkpoints_before:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Run cleanup_checkpoints with default TTL (4 hours)
    catalog.cleanup_checkpoints()

    # Verify checkpoints were removed
    checkpoints_after = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints_after) == 0

    # Verify job-specific UDF tables were removed
    job_id_sanitized = job_id.replace("-", "")
    udf_tables_after = warehouse.db.list_tables(prefix=f"udf_{job_id_sanitized}_")
    assert len(udf_tables_after) == 0
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))
```suggestion
assert not checkpoints_after
```
</issue_to_address>
### Comment 16
<location> `tests/func/test_checkpoints.py:143` </location>
<code_context>
def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints respects custom TTL parameter."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create some checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    job_id = test_session.get_or_create_job().id

    checkpoints = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints) == 2
    assert all(c.partial is False for c in checkpoints)

    # Modify all checkpoints to be 2 hours old
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=2)
    for checkpoint in checkpoints:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Run cleanup with custom TTL of 1 hour (3600 seconds)
    # Checkpoints are 2 hours old, so they should be removed
    catalog.cleanup_checkpoints(ttl_seconds=3600)

    # Verify checkpoints were removed
    assert len(list(metastore.list_checkpoints(job_id))) == 0
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))
```suggestion
assert not list(metastore.list_checkpoints(job_id))
```
</issue_to_address>
### Comment 17
<location> `tests/func/test_checkpoints.py:211` </location>
<code_context>
def test_cleanup_checkpoints_preserves_with_active_descendants(
    test_session, nums_dataset
):
    """
    Test that outdated parent checkpoints are preserved when descendants have
    active checkpoints.
    """
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create parent job with checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    parent_job_id = test_session.get_or_create_job().id

    # Create child job (will have parent_job_id set) with more recent checkpoints
    reset_session_job_state()
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    child_job_id = test_session.get_or_create_job().id

    # Verify parent job is set correctly
    child_job = metastore.get_job(child_job_id)
    assert child_job.parent_job_id == parent_job_id

    # Make parent checkpoints old (outdated)
    parent_checkpoints = list(metastore.list_checkpoints(parent_job_id))
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)
    for checkpoint in parent_checkpoints:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Child checkpoints remain recent (within TTL)
    child_checkpoints = list(metastore.list_checkpoints(child_job_id))
    assert len(child_checkpoints) > 0

    # Run cleanup with default TTL (4 hours)
    catalog.cleanup_checkpoints()

    # Verify parent checkpoints were NOT removed (child still needs them)
    parent_after = list(metastore.list_checkpoints(parent_job_id))
    assert len(parent_after) == len(parent_checkpoints)

    # Child checkpoints should still be there
    child_after = list(metastore.list_checkpoints(child_job_id))
    assert len(child_after) == len(child_checkpoints)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))
```suggestion
assert child_checkpoints
```
</issue_to_address>
### Comment 18
<location> `tests/func/test_checkpoints.py:321` </location>
<code_context>
def test_cleanup_checkpoints_branch_pruning(test_session, nums_dataset):
    """
    Test that entire outdated job lineages are cleaned in one pass (branch pruning).
    """
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create a lineage: root -> child -> grandchild
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    root_job_id = test_session.get_or_create_job().id

    reset_session_job_state()
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    child_job_id = test_session.get_or_create_job().id

    reset_session_job_state()
    chain.map(quadrupled=lambda num: num * 4, output=int).save("nums_quadrupled")
    grandchild_job_id = test_session.get_or_create_job().id

    # Verify lineage
    child_job = metastore.get_job(child_job_id)
    grandchild_job = metastore.get_job(grandchild_job_id)
    assert child_job.parent_job_id == root_job_id
    assert grandchild_job.parent_job_id == child_job_id

    # Make ALL checkpoints outdated (older than TTL)
    all_job_ids = [root_job_id, child_job_id, grandchild_job_id]
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)
    for job_id in all_job_ids:
        checkpoints = list(metastore.list_checkpoints(job_id))
        for checkpoint in checkpoints:
            metastore.db.execute(
                metastore._checkpoints.update()
                .where(ch.c.id == checkpoint.id)
                .values(created_at=old_time)
            )

    # Run cleanup once
    catalog.cleanup_checkpoints()

    # Verify ALL jobs were cleaned in single pass (branch pruning)
    for job_id in all_job_ids:
        remaining = list(metastore.list_checkpoints(job_id))
        assert len(remaining) == 0, f"Job {job_id} should have been cleaned"
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))
```suggestion
assert not remaining, f"Job {job_id} should have been cleaned"
```
</issue_to_address>
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@                      Coverage Diff                       @@
##             ilongin/1392-udf-checkpoints    #1454   +/-  ##
==============================================================
  Coverage                               ?   87.93%
==============================================================
  Files                                  ?      160
  Lines                                  ?    15638
  Branches                               ?     2272
==============================================================
  Hits                                   ?    13752
  Misses                                 ?     1359
  Partials                               ?      527
```
Flags with carried forward coverage won't be shown.
Summary by Sourcery
Add TTL-based garbage collection for UDF checkpoints to prevent storage bloat while preserving dependencies.