Skip to content

Commit fb81683

Browse files
committed
Lazy load current records
Why these changes are being introduced: Current TIMDEX records have been a consistent source of complexity and performance concerns. They are also one of the defining features of TDA, so it's worth getting them right. The previous approach was to materialize lightweight metadata records about the current version of each record in memory as a DuckDB temp table. This made repeated read queries that pull only current records more efficient, but it unnecessarily loaded that data into memory for operations like writing data or reading a specific run (which are not associated with current records). It turns out that reading current records is somewhat rare, and when it does happen, it's usually a one-off request as part of a larger operation like re-indexing a source in TIM. How this addresses that need: The new approach is a hybrid between a view only (lazy evaluation) and a temporary table in memory (eager evaluation). By default, a view is created, which either a) does not get used or b) is used only 1-2 times per session, in which case the lazy evaluation of a view is acceptable. Alternatively, TIMDEXDataset can be initialized with 'preload_current_records=True' if it's known that multiple requests for current records will be needed in the session and it's worth the time and memory hit upfront. Side effects of this change: * For most operations in the TIMDEX ETL pipeline, which don't use current records, the load time and memory usage are fairly dramatically decreased. Relevant ticket(s): - https://mitlibraries.atlassian.net/browse/USE-58 - prep work for new methods
1 parent 279765b commit fb81683

File tree

4 files changed

+118
-27
lines changed

4 files changed

+118
-27
lines changed

tests/test_dataset.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,14 @@ def test_dataset_duckdb_context_creates_data_schema(timdex_dataset):
259259
).fetchone()[0]
260260
== 1
261261
)
262+
263+
264+
def test_dataset_preload_current_records_default_false(timdex_dataset):
    """The fixture dataset and its metadata both report preloading as off."""
    dataset_flag = timdex_dataset.preload_current_records
    assert dataset_flag is False

    metadata_flag = timdex_dataset.metadata.preload_current_records
    assert metadata_flag is False
267+
268+
269+
def test_dataset_preload_current_records_flag_true(tmp_path):
    """preload_current_records=True is stored on the dataset and its metadata."""
    dataset = TIMDEXDataset(str(tmp_path), preload_current_records=True)

    dataset_flag = dataset.preload_current_records
    assert dataset_flag is True

    metadata_flag = dataset.metadata.preload_current_records
    assert metadata_flag is True

tests/test_metadata.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from duckdb import DuckDBPyConnection
88

9-
from timdex_dataset_api import TIMDEXDatasetMetadata
9+
from timdex_dataset_api import TIMDEXDataset, TIMDEXDatasetMetadata
1010

1111
ORDERED_METADATA_COLUMN_NAMES = [
1212
"timdex_record_id",
@@ -388,3 +388,51 @@ def test_tdm_prepare_duckdb_secret_and_extensions_home_env_var_set_but_empty(
388388
)
389389
assert df.secret_directory == "/tmp/.duckdb/secrets"
390390
assert df.extension_directory == "/tmp/.duckdb/extensions"
391+
392+
393+
def test_tdm_preload_current_records_default_false(tmp_path):
    """TIMDEXDatasetMetadata built with only a location has preloading off."""
    metadata = TIMDEXDatasetMetadata(str(tmp_path))
    preload_flag = metadata.preload_current_records
    assert preload_flag is False
396+
397+
398+
def test_tdm_preload_current_records_flag_true(tmp_path):
    """TIMDEXDatasetMetadata stores preload_current_records=True when passed."""
    metadata = TIMDEXDatasetMetadata(str(tmp_path), preload_current_records=True)
    preload_flag = metadata.preload_current_records
    assert preload_flag is True
401+
402+
403+
def test_tdm_preload_false_no_temp_table(timdex_dataset_with_runs):
    """Without preloading, no temp table named "current_records" exists."""
    # build the dataset without passing preload_current_records
    dataset = TIMDEXDataset(timdex_dataset_with_runs.location)

    # look for a materialized temporary table "temp.current_records" via the
    # metadata connection
    relation = dataset.metadata.conn.query(
        """
        select count(*)
        from information_schema.tables
        where table_catalog = 'temp'
        and table_name = 'current_records'
        and table_type = 'LOCAL TEMPORARY'
        ;
        """
    )
    matching_tables = relation.fetchone()[0]

    # the temp table must be absent
    assert matching_tables == 0
420+
421+
422+
def test_tdm_preload_true_has_temp_table(timdex_dataset_with_runs):
    """With preloading on, exactly one temp table "current_records" exists."""
    # build the dataset with preload_current_records explicitly enabled
    dataset = TIMDEXDataset(
        timdex_dataset_with_runs.location,
        preload_current_records=True,
    )

    # look for a materialized temporary table "temp.current_records" via the
    # metadata connection
    relation = dataset.metadata.conn.query(
        """
        select count(*)
        from information_schema.tables
        where table_catalog = 'temp'
        and table_name = 'current_records'
        and table_type = 'LOCAL TEMPORARY'
        ;
        """
    )
    matching_tables = relation.fetchone()[0]

    # the temp table must be present exactly once
    assert matching_tables == 1

timdex_dataset_api/dataset.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,22 @@ class TIMDEXDataset:
107107
def __init__(
108108
self,
109109
location: str,
110+
*,
110111
config: TIMDEXDatasetConfig | None = None,
112+
preload_current_records: bool = False,
111113
):
112114
"""Initialize TIMDEXDataset object.
113115
114116
Args:
115-
location (str ): Local filesystem path or an S3 URI to a parquet dataset.
117+
location: Local filesystem path or an S3 URI to a parquet dataset.
118+
config: Optional TIMDEXDatasetConfig instance.
119+
preload_current_records: if True, create in-memory temp table for
120+
current_records (faster for repeated queries); if False, create view only
121+
(default, lower memory)
116122
"""
117123
self.config = config or TIMDEXDatasetConfig()
118124
self.location = location
125+
self.preload_current_records = preload_current_records
119126

120127
self.create_data_structure()
121128

@@ -125,7 +132,10 @@ def __init__(
125132
self.dataset = self.load_pyarrow_dataset()
126133

127134
# dataset metadata
128-
self.metadata = TIMDEXDatasetMetadata(location)
135+
self.metadata = TIMDEXDatasetMetadata(
136+
location,
137+
preload_current_records=preload_current_records,
138+
)
129139

130140
# DuckDB context
131141
self.conn = self.setup_duckdb_context()
@@ -145,7 +155,11 @@ def data_records_root(self) -> str:
145155

146156
def refresh(self) -> None:
147157
"""Fully reload TIMDEXDataset instance."""
148-
self.__init__(self.location) # type: ignore[misc]
158+
self.__init__( # type: ignore[misc]
159+
self.location,
160+
config=self.config,
161+
preload_current_records=self.preload_current_records,
162+
)
149163

150164
def create_data_structure(self) -> None:
151165
"""Ensure ETL records data structure exists in TIMDEX dataset."""

timdex_dataset_api/metadata.py

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,20 @@ class TIMDEXDatasetMetadata:
6060
def __init__(
6161
self,
6262
location: str,
63+
*,
64+
preload_current_records: bool = False,
6365
) -> None:
6466
"""Init TIMDEXDatasetMetadata.
6567
6668
Args:
6769
location: root location of TIMDEX dataset, e.g. 's3://timdex/dataset'
70+
preload_current_records: if True, create in-memory temp table for
71+
current_records (faster for repeated queries); if False, create view only
72+
(default, lower memory)
6873
"""
6974
self.location = location
7075
self.config = TIMDEXDatasetMetadataConfig()
76+
self.preload_current_records = preload_current_records
7177

7278
self.create_metadata_structure()
7379
self.conn: DuckDBPyConnection = self.setup_duckdb_context()
@@ -444,26 +450,20 @@ def _create_current_records_view(self, conn: DuckDBPyConnection) -> None:
444450
dataset. With the metadata provided from this view, we can streamline data
445451
retrievals in TIMDEXDataset read methods.
446452
447-
For performance reasons, the final view reads from a DuckDB temporary table that
448-
is constructed, "temp.main.current_records". Because our connection is in memory,
449-
the data in this temporary table is mostly in memory but has the ability to spill
450-
to disk if we risk getting too close to our memory constraints. We explicitly
451-
set the temporary location on disk for DuckDB at "/tmp" to play nice with contexts
452-
like AWS ECS or Lambda, where sometimes the $HOME env var is missing; DuckDB
453-
often tries to utilize the user's home directory and this works around that.
453+
By default, creates a view only (lazy evaluation). If
454+
preload_current_records=True, creates a temp table for better performance
455+
for repeated queries.
456+
457+
For temp table mode, the data is mostly in memory but has the ability to spill to
458+
disk if we risk getting too close to our memory constraints. We explicitly set the
459+
temporary location on disk for DuckDB at "/tmp" to play nice with contexts like
460+
AWS ECS or Lambda, where sometimes the $HOME env var is missing; DuckDB often
461+
tries to utilize the user's home directory and this works around that.
454462
"""
455463
logger.info("creating view of current records metadata")
456464

457-
conn.execute(
458-
"""
459-
set temp_directory = '/tmp';
460-
"""
461-
)
462-
463-
conn.execute(
464-
"""
465-
-- create temp table with current records using CTEs
466-
create or replace temp table temp.main.current_records as
465+
# SQL for the current records logic (CTEs)
466+
current_records_query = """
467467
with
468468
-- CTE of run_timestamp for last source full run
469469
cr_source_last_full as (
@@ -502,13 +502,31 @@ def _create_current_records_view(self, conn: DuckDBPyConnection) -> None:
502502
select
503503
* exclude (rn)
504504
from cr_ranked_records
505-
where rn = 1;
505+
where rn = 1
506+
"""
506507

507-
-- create view in metadata schema
508-
create or replace view metadata.current_records as
509-
select * from temp.main.current_records;
510-
"""
511-
)
508+
# create temp table (materializes in memory)
509+
if self.preload_current_records:
510+
conn.execute("set temp_directory = '/tmp';")
511+
conn.execute(
512+
f"""
513+
create or replace temp table temp.main.current_records as
514+
{current_records_query};
515+
516+
-- create view in metadata schema that points to temp table
517+
create or replace view metadata.current_records as
518+
select * from temp.main.current_records;
519+
"""
520+
)
521+
522+
# create view only (lazy evaluation)
523+
else:
524+
conn.execute(
525+
f"""
526+
create or replace view metadata.current_records as
527+
{current_records_query};
528+
"""
529+
)
512530

513531
def merge_append_deltas(self) -> None:
514532
"""Merge append deltas into the static metadata database file."""

0 commit comments

Comments
 (0)