Conversation

@shcheklein (Contributor)

Fix resource cleanup. Mostly to avoid tons of warnings in tests, but probably also to avoid some leaks.

codecov bot commented Nov 24, 2025

@shcheklein force-pushed the cleanup-resource-lifecycle branch 3 times, most recently from 8ff056a to 14ef1ad on November 24, 2025 03:02
@shcheklein self-assigned this Nov 24, 2025
@shcheklein force-pushed the cleanup-resource-lifecycle branch 3 times, most recently from 2e1f3ee to fc1654e on November 29, 2025 15:22
cloudflare-workers-and-pages bot commented Nov 30, 2025

Deploying with Cloudflare Workers.

Status: ✅ Deployment successful
Project: datachain-docs, latest commit f0bce78, updated Dec 01 2025, 06:59 PM (UTC)

@shcheklein force-pushed the cleanup-resource-lifecycle branch from a07be3c to aded330 on December 1, 2025 02:30
Copilot AI (Contributor) left a comment

Pull request overview

This pull request focuses on improving resource cleanup and handling throughout the codebase to avoid warnings in tests and prevent potential resource leaks. The main changes include:

  • Converting functions to use context managers for proper resource cleanup
  • Adding try/finally blocks in tests to ensure resources are closed
  • Using WeakSet for tracking sessions instead of gc.get_objects()
  • Making pytest treat warnings as errors by default to catch issues early
  • Properly closing connections and file handles
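As a rough illustration of the two most common patterns here (a minimal, self-contained sketch using plain sqlite3 for demonstration, not the PR's actual code):

    import sqlite3
    from contextlib import contextmanager

    @contextmanager
    def open_db(path: str):
        # The general shape of the context-manager conversions:
        # acquire, yield, and guarantee close() in a finally block.
        conn = sqlite3.connect(path)
        try:
            yield conn
        finally:
            conn.close()

    # Tests follow the same idea with explicit try/finally:
    def test_select_one():
        conn = sqlite3.connect(":memory:")
        try:
            assert conn.execute("select 1").fetchone() == (1,)
        finally:
            conn.close()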

Reviewed changes

Copilot reviewed 33 out of 33 changed files in this pull request and generated 3 comments.

Summary per file
File Description
tests/unit/test_warehouse.py Added try/finally blocks to ensure cloned warehouse objects are closed
tests/unit/test_session.py Updated test to use catalog fixture and proper session context manager
tests/unit/test_metastore.py Wrapped metastore cloning tests in try/finally blocks for cleanup
tests/unit/test_listing.py Converted listing fixture to use yield and cleanup in finally block
tests/unit/test_datachain_hash.py Added session parameter to avoid creating unmanaged sessions
tests/unit/test_database_engine.py Added close() call after database engine test
tests/unit/test_catalog_loader.py Added resource cleanup with clones and try/finally blocks
tests/unit/lib/test_file.py Added pytest.deprecated_call() to handle resolve() deprecation warning
tests/unit/lib/test_datachain.py Added deprecation warnings handling for batch_map tests
tests/test_query_e2e.py Added pipe closure in subprocess cleanup, converted stdin handling to context manager
tests/func/test_video.py Wrapped PIL Image.open() calls in context managers
tests/func/test_to_database.py Used closing() for sqlite3 connection context management
tests/func/test_retry.py Fixed type issues with to_list() return values using cast
tests/func/test_read_database.py Properly managed sqlite3 connection lifecycle
tests/func/test_image.py Wrapped PIL Image.open() calls in context managers
tests/func/test_datachain.py Removed unused catalog parameter, wrapped Image.open() in context managers
tests/func/test_catalog.py Added enlist_source context manager wrapper for proper listing cleanup
tests/func/model/test_yolo.py Wrapped Image.open() calls in context managers and improved tensor construction
tests/conftest.py Moved Session.cleanup_for_tests() to after yield, added catalog.close() calls
src/datachain/studio.py Suppressed dateparser deprecation warning with documentation
src/datachain/sql/sqlite/base.py Added closing() for sqlite3 connections to avoid resource warnings
src/datachain/query/session.py Replaced gc.get_objects() with WeakSet for session tracking, added _close_all_contexts()
src/datachain/listing.py Added _closed flag to prevent double-closing, ensure both metastore and warehouse are closed
src/datachain/lib/pytorch.py Closed catalog after initialization if created internally
src/datachain/lib/file.py Changed to use Session.get().catalog instead of creating new catalog
src/datachain/lib/dc/database.py Used StaticPool for sqlite3 connections to properly manage connection lifecycle
src/datachain/data_storage/warehouse.py Converted comment to docstring
src/datachain/data_storage/sqlite.py Added _init_guard() context manager for cleanup on initialization failure
src/datachain/data_storage/metastore.py Added _init_guard() context manager base implementation
src/datachain/cli/commands/ls.py Used context managers for catalog lifecycle management
src/datachain/cli/init.py Added catalog.close() in finally block
src/datachain/catalog/catalog.py Added Catalog context manager support, converted enlist_sources to context manager, added NodeGroup.close()
pyproject.toml Made pytest treat all warnings as errors by default with specific third-party exceptions
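For example, the sqlite3-related rows above correspond roughly to patterns like these (a hedged sketch; only contextlib.closing and SQLAlchemy's StaticPool are taken from the descriptions, the rest is illustrative):

    import sqlite3
    from contextlib import closing

    from sqlalchemy import create_engine, text
    from sqlalchemy.pool import StaticPool

    # closing() turns any object with a close() method into a context
    # manager, so the connection cannot leak even if the body raises.
    with closing(sqlite3.connect(":memory:")) as conn:
        conn.execute("create table t (x integer)")

    # StaticPool keeps exactly one connection for the engine's lifetime;
    # engine.dispose() then releases precisely that connection.
    engine = create_engine("sqlite://", poolclass=StaticPool)
    with engine.connect() as sa_conn:
        sa_conn.execute(text("select 1"))
    engine.dispose()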


@shcheklein requested a review from a team December 1, 2025 18:56
@shcheklein changed the title from "cleanup and fix resource handling" to "cleanup warnings, fail on any new warnings" on Dec 1, 2025
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 34 out of 34 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

src/datachain/catalog/catalog.py:745

  • The enlist_sources_grouped method creates Listing objects (lines 665-670) with cloned metastore and warehouse connections. If an exception occurs during the loop (e.g., during to_db_records() call or in subsequent iterations), these listings would leak resources since they won't be cleaned up. Consider wrapping this method's body in a try/finally block to close any created listings on failure, similar to how enlist_sources is now a context manager.
    def enlist_sources_grouped(
        self,
        sources: list[str],
        update: bool,
        no_glob: bool = False,
        client_config=None,
    ) -> list[NodeGroup]:
        from datachain.listing import Listing
        from datachain.query.dataset import DatasetQuery

        def _row_to_node(d: dict[str, Any]) -> Node:
            del d["file__source"]
            return Node.from_row(d)

        enlisted_sources: list[tuple[bool, bool, Any]] = []
        client_config = client_config or self.client_config
        for src in sources:  # Opt: parallel
            listing: Listing | None
            if src.startswith("ds://"):
                ds_name, ds_version = parse_dataset_uri(src)
                ds_namespace, ds_project, ds_name = parse_dataset_name(ds_name)
                assert ds_namespace
                assert ds_project
                dataset = self.get_dataset(
                    ds_name, namespace_name=ds_namespace, project_name=ds_project
                )
                if not ds_version:
                    ds_version = dataset.latest_version
                dataset_sources = self.warehouse.get_dataset_sources(
                    dataset,
                    ds_version,
                )
                indexed_sources = []
                for source in dataset_sources:
                    client = self.get_client(source, **client_config)
                    uri = client.uri
                    dataset_name, _, _, _ = get_listing(uri, self.session)
                    assert dataset_name
                    listing = Listing(
                        self.metastore.clone(),
                        self.warehouse.clone(),
                        client,
                        dataset_name=dataset_name,
                    )
                    rows = DatasetQuery(
                        name=dataset.name,
                        namespace_name=dataset.project.namespace.name,
                        project_name=dataset.project.name,
                        version=ds_version,
                        catalog=self,
                    ).to_db_records()
                    indexed_sources.append(
                        (
                            listing,
                            client,
                            source,
                            [_row_to_node(r) for r in rows],
                            ds_name,
                            ds_version,
                        )  # type: ignore [arg-type]
                    )

                enlisted_sources.append((False, True, indexed_sources))
            else:
                listing, client, source_path = self.enlist_source(
                    src, update, client_config=client_config
                )
                enlisted_sources.append((False, False, (listing, client, source_path)))

        node_groups = []
        for is_datachain, is_dataset, payload in enlisted_sources:  # Opt: parallel
            if is_dataset:
                for (
                    listing,
                    client,
                    source_path,
                    nodes,
                    dataset_name,
                    dataset_version,
                ) in payload:
                    assert listing
                    dsrc = [DataSource(listing, client, node) for node in nodes]
                    node_groups.append(
                        NodeGroup(
                            listing,
                            client,
                            dsrc,
                            source_path,
                            dataset_name=dataset_name,
                            dataset_version=dataset_version,
                        )
                    )
            elif is_datachain:
                for listing, source_path, paths in payload:
                    assert listing
                    dsrc = [
                        DataSource(listing, listing.client, listing.resolve_path(p))
                        for p in paths
                    ]
                    node_groups.append(
                        NodeGroup(
                            listing,
                            listing.client,
                            dsrc,
                            source_path,
                        )
                    )
            else:
                listing, client, source_path = payload
                if not listing:
                    nodes = [Node.from_file(client.get_file_info(source_path))]
                    as_container = False
                else:
                    as_container = source_path.endswith("/")
                    nodes = listing.expand_path(source_path, use_glob=not no_glob)
                dsrc = [DataSource(listing, client, n, as_container) for n in nodes]
                node_groups.append(NodeGroup(listing, client, dsrc, source_path))

        return node_groups
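One way to implement the suggestion, sketched with contextlib.ExitStack instead of a hand-rolled try/finally (build_listings, make_listing, and sources are hypothetical names, not the PR's code):

    from contextlib import ExitStack

    def build_listings(sources, make_listing):
        # Register each created listing for cleanup; if a later
        # iteration raises, everything created so far is closed.
        with ExitStack() as stack:
            listings = []
            for src in sources:
                listing = make_listing(src)
                stack.callback(listing.close)
                listings.append(listing)
            # Success: detach the callbacks so the listings stay
            # open and ownership passes to the caller.
            stack.pop_all()
            return listings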



def copy(self, cache=True, db=True):
"""
Create a shallow copy of this catalog.
Contributor:

Why is this kind of copy even needed? Why can't we just copy the connections (metastore/warehouse objects) as well, to avoid extra complexity and potential sources of strange, hard-to-debug issues?

Contributor (author):

It does indeed look redundant (there is some logic on the CH side where we share connections, but that is probably not even directly related). Careful research is needed before removing the whole copy; that goes a bit beyond the intention of this PR.

(close() is still needed, and context manager methods like __exit__ are also good to have, to my mind, since we have closeable resources anyway)
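For context, the context-manager support mentioned here is small; a minimal sketch of the protocol methods (the real Catalog has far more members):

    class Catalog:
        # Sketch only: close() is assumed idempotent, as discussed
        # further down in this review.
        def close(self) -> None:
            """Release metastore/warehouse connections."""

        def __enter__(self) -> "Catalog":
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            self.close()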

self.close()

@contextmanager
def _init_guard(self):
Contributor:

A bit of a weird name IMO, but I can't think of alternatives. Maybe _init_cleanup() or _on_failure_cleanup()?

Contributor:

BTW, why did we need this as a separate method? Maybe it would be easier to just inline this code where it's used, since it's used in only one place; it would be clear what it does and would save the reader a jump / search for this method. So something like:

  try:
      self._init_meta_table()
      self._init_meta_schema_value()
      self._check_schema_version()
      self._init_tables()
      self._init_namespaces_projects()
  except Exception:
      with suppress(Exception):
          self.close_on_exit()
      raise

Contributor (author):

I think the intention was to also use it in the CH warehouse (which requires a separate PR to clean up resource leaks).
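Reconstructed from the inlined variant quoted above, the reusable guard presumably looks something like this (a sketch; MetastoreBase is a hypothetical stand-in for the real base class):

    from contextlib import contextmanager, suppress

    class MetastoreBase:
        def close_on_exit(self) -> None:
            """Release whatever resources were acquired so far."""

        @contextmanager
        def _init_guard(self):
            # If initialization raises, close any half-initialized
            # resources, then re-raise the original error.
            try:
                yield
            except Exception:
                with suppress(Exception):
                    self.close_on_exit()
                raise

With this shape, __init__ can run all the _init_* steps inside a single "with self._init_guard():" block.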

return self.db.cursor(factory)

def close(self) -> None:
if self.is_closed:
Contributor:

I think close() is idempotent so we can call it multiple times.

Contributor (author):

Yep, but I think it's fine not to abuse that when the call isn't needed.
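The idempotent close() pattern in question, as a standalone sketch (a hypothetical minimal class, not the actual warehouse code):

    class Resource:
        def __init__(self) -> None:
            self.is_closed = False

        def close(self) -> None:
            # The early return makes repeated calls harmless, e.g.
            # from both __exit__ and an explicit cleanup path.
            if self.is_closed:
                return
            self.is_closed = True
            # ...release the underlying connection here...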


GLOBAL_SESSION_CTX: "Session | None" = None
SESSION_CONTEXTS: ClassVar[list["Session"]] = []
_ALL_SESSIONS: ClassVar[WeakSet["Session"]] = WeakSet()
Contributor:

What is the difference between _ALL_SESSIONS and SESSION_CONTEXTS? Aren't they redundant, or am I missing something?

Contributor (author):

SESSION_CONTEXTS is managed by with Session(...) (the enter/exit mechanism) to keep the current session at the top of the stack, etc. If someone creates a session directly, without enter/exit, we still want to clean it up at exit, at least. In some cases it can be hard to wrap all code in a with block (e.g. we pass the session down and no longer control its lifecycle). It is also not 100% clear that we should require users to always use with.

Some sessions might be created automatically (e.g. with some custom config) for users.

Even if we decide to make it very strict, that is a bigger change that goes beyond this PR's intention, to my mind.

@ilongin (Contributor) commented Dec 3, 2025:

OK, makes sense, since we shouldn't enforce usage of the context manager.
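A minimal sketch of the WeakSet side of this design (simplified; the real Session also manages catalogs, the context stack, and more):

    from typing import ClassVar
    from weakref import WeakSet

    class Session:
        # Unlike a normal set, a WeakSet never keeps a session alive:
        # garbage-collected sessions silently drop out, which makes
        # this cheaper and safer than scanning gc.get_objects().
        _ALL_SESSIONS: ClassVar[WeakSet["Session"]] = WeakSet()

        def __init__(self) -> None:
            Session._ALL_SESSIONS.add(self)

        def close(self) -> None:
            """Release the session's resources."""

        @classmethod
        def cleanup_for_tests(cls) -> None:
            # Close whatever is still alive, whether or not it was
            # used via "with Session(...)".
            for session in list(cls._ALL_SESSIONS):
                session.close()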

@shcheklein requested a review from ilongin December 2, 2025 18:02
@dreadatour (Contributor) left a comment:

Looks good to me overall. A lot of changes, hard to review them all in one go. I am totally OK with merging this and moving forward. Added one tiny unrelated comment.

"error::pytest_mock.PytestMockWarning",
"error::pytest.PytestCollectionWarning",
"error::sqlalchemy.exc.SADeprecationWarning",
"error", # Treat all warnings as errors by default
Contributor:

This is awesome, thank you! 🔥
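Worth noting for anyone copying this setup: when several pytest filterwarnings entries match the same warning, the last matching one wins. The entries above are all error actions, so their order is harmless, but a targeted exemption has to come after the blanket error line; a sketch (the ignore entry is illustrative, not from this PR):

    [tool.pytest.ini_options]
    filterwarnings = [
        "error",  # treat every warning as a test failure by default
        # later entries take precedence, so exemptions go below:
        "ignore::DeprecationWarning:dateparser",
    ]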

DataSource(listing, client, node, dir_only) for node in nodes
)
return dsrc_all
) -> Iterator[list["DataSource"] | None]:
Contributor:

I think we might want to improve this in a follow-up. An Iterator yielding a list or None looks a bit odd to me. I may be wrong; anyway, it looks a bit out of scope for this PR.
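For readers puzzled by the annotation: a function decorated with @contextmanager is typed by what it yields, so Iterator[list["DataSource"] | None] just means the with-block receives either a list of sources or None. A toy version (hypothetical, for illustration only):

    from collections.abc import Iterator
    from contextlib import contextmanager

    @contextmanager
    def enlist(found: bool) -> Iterator[list[str] | None]:
        resources = ["a", "b"] if found else None
        try:
            # The caller gets the yielded value as the "as" target.
            yield resources
        finally:
            pass  # resource cleanup would go here

    with enlist(True) as items:
        assert items == ["a", "b"]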

@shcheklein merged commit df75ea1 into main Dec 3, 2025
40 of 41 checks passed
@shcheklein deleted the cleanup-resource-lifecycle branch December 3, 2025 14:04