11import logging
22import os
33import sqlite3
4- import sys
5- import time
6- import traceback
7- import weakref
84from collections .abc import Callable , Iterable , Sequence
95from contextlib import contextmanager
106from functools import cached_property , wraps
6460RETRY_MAX_TIMES = 10
6561RETRY_FACTOR = 2
6662
67-
68- def _env_flag (name : str ) -> bool :
69- value = os .environ .get (name )
70- if value is None :
71- return False
72- return value .lower () in {"1" , "true" , "yes" , "on" }
73-
74-
75- LOG_CONNECTION_EVENTS = _env_flag ("DATACHAIN_LOG_SQLITE_CONNECTIONS" )
76- MAX_CONNECTION_EVENT_HISTORY = int (
77- os .environ .get ("DATACHAIN_CONNECTION_EVENT_HISTORY" , "256" )
78- )
79-
8063DETECT_TYPES = sqlite3 .PARSE_DECLTYPES | sqlite3 .PARSE_COLNAMES
8164
8265datachain .sql .sqlite .setup ()
@@ -134,53 +117,6 @@ class SQLiteDatabaseEngine(DatabaseEngine):
134117 db : sqlite3 .Connection
135118 db_file : str | None
136119 is_closed : bool
137- _open_connections : ClassVar [dict [int , dict [str , Any ]]] = {}
138- _connection_events : ClassVar [list [dict [str , Any ]]] = []
139- _max_connection_events : ClassVar [int ] = MAX_CONNECTION_EVENT_HISTORY
140-
141- @classmethod
142- def _record_connection_event (
143- cls ,
144- * ,
145- action : str ,
146- connection_id : int ,
147- engine_id : int | None ,
148- db_file : str | None ,
149- reason : str | None ,
150- creation_stack : list [str ] | None ,
151- ) -> None :
152- if cls ._max_connection_events <= 0 :
153- return
154- event = {
155- "timestamp" : time .time (),
156- "action" : action ,
157- "connection_id" : connection_id ,
158- "engine_id" : engine_id ,
159- "db_file" : db_file ,
160- "reason" : reason ,
161- "creation_stack" : creation_stack ,
162- "event_stack" : traceback .format_stack (limit = 10 ),
163- }
164- cls ._connection_events .append (event )
165- overflow = len (cls ._connection_events ) - cls ._max_connection_events
166- if overflow > 0 :
167- del cls ._connection_events [:overflow ]
168- if LOG_CONNECTION_EVENTS :
169- logger .info (
170- "SQLite connection %s (conn=%s engine=%s reason=%s db=%s)" ,
171- action ,
172- connection_id ,
173- engine_id ,
174- reason ,
175- db_file ,
176- )
177-
178- @classmethod
179- def connection_event_log (cls , * , clear : bool = False ) -> list [dict [str , Any ]]:
180- events = list (cls ._connection_events )
181- if clear :
182- cls ._connection_events .clear ()
183- return events
184120
185121 def __init__ (
186122 self ,
@@ -196,7 +132,6 @@ def __init__(
196132 self .db_file = db_file
197133 self .is_closed = False
198134 self .max_variable_number = max_variable_number
199- self ._register_open_connection (reason = "init" )
200135
201136 @classmethod
202137 def from_db_file (cls , db_file : str | None = None ) -> "SQLiteDatabaseEngine" :
@@ -270,85 +205,6 @@ def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]:
270205 def serialize_callable_name (cls ) -> str :
271206 return "sqlite.from_db_file"
272207
273- def _connection_key (self ) -> int :
274- return id (self .db )
275-
276- def _register_open_connection (self , * , reason : str ) -> None :
277- if getattr (self , "db" , None ) is None :
278- return
279- cls = type (self )
280- key = self ._connection_key ()
281- try :
282- engine_ref = weakref .ref (self )
283- except TypeError : # pragma: no cover - defensive
284- engine_ref = None
285- stack = traceback .format_stack (limit = 10 )
286- meta = {
287- "engine" : engine_ref ,
288- "db_file" : self .db_file ,
289- "engine_id" : id (self ),
290- "reason" : reason ,
291- "created_at" : time .time (),
292- "stack" : stack ,
293- }
294- cls ._open_connections [key ] = meta
295- cls ._record_connection_event (
296- action = "open" ,
297- connection_id = key ,
298- engine_id = id (self ),
299- db_file = self .db_file ,
300- reason = reason ,
301- creation_stack = stack ,
302- )
303-
304- def _unregister_open_connection (self ) -> None :
305- if getattr (self , "db" , None ) is None :
306- return
307- cls = type (self )
308- key = self ._connection_key ()
309- meta = cls ._open_connections .pop (key , None )
310- cls ._record_connection_event (
311- action = "close" ,
312- connection_id = key ,
313- engine_id = (meta or {}).get ("engine_id" ),
314- db_file = (meta or {}).get ("db_file" ),
315- reason = (meta or {}).get ("reason" ),
316- creation_stack = (meta or {}).get ("stack" ),
317- )
318-
319- @classmethod
320- def open_connection_snapshot (cls ) -> list [dict [str , Any ]]:
321- snapshot : list [dict [str , Any ]] = []
322- stale_keys : list [int ] = []
323- for key , meta in cls ._open_connections .items ():
324- engine_ref = meta .get ("engine" )
325- engine = (
326- engine_ref () if isinstance (engine_ref , weakref .ReferenceType ) else None
327- )
328- if engine is None or engine .is_closed :
329- stale_keys .append (key )
330- continue
331- conn = engine .db
332- refcount = sys .getrefcount (conn ) if conn is not None else None
333- snapshot .append (
334- {
335- "connection_id" : key ,
336- "db_file" : meta .get ("db_file" ),
337- "engine_id" : meta .get ("engine_id" ),
338- "reason" : meta .get ("reason" ),
339- "created_at" : meta .get ("created_at" ),
340- "refcount" : refcount ,
341- "stack" : meta .get ("stack" ),
342- }
343- )
344- for key in stale_keys :
345- cls ._open_connections .pop (key , None )
346- return snapshot
347-
348- @classmethod
349- def count_open_connections (cls ) -> int :
350- return len (cls .open_connection_snapshot ())
351-
352208 def _reconnect (self ) -> None :
353209 if not self .is_closed :
354210 raise RuntimeError ("Cannot reconnect on still-open DB!" )
@@ -361,7 +217,6 @@ def _reconnect(self) -> None:
361217 self .db_file = db_file
362218 self .max_variable_number = max_variable_number
363219 self .is_closed = False
364- self ._register_open_connection (reason = "reconnect" )
365220
366221 def get_table (self , name : str ) -> Table :
367222 if self .is_closed :
@@ -440,7 +295,6 @@ def close(self) -> None:
440295 return
441296 self .db .close ()
442297 self .is_closed = True
443- self ._unregister_open_connection ()
444298
445299 @contextmanager
446300 def transaction (self ):
0 commit comments