
Conversation

@ilongin (Contributor) commented Nov 26, 2025

When a dataset is created in a job that fails, re-running the script creates a new job, but the dataset still references the old (failed) job. This makes the dataset invisible in subsequent job runs in the Studio UI.
This PR implements a many-to-many relationship between dataset versions and jobs using a dataset_version_jobs junction table with an is_creator flag. It also fixes an existing issue in the checkpoint code: when a checkpoint is found, which dataset version should be returned? Currently we return the latest version, but that is wrong because it might not be the one created in this job chain (it may have been created by some other, unrelated job). This PR instead finds the correct version created in this specific job chain / hierarchy.
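The fixed resolution logic can be sketched in plain Python (hypothetical helper name and data shapes, a minimal sketch of the idea rather than the actual DataChain API):

```python
def resolve_checkpoint_version(versions, ancestor_job_ids):
    """Pick the dataset version created within this job chain.

    versions: list of (version_number, creator_job_id) pairs.
    ancestor_job_ids: ids of the current job and all of its ancestors.
    """
    # Only consider versions whose creator job belongs to this chain;
    # simply taking the latest version could pick up an unrelated job's work.
    in_chain = [v for v, job in versions if job in ancestor_job_ids]
    return max(in_chain) if in_chain else None


# Version 2 is "latest" but was created by an unrelated job,
# so the checkpoint should reuse version 1 from this chain.
versions = [(1, "job-a"), (2, "other-job")]
print(resolve_checkpoint_version(versions, {"job-a", "job-a-rerun"}))  # 1
```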

Changes:

  • Added dataset_version_jobs table to metastore schemas
  • Added link_dataset_version_to_job(), get_ancestor_job_ids(), and get_dataset_version_for_job_ancestry() metastore methods
  • Updated DataChain._resolve_checkpoint() to use job ancestry for finding dataset versions
  • Updated DatasetQuery.save() to link datasets to jobs on creation

The existing job_id column in dataset_versions table remains unchanged for backward compatibility and can be deprecated in a future release.
Jobs that create datasets are linked with is_creator=True, while jobs that reuse datasets via checkpoints are linked with is_creator=False.
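Conceptually, the junction table and the is_creator linking described above look like this (a minimal sqlite3 sketch with assumed column names, not the actual metastore schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dataset_versions (id INTEGER PRIMARY KEY, job_id TEXT);
    -- junction table: many jobs per dataset version, many versions per job
    CREATE TABLE dataset_version_jobs (
        dataset_version_id INTEGER NOT NULL,
        job_id TEXT NOT NULL,
        is_creator BOOLEAN NOT NULL DEFAULT 0,
        PRIMARY KEY (dataset_version_id, job_id)
    );
""")

# Job "j1" created version 1; a later re-run "j2" reuses it via a checkpoint.
conn.execute("INSERT INTO dataset_versions VALUES (1, 'j1')")
conn.execute("INSERT INTO dataset_version_jobs VALUES (1, 'j1', 1)")
conn.execute("INSERT INTO dataset_version_jobs VALUES (1, 'j2', 0)")

creator = conn.execute(
    "SELECT job_id FROM dataset_version_jobs"
    " WHERE dataset_version_id = 1 AND is_creator = 1"
).fetchone()[0]
print(creator)  # j1
```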

@ilongin ilongin marked this pull request as draft November 26, 2025 10:01
@ilongin ilongin linked an issue Nov 26, 2025 that may be closed by this pull request
@codecov bot commented Nov 26, 2025

Codecov Report

❌ Patch coverage is 90.54054% with 7 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
src/datachain/data_storage/metastore.py | 87.80% | 2 Missing and 3 partials ⚠️
src/datachain/lib/dc/datachain.py | 66.66% | 1 Missing and 1 partial ⚠️


@ilongin ilongin marked this pull request as ready for review November 27, 2025 15:36
Copilot AI left a comment

Pull request overview

This PR implements a many-to-many relationship between dataset versions and jobs to fix an issue where datasets created in failed jobs become invisible in subsequent job runs. The solution introduces a junction table to track which jobs created or used each dataset version, and updates checkpoint resolution to find the correct dataset version based on job ancestry rather than just using the latest version.

Key changes:

  • Added dataset_version_jobs junction table with is_creator flag to track many-to-many relationships
  • Implemented job ancestry traversal using recursive CTEs to find correct dataset versions in checkpoint chains
  • Refactored parse_schema from a static method to a module-level function for better reusability
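The ancestry traversal via a recursive CTE can be illustrated like this (a hedged sqlite3 sketch; the real implementation builds an equivalent query with SQLAlchemy and, in Studio, filters by team):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, parent_job_id TEXT)")
# A chain of re-runs: root -> child -> grandchild.
conn.executemany(
    "INSERT INTO jobs VALUES (?, ?)",
    [("root", None), ("child", "root"), ("grandchild", "child")],
)

rows = conn.execute(
    """
    WITH RECURSIVE ancestors(id, parent_job_id, depth) AS (
        SELECT id, parent_job_id, 0 FROM jobs WHERE id = ?
        UNION ALL
        SELECT j.id, j.parent_job_id, a.depth + 1
        FROM jobs j JOIN ancestors a ON j.id = a.parent_job_id
        WHERE a.depth < 100  -- depth guard against runaway recursion
    )
    SELECT id FROM ancestors
    """,
    ("grandchild",),
).fetchall()
print([r[0] for r in rows])  # ['grandchild', 'child', 'root']
```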

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.

File | Description
src/datachain/data_storage/metastore.py | Added junction table schema, abstract methods, and implementations for linking dataset versions to jobs and traversing job ancestry
src/datachain/data_storage/sqlite.py | Added table initialization for the new junction table
src/datachain/lib/dc/datachain.py | Updated checkpoint resolution to use job ancestry for finding correct dataset versions
src/datachain/query/dataset.py | Added job_id parameter and linking logic when datasets are created
src/datachain/dataset.py | Refactored parse_schema from a DatasetRecord static method to a module-level function with an updated type signature
src/datachain/catalog/catalog.py | Updated to use the new parse_schema function
tests/unit/test_dataset.py | Updated test calls to use the module-level parse_schema function
tests/unit/lib/test_checkpoints.py | Added comprehensive tests for dataset-job linking and checkpoint reuse scenarios
tests/func/test_metastore.py | Added tests for get_ancestor_job_ids with various hierarchy depths
tests/utils.py | Added DATACHAIN_JOB_ID environment variable cleanup to reset_session_job_state()


@cloudflare-workers-and-pages bot commented Dec 1, 2025

Deploying with Cloudflare Workers

The latest updates on your project.

Status: ❌ Deployment failed
Name: datachain-docs
Latest Commit: 6dc3739
Updated (UTC): Dec 03 2025, 02:20 AM
View logs

@ilongin ilongin requested a review from amritghimire December 1, 2025 15:12
is_creator: bool = False,
conn=None,
) -> None:
query = self._dataset_version_jobs_insert().values(
Contributor:

do we check that both objects are actually related and belong to the same team?

Contributor Author:

Both objects are guaranteed to belong to the same team because they are both created/fetched through the same metastore instance, which is scoped to a team_id. The method is only called from internal business logic within the same session/team context, not from any external API that could pass arbitrary IDs. Therefore I don't think we need to add these kinds of checks everywhere. What is enough is to enforce that those CRUD database functions, e.g. _dataset_version_jobs_insert, are always used and cannot be bypassed, but that is way out of the scope of this PR. I think we need another issue for adding those, plus the related Studio tests and anything else that's needed.

max_depth = 100

ancestors_cte = (
self._jobs_select(
Contributor:

does _jobs_select filter by team? do we have a test for this?

Contributor Author:

Yes, it does filter by team_id. The safest approach is to always use these helper methods for the start of any query, since they are overridden in Studio to include the team_id filter. We generally don't have explicit tests for these methods; we should think about the best way to add them, as they aren't really meant for the datachain repo.

Contributor:

  • we should raise an exception by default in each of those helpers in the abstract metastore (do we do this?)
  • we can have a test on the Studio side; we already have plenty of tests

Contributor Author:

As mentioned in the related comment #1480 (comment), we need a separate task for this, since similar questions/issues come up in every PR.

) -> "Select":
"""Build query to find dataset version created by job ancestry."""
return (
self._datasets_versions_select()
Contributor:

do both selects filter by team id already? do we have tests for this?

can we actually make all those helpers raise if the metastore doesn't implement them?

Contributor Author:

The DatasetVersion table is one of those old tables that doesn't have a copied team_id, but it has a connection to Dataset, which does have one. In this case, the whole _get_dataset_version_for_job_ancestry_query method is overridden in Studio by adding a team_id filter, since we couldn't do the same with _datasets_versions_select.

Contributor Author (@ilongin, Dec 2, 2025):

Regarding the other question: since this repeats for all models (we have the same problem), I think we need a separate task to:

  1. Figure out how to automate this, or raise an exception, as you mentioned
  2. Write tests for it

# checkpoint found → find which dataset version to reuse

# Find dataset version that was created by any ancestor job
dataset_version = metastore.get_dataset_version_for_job_ancestry(
Contributor:

Can we add some logs inside those implementations so we can actually trace this (whether a job was found or not; if found, print its id, etc.)?

Contributor Author:

Added a couple of debug logs.

Contributor:

I mean Studio primarily, btw, not datachain (or datachain too, if we know it will be visible on the backend in Studio).

dataset_version.id, job.id, is_creator=False
)

# Update dataset_version.job_id to point to the latest job
Contributor:

why do we need this pointer?

we then need to think about a transaction here (the link above and this update)

Contributor Author:

I left this since it will be used in pipelines (this was the main issue for that task) and to stay backward compatible, but yes, it is now redundant and should be deprecated or maybe even removed. I will see if pipelines can just use the same get_dataset_version_for_job_ancestry method.

Contributor:

Let's remove it for now, then, and add it back separately? Otherwise we risk leaving it as is and then fighting some half-broken state (I've seen this happen already).

self.catalog.update_dataset_version_with_warehouse_info(dataset, version)

# Link this dataset version to the job that created it
if job_id and dataset.has_version(version):
Contributor:

how can it be that we don't have job_id?

how can it be we don't have this version?

Contributor Author:

When we create temp datasets with .persist() we don't track them with a job, so job_id is not defined.
The second check, for the dataset version, is redundant; removing it.

Contributor:

hmm, a lot of questions ...

  • why do we do things like self.catalog.update_dataset_version_with_warehouse_info(dataset, version) for persist()?
  • if this is a difference between types, it is better to have an explicit flag / name. It is weird that I have to remember not to pass job_id in persist(). Or do we want to refactor and always get it here instead of providing it?

@cloudflare-workers-and-pages bot commented Dec 3, 2025

Deploying datachain with Cloudflare Pages

Latest commit: aeb9307
Status: ✅  Deploy successful!
Preview URL: https://ee17ccbc.datachain-2g6.pages.dev
Branch Preview URL: https://ilongin-1477-fix-dataset-job.datachain-2g6.pages.dev

View logs



Development

Successfully merging this pull request may close these issues.

Associate dataset to latest job

4 participants