cleanup warnings, fail on any new warnings #1474
Conversation
Force-pushed 8ff056a to 14ef1ad (Compare)
Force-pushed 2e1f3ee to fc1654e (Compare)
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs | datachain-docs | f0bce78 | Dec 01 2025, 06:59 PM |
Force-pushed a07be3c to aded330 (Compare)
Pull request overview
This pull request focuses on improving resource cleanup and handling throughout the codebase to avoid warnings in tests and prevent potential resource leaks. The main changes include:
- Converting functions to use context managers for proper resource cleanup
- Adding try/finally blocks in tests to ensure resources are closed (both patterns are sketched after this list)
- Using WeakSet for tracking sessions instead of gc.get_objects()
- Making pytest treat warnings as errors by default to catch issues early
- Properly closing connections and file handles
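To make the first two patterns concrete, here is a minimal, self-contained sketch (using sqlite3 and pytest's `tmp_path` fixture; the names are illustrative and not taken from the PR):

```python
import sqlite3
from contextlib import closing, contextmanager


@contextmanager
def open_db(path):
    """Yield a connection and guarantee it is closed, even if the body raises."""
    conn = sqlite3.connect(path)
    try:
        yield conn
    finally:
        conn.close()


def test_query_leaves_no_open_handles(tmp_path):
    # The try/finally mirrors the pattern added to the cloning tests:
    # the resource is closed whether or not the assertions fail.
    conn = sqlite3.connect(tmp_path / "db.sqlite")
    try:
        with closing(conn.cursor()) as cur:
            cur.execute("SELECT 1")
            assert cur.fetchone() == (1,)
    finally:
        conn.close()
```

With warnings treated as errors, any `ResourceWarning` from an unclosed handle becomes a test failure instead of silent noise, which is what makes these cleanup patterns load-bearing here.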
Reviewed changes
Copilot reviewed 33 out of 33 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| tests/unit/test_warehouse.py | Added try/finally blocks to ensure cloned warehouse objects are closed |
| tests/unit/test_session.py | Updated test to use catalog fixture and proper session context manager |
| tests/unit/test_metastore.py | Wrapped metastore cloning tests in try/finally blocks for cleanup |
| tests/unit/test_listing.py | Converted listing fixture to use yield and cleanup in finally block |
| tests/unit/test_datachain_hash.py | Added session parameter to avoid creating unmanaged sessions |
| tests/unit/test_database_engine.py | Added close() call after database engine test |
| tests/unit/test_catalog_loader.py | Added resource cleanup with clones and try/finally blocks |
| tests/unit/lib/test_file.py | Added pytest.deprecated_call() to handle resolve() deprecation warning |
| tests/unit/lib/test_datachain.py | Added deprecation warnings handling for batch_map tests |
| tests/test_query_e2e.py | Added pipe closure in subprocess cleanup, converted stdin handling to context manager |
| tests/func/test_video.py | Wrapped PIL Image.open() calls in context managers |
| tests/func/test_to_database.py | Used closing() for sqlite3 connection context management |
| tests/func/test_retry.py | Fixed type issues with to_list() return values using cast |
| tests/func/test_read_database.py | Properly managed sqlite3 connection lifecycle |
| tests/func/test_image.py | Wrapped PIL Image.open() calls in context managers (see the sketch after this table) |
| tests/func/test_datachain.py | Removed unused catalog parameter, wrapped Image.open() in context managers |
| tests/func/test_catalog.py | Added enlist_source context manager wrapper for proper listing cleanup |
| tests/func/model/test_yolo.py | Wrapped Image.open() calls in context managers and improved tensor construction |
| tests/conftest.py | Moved Session.cleanup_for_tests() to after yield, added catalog.close() calls |
| src/datachain/studio.py | Suppressed dateparser deprecation warning with documentation |
| src/datachain/sql/sqlite/base.py | Added closing() for sqlite3 connections to avoid resource warnings |
| src/datachain/query/session.py | Replaced gc.get_objects() with WeakSet for session tracking, added _close_all_contexts() |
| src/datachain/listing.py | Added _closed flag to prevent double-closing, ensure both metastore and warehouse are closed |
| src/datachain/lib/pytorch.py | Closed catalog after initialization if created internally |
| src/datachain/lib/file.py | Changed to use Session.get().catalog instead of creating new catalog |
| src/datachain/lib/dc/database.py | Used StaticPool for sqlite3 connections to properly manage connection lifecycle |
| src/datachain/data_storage/warehouse.py | Converted comment to docstring |
| src/datachain/data_storage/sqlite.py | Added _init_guard() context manager for cleanup on initialization failure |
| src/datachain/data_storage/metastore.py | Added _init_guard() context manager base implementation |
| src/datachain/cli/commands/ls.py | Used context managers for catalog lifecycle management |
| src/datachain/cli/init.py | Added catalog.close() in finally block |
| src/datachain/catalog/catalog.py | Added Catalog context manager support, converted enlist_sources to context manager, added NodeGroup.close() |
| pyproject.toml | Made pytest treat all warnings as errors by default with specific third-party exceptions |
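Several of the test changes above are instances of the same `Image.open()` fix; a minimal sketch (the filename is a placeholder):

```python
from PIL import Image

# Before: the file handle stays open until garbage collection,
# which can trigger a ResourceWarning under warnings-as-errors.
img = Image.open("photo.jpg")
width, height = img.size

# After: the handle is closed deterministically when the block exits.
with Image.open("photo.jpg") as img:
    width, height = img.size
```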
Pull request overview
Copilot reviewed 34 out of 34 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
src/datachain/catalog/catalog.py:745
The `enlist_sources_grouped` method creates `Listing` objects (lines 665-670) with cloned metastore and warehouse connections. If an exception occurs during the loop (e.g., during the `to_db_records()` call or in subsequent iterations), these listings would leak resources, since they won't be cleaned up. Consider wrapping this method's body in a try/finally block to close any created listings on failure, similar to how `enlist_sources` is now a context manager. (A generic sketch of this cleanup pattern follows the quoted method below.)
```python
def enlist_sources_grouped(
    self,
    sources: list[str],
    update: bool,
    no_glob: bool = False,
    client_config=None,
) -> list[NodeGroup]:
    from datachain.listing import Listing
    from datachain.query.dataset import DatasetQuery

    def _row_to_node(d: dict[str, Any]) -> Node:
        del d["file__source"]
        return Node.from_row(d)

    enlisted_sources: list[tuple[bool, bool, Any]] = []
    client_config = client_config or self.client_config
    for src in sources:  # Opt: parallel
        listing: Listing | None
        if src.startswith("ds://"):
            ds_name, ds_version = parse_dataset_uri(src)
            ds_namespace, ds_project, ds_name = parse_dataset_name(ds_name)
            assert ds_namespace
            assert ds_project
            dataset = self.get_dataset(
                ds_name, namespace_name=ds_namespace, project_name=ds_project
            )
            if not ds_version:
                ds_version = dataset.latest_version
            dataset_sources = self.warehouse.get_dataset_sources(
                dataset,
                ds_version,
            )
            indexed_sources = []
            for source in dataset_sources:
                client = self.get_client(source, **client_config)
                uri = client.uri
                dataset_name, _, _, _ = get_listing(uri, self.session)
                assert dataset_name
                listing = Listing(
                    self.metastore.clone(),
                    self.warehouse.clone(),
                    client,
                    dataset_name=dataset_name,
                )
                rows = DatasetQuery(
                    name=dataset.name,
                    namespace_name=dataset.project.namespace.name,
                    project_name=dataset.project.name,
                    version=ds_version,
                    catalog=self,
                ).to_db_records()
                indexed_sources.append(
                    (
                        listing,
                        client,
                        source,
                        [_row_to_node(r) for r in rows],
                        ds_name,
                        ds_version,
                    )  # type: ignore [arg-type]
                )
            enlisted_sources.append((False, True, indexed_sources))
        else:
            listing, client, source_path = self.enlist_source(
                src, update, client_config=client_config
            )
            enlisted_sources.append((False, False, (listing, client, source_path)))

    node_groups = []
    for is_datachain, is_dataset, payload in enlisted_sources:  # Opt: parallel
        if is_dataset:
            for (
                listing,
                client,
                source_path,
                nodes,
                dataset_name,
                dataset_version,
            ) in payload:
                assert listing
                dsrc = [DataSource(listing, client, node) for node in nodes]
                node_groups.append(
                    NodeGroup(
                        listing,
                        client,
                        dsrc,
                        source_path,
                        dataset_name=dataset_name,
                        dataset_version=dataset_version,
                    )
                )
        elif is_datachain:
            for listing, source_path, paths in payload:
                assert listing
                dsrc = [
                    DataSource(listing, listing.client, listing.resolve_path(p))
                    for p in paths
                ]
                node_groups.append(
                    NodeGroup(
                        listing,
                        listing.client,
                        dsrc,
                        source_path,
                    )
                )
        else:
            listing, client, source_path = payload
            if not listing:
                nodes = [Node.from_file(client.get_file_info(source_path))]
                as_container = False
            else:
                as_container = source_path.endswith("/")
                nodes = listing.expand_path(source_path, use_glob=not no_glob)
            dsrc = [DataSource(listing, client, n, as_container) for n in nodes]
            node_groups.append(NodeGroup(listing, client, dsrc, source_path))
    return node_groups
```
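The fix Copilot suggests would roughly follow a "close what you created so far" pattern. A generic, self-contained sketch of that pattern (the `Resource` class is a stand-in, not datachain code):

```python
from contextlib import suppress


class Resource:
    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


def build_all(names: list[str]) -> list[Resource]:
    """Create resources one by one; on failure, close the ones already created."""
    created: list[Resource] = []
    try:
        for name in names:
            created.append(Resource(name))
            if name == "boom":  # stand-in for a failing to_db_records() call
                raise RuntimeError("mid-loop failure")
        return created
    except BaseException:
        for res in created:
            with suppress(Exception):
                res.close()
        raise
```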
```python
    def copy(self, cache=True, db=True):
        """
        Create a shallow copy of this catalog.
```
Why is this kind of copy even needed? Why can't we just copy the connections (metastore/warehouse objects) as well, to avoid extra complexity and potential sources of strange, hard-to-debug issues?
It does indeed look redundant (there is some logic on the CH side where we share connections, but that is probably not even directly related). Proper, careful research is probably needed to remove the whole copy; that goes a bit beyond the intention of this PR.
(close() is still needed; context manager methods like `__exit__`, etc., are also good to have, to my mind, since we have closeable resources anyway — see the sketch below.)
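For reference, context-manager support on a closeable object is a small addition; a minimal sketch of the shape being discussed (not the actual Catalog implementation):

```python
class Closeable:
    def close(self) -> None:
        pass  # release metastore/warehouse connections, etc.

    def __enter__(self) -> "Closeable":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Always close, whether the block exited normally or with an error.
        self.close()
```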
```python
        self.close()

    @contextmanager
    def _init_guard(self):
```
A little bit weird name IMO, but I cannot think of alternatives either. Maybe `_init_cleanup()` or `_on_failure_cleanup()`?
btw, why did we need this as a special method? Maybe it would be easier to just inline this code where it's used, since it's used in only one place... it would be clear what it does and would save the reader a jump/search for this method.
So something like:
```python
try:
    self._init_meta_table()
    self._init_meta_schema_value()
    self._check_schema_version()
    self._init_tables()
    self._init_namespaces_projects()
except Exception:
    with suppress(Exception):
        self.close_on_exit()
    raise
```
I think the intention was to also use it in the CH warehouse (that requires a separate PR to clean up resource leaks).
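For contrast with the inline suggestion above, the reusable guard being discussed might look roughly like this (a sketch; `close_on_exit()` is assumed to exist, as in the inline version):

```python
from contextlib import contextmanager, suppress


class Store:
    def __init__(self) -> None:
        with self._init_guard():
            pass  # schema checks and table initialization would go here

    @contextmanager
    def _init_guard(self):
        """Clean up partially-initialized resources if setup fails."""
        try:
            yield
        except Exception:
            with suppress(Exception):
                self.close_on_exit()  # assumed cleanup hook
            raise

    def close_on_exit(self) -> None:
        pass  # placeholder for releasing connections
```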
```python
        return self.db.cursor(factory)

    def close(self) -> None:
        if self.is_closed:
```
I think `close()` is idempotent, so we can call it multiple times.
yep, but I think it's fine not to abuse it if the call is not needed
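For context, the idempotence both reviewers refer to usually comes from a simple flag guard, roughly like this (a sketch; the real body of `close()` is not shown in the diff):

```python
class Connection:
    def __init__(self) -> None:
        self.is_closed = False

    def close(self) -> None:
        # Safe to call repeatedly: a no-op after the first call.
        if self.is_closed:
            return
        self.is_closed = True
```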
```python
    GLOBAL_SESSION_CTX: "Session | None" = None
    SESSION_CONTEXTS: ClassVar[list["Session"]] = []
    _ALL_SESSIONS: ClassVar[WeakSet["Session"]] = WeakSet()
```
What is the difference between `_ALL_SESSIONS` and `SESSION_CONTEXTS`? Aren't they redundant, or am I missing something?
`SESSION_CONTEXTS` is managed by `with Session ...` (the enter/exit mechanism) to keep the current session at the top of the stack, etc. If someone directly creates a session without enter/exit, we still want to clean it up at exit, at least. In certain cases it might be hard to wrap all code in `with` (e.g., we pass the session down and no longer control its lifecycle). It is also not 100% clear that we should require users to always use `with`.
Some sessions might be created automatically for users (e.g., with some custom config).
Even if we decide to make it very strict, that is also a bigger change that goes beyond this PR's intention, to my mind.
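A minimal sketch of the two-track bookkeeping described here (illustrative names; not datachain's actual implementation):

```python
import weakref
from typing import ClassVar


class Session:
    # Stack of sessions entered via `with`; the current one is on top.
    SESSION_CONTEXTS: ClassVar[list["Session"]] = []
    # Weak references to every live session, including ones created
    # without `with`, so a global exit hook can still close them.
    _ALL_SESSIONS: ClassVar["weakref.WeakSet[Session]"] = weakref.WeakSet()

    def __init__(self) -> None:
        Session._ALL_SESSIONS.add(self)

    def __enter__(self) -> "Session":
        Session.SESSION_CONTEXTS.append(self)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        Session.SESSION_CONTEXTS.pop()
        self.close()

    def close(self) -> None:
        pass  # release catalog/connection resources

    @classmethod
    def _close_all_contexts(cls) -> None:
        # Best-effort cleanup (e.g. at interpreter exit) for sessions
        # that were never used as context managers.
        for session in list(cls._ALL_SESSIONS):
            session.close()
```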
OK, makes sense, since we shouldn't enforce usage of a context manager.
dreadatour left a comment:
Looks good to me overall. A lot of changes, hard to review them all in one go. I am totally OK with merging this and moving forward. Added one tiny unrelated comment.
| "error::pytest_mock.PytestMockWarning", | ||
| "error::pytest.PytestCollectionWarning", | ||
| "error::sqlalchemy.exc.SADeprecationWarning", | ||
| "error", # Treat all warnings as errors by default |
This is awesome, thank you! 🔥
```python
            DataSource(listing, client, node, dir_only) for node in nodes
        )
        return dsrc_all

) -> Iterator[list["DataSource"] | None]:
```
I think we might want to improve this in a follow-up. An Iterator yielding a list or None looks a bit odd to me. I may be wrong; anyway, it looks like it is a bit out of the scope of this PR.
Fix resource cleanup. Mostly to avoid tons of warnings in tests, but probably also to avoid some leaks.