Add many-to-many relationship between dataset versions and jobs #1480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Conversation
Pull request overview
This PR implements a many-to-many relationship between dataset versions and jobs to fix an issue where datasets created in failed jobs become invisible in subsequent job runs. The solution introduces a junction table to track which jobs created or used each dataset version, and updates checkpoint resolution to find the correct dataset version based on job ancestry rather than just using the latest version.
Key changes:
- Added `dataset_version_jobs` junction table with `is_creator` flag to track many-to-many relationships
- Implemented job ancestry traversal using recursive CTEs to find correct dataset versions in checkpoint chains
- Refactored `parse_schema` from a static method to a module-level function for better reusability
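A minimal sketch of what the junction table could look like, using in-memory SQLite (the exact DDL is an assumption; the real schema lives in `src/datachain/data_storage/metastore.py`):

```python
import sqlite3

# Illustrative shape of the dataset_version_jobs junction table (assumed DDL).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dataset_version_jobs (
        dataset_version_id INTEGER NOT NULL,
        job_id TEXT NOT NULL,
        is_creator BOOLEAN NOT NULL DEFAULT 0,
        PRIMARY KEY (dataset_version_id, job_id)
    )
    """
)
# The job that created the version links with is_creator=True ...
conn.execute("INSERT INTO dataset_version_jobs VALUES (1, 'job-a', 1)")
# ... while a later job that reuses it via a checkpoint links with is_creator=False.
conn.execute("INSERT INTO dataset_version_jobs VALUES (1, 'job-b', 0)")
```

The composite primary key means a given job can be linked to a given version at most once, regardless of how many times the checkpoint is hit.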
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/datachain/data_storage/metastore.py | Added junction table schema, abstract methods, and implementations for linking dataset versions to jobs and traversing job ancestry |
| src/datachain/data_storage/sqlite.py | Added table initialization for the new junction table |
| src/datachain/lib/dc/datachain.py | Updated checkpoint resolution to use job ancestry for finding correct dataset versions |
| src/datachain/query/dataset.py | Added job_id parameter and linking logic when datasets are created |
| src/datachain/dataset.py | Refactored parse_schema from DatasetRecord static method to module-level function with updated type signature |
| src/datachain/catalog/catalog.py | Updated to use new parse_schema function |
| tests/unit/test_dataset.py | Updated test calls to use module-level parse_schema function |
| tests/unit/lib/test_checkpoints.py | Added comprehensive tests for dataset-job linking and checkpoint reuse scenarios |
| tests/func/test_metastore.py | Added tests for get_ancestor_job_ids with various hierarchy depths |
| tests/utils.py | Added DATACHAIN_JOB_ID environment variable cleanup to reset_session_job_state() |
Deploying with
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ❌ Deployment failed | datachain-docs | 6dc3739 | Dec 03 2025, 02:20 AM |
```python
        is_creator: bool = False,
        conn=None,
    ) -> None:
        query = self._dataset_version_jobs_insert().values(
```
do we check that both objects are actually related and belong to the same team?
Both objects are guaranteed to belong to the same team because they are both created/fetched through the same metastore instance, which is scoped to a `team_id`. The method is only called from internal business logic within the same session/team context, not from any external API that could pass arbitrary IDs. Therefore, I don't think we need to add these kinds of checks everywhere. What is enough is that those CRUD database helpers, e.g. `_dataset_version_jobs_insert`, are enforced to be used and cannot be bypassed, but that is way out of the scope of this PR. I think we need another issue for adding those, the related Studio tests, and anything else that's needed.
```python
        max_depth = 100

        ancestors_cte = (
            self._jobs_select(
```
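A standalone illustration of the recursive-CTE ancestry walk, with the depth guard the diff shows (hypothetical `jobs` table; the real query is built on the `_jobs_select` helper):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, parent_job_id TEXT)")
conn.executemany(
    "INSERT INTO jobs VALUES (?, ?)",
    [("job-1", None), ("job-2", "job-1"), ("job-3", "job-2")],
)

MAX_DEPTH = 100  # guard against cycles / runaway parent chains

rows = conn.execute(
    """
    WITH RECURSIVE ancestors(id, depth) AS (
        -- start from the parent of the given job
        SELECT parent_job_id, 1 FROM jobs WHERE id = :start
        UNION ALL
        -- then keep following parent_job_id up the chain
        SELECT j.parent_job_id, a.depth + 1
        FROM jobs j JOIN ancestors a ON j.id = a.id
        WHERE j.parent_job_id IS NOT NULL AND a.depth < :max_depth
    )
    SELECT id FROM ancestors WHERE id IS NOT NULL ORDER BY depth
    """,
    {"start": "job-3", "max_depth": MAX_DEPTH},
).fetchall()
ancestor_ids = [r[0] for r in rows]  # nearest ancestor first
```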
does _jobs_select filter by team? do we have a test for this?
Yes, it does filter by `team_id`. The safest thing is to always use these helper methods as the starting point of any query, since they are then overridden in Studio to include the `team_id` filter. We generally don't have explicit tests for these methods; we should think about the best way to add them, as they are not really for the datachain repo.
- we should be raising an exception by default in each of those helpers in the abstract metastore (do we do this?)
- we can have tests on the Studio side - we already have plenty of tests there
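One way the "raise by default" idea could look (a sketch, not the actual DataChain code; the class and method shapes here are assumptions):

```python
class AbstractMetastore:
    def _jobs_select(self, *columns):
        # Fail loudly if a subclass forgets to provide a team-scoped query,
        # instead of silently returning unfiltered rows.
        raise NotImplementedError(
            f"{type(self).__name__} must implement _jobs_select "
            "(and include team_id filtering where applicable)"
        )


class StudioMetastore(AbstractMetastore):
    """Hypothetical override: the real Studio version adds a team_id
    filter to the SQLAlchemy select it builds."""

    def __init__(self, team_id):
        self.team_id = team_id

    def _jobs_select(self, *columns):
        cols = ", ".join(columns) or "*"
        return f"SELECT {cols} FROM jobs WHERE team_id = {self.team_id!r}"
```

With this shape, forgetting to override a helper surfaces as an immediate `NotImplementedError` rather than a silent team-scoping leak.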
As mentioned in the related comment #1480 (comment), we need a separate task for this, since similar questions/issues come up in every PR.
```python
    ) -> "Select":
        """Build query to find dataset version created by job ancestry."""
        return (
            self._datasets_versions_select()
```
do both selects filter by team id already? do we have tests for this?
can we actually make all those helpers raise if metastore doesn't implement them?
`DatasetVersion` is one of those old tables that doesn't have a copied `team_id`, but it is connected to `Dataset`, which does have one. In this case, the whole `_get_dataset_version_for_job_ancestry_query` method is overridden in Studio to add the `team_id` filter, since we couldn't do the same with `_datasets_versions_select`.
Regarding the other question: since this repeats for all models (we have the same problem everywhere), I think we need a separate task to:
- think of how to automate this, or raise an exception, or something else as you mentioned
- write tests for it
src/datachain/lib/dc/datachain.py
Outdated
```python
            # checkpoint found → find which dataset version to reuse

            # Find dataset version that was created by any ancestor job
            dataset_version = metastore.get_dataset_version_for_job_ancestry(
```
can we add some logs inside those implementations to actually trace it (whether a job was found or not; if found, print its id, etc.)?
Added a couple of debug logs.
I mean Studio primarily, btw, not datachain (or datachain if we know it will be visible on the backend in Studio).
```python
                dataset_version.id, job.id, is_creator=False
            )

            # Update dataset_version.job_id to point to the latest job
```
why do we need this pointer?
we then need to think about a transaction here (the link above and this update)
I left this since it will be used in pipelines (that was the main issue for this task) and to stay backward compatible, but yes, it is now redundant and should be deprecated or maybe even removed. I will see if pipelines can just use the same `get_dataset_version_for_job_ancestry` method.
let's remove it for now then, and add it back separately? otherwise we risk leaving it as is and then fighting with some half-broken state (I've seen this happen already)
src/datachain/query/dataset.py
Outdated
```python
        self.catalog.update_dataset_version_with_warehouse_info(dataset, version)

        # Link this dataset version to the job that created it
        if job_id and dataset.has_version(version):
```
how can it be that we don't have job_id?
how can it be we don't have this version?
When we create temp datasets with `.persist()` we don't track that with a job, so `job_id` is not defined.
The second check, for the dataset version, is redundant - removing it.
hmm, a lot of questions...
- why do we do things like `self.catalog.update_dataset_version_with_warehouse_info(dataset, version)` for `persist()`?
- if this is a difference between different types, it is better to have some explicit flag / name. It is weird that I have to remember not to pass `job_id` in `persist`. Or do we want to refactor and always get it here instead of providing it?
Deploying datachain with
| | |
|---|---|
| Latest commit: | aeb9307 |
| Status: | ✅ Deploy successful! |
| Preview URL: | https://ee17ccbc.datachain-2g6.pages.dev |
| Branch Preview URL: | https://ilongin-1477-fix-dataset-job.datachain-2g6.pages.dev |
When a dataset is created in a job that fails, re-running the script creates a new job but the dataset still references the old (failed) job. This makes the dataset invisible in subsequent job runs in Studio UI.
This PR implements a many-to-many relationship between dataset versions and jobs using a `dataset_version_jobs` junction table with an `is_creator` flag. It also fixes a current issue in the checkpoint code: when a checkpoint is found, which dataset version should be returned? Currently we return the latest version, but that's wrong, as it might not be the one created in this job chain (it may have been created by some other, unrelated job), so in this PR we try to find the correct version created in this specific job chain / hierarchy.
Changes:
- Added `dataset_version_jobs` table to metastore schemas
- Added `link_dataset_version_to_job()`, `get_ancestor_job_ids()`, and `get_dataset_version_for_job_ancestry()` metastore methods
- Updated `DataChain._resolve_checkpoint()` to use job ancestry for finding dataset versions
- Updated `DatasetQuery.save()` to link datasets to jobs on creation

The existing `job_id` column in the `dataset_versions` table remains unchanged for backward compatibility and can be deprecated in a future release. Jobs that create datasets are linked with `is_creator=True`, while jobs that reuse datasets via checkpoints are linked with `is_creator=False`.
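Putting the pieces together, the resolution logic described above can be sketched with in-memory SQLite (the table shapes and the helper name `version_for_job_ancestry` are illustrative assumptions, not the actual DataChain schema or API):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE jobs (id TEXT PRIMARY KEY, parent_job_id TEXT);
    CREATE TABLE dataset_versions (id INTEGER PRIMARY KEY, name TEXT, version INTEGER);
    CREATE TABLE dataset_version_jobs (
        dataset_version_id INTEGER, job_id TEXT, is_creator BOOLEAN
    );
    -- job-1 failed after creating version 1 of "cats"; job-2 is the re-run.
    INSERT INTO jobs VALUES ('job-1', NULL), ('job-2', 'job-1');
    INSERT INTO dataset_versions VALUES (1, 'cats', 1), (2, 'cats', 2);
    -- version 1 was created by job-1; version 2 by an unrelated job.
    INSERT INTO dataset_version_jobs VALUES (1, 'job-1', 1), (2, 'job-x', 1);
    """
)

def version_for_job_ancestry(conn, name, job_id):
    """Pick the version created by the job itself or any ancestor job,
    instead of blindly taking the latest version."""
    row = conn.execute(
        """
        WITH RECURSIVE chain(id) AS (
            SELECT :job_id
            UNION ALL
            SELECT j.parent_job_id
            FROM jobs j JOIN chain c ON j.id = c.id
            WHERE j.parent_job_id IS NOT NULL
        )
        SELECT dv.version
        FROM dataset_versions dv
        JOIN dataset_version_jobs dvj ON dvj.dataset_version_id = dv.id
        WHERE dv.name = :name
          AND dvj.is_creator
          AND dvj.job_id IN (SELECT id FROM chain)
        ORDER BY dv.version DESC
        LIMIT 1
        """,
        {"job_id": job_id, "name": name},
    ).fetchone()
    return row[0] if row else None
```

For the re-run `job-2`, this resolves to version 1 (created by its ancestor `job-1`) rather than the latest version 2, which belongs to an unrelated job chain.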