feat: native bulk ingest paths across adapters #540
Merged
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #540      +/-   ##
==========================================
+ Coverage   75.35%   75.54%   +0.19%
==========================================
  Files         444      444
  Lines       58256    58791     +535
  Branches     9030     9132     +102
==========================================
+ Hits        43896    44412     +516
+ Misses      11514    11506       -8
- Partials     2846     2873      +27
Add enable_local_infile_bulk_load driver feature that routes load_from_arrow through LOAD DATA LOCAL INFILE when local_infile=True. TSV encoding helpers duplicated in pymysql/core.py; config raises ImproperConfigurationError at construction when the gate is unmet.
Add enable_local_infile_bulk_load driver feature routing load_from_arrow through LOAD DATA LOCAL INFILE; consumes the existing allow_local_infile gate (effective local_infile must be True). TSV helpers duplicated in asyncmy/core.py; config raises ImproperConfigurationError when unmet.
Add enable_local_infile_bulk_load driver feature routing load_from_arrow through LOAD DATA LOCAL INFILE; gate requires enable_local_infile=True (or local_infile=True). TSV helpers duplicated in aiomysql/core.py.
Add enable_local_infile_bulk_load driver feature to both sync and async drivers, routing load_from_arrow through LOAD DATA LOCAL INFILE. Gate requires allow_local_infile=True; both config classes raise ImproperConfigurationError when unmet. TSV helpers duplicated in core.py.
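The TSV helpers these four commits mention are not shown in the conversation. As a rough sketch only, assuming MySQL's default LOAD DATA conventions (tab-separated fields, newline-terminated lines, backslash escaping, NULL written as \N), row encoding might look like the following. The names encode_tsv_field and encode_tsv_rows are hypothetical and are not the adapters' actual helpers.

```python
from collections.abc import Iterable, Sequence
from typing import Any

# Characters that need a backslash escape under MySQL's default
# FIELDS TERMINATED BY '\t' ESCAPED BY '\\' LINES TERMINATED BY '\n'.
_ESCAPES = {"\\": "\\\\", "\t": "\\t", "\n": "\\n", "\r": "\\r", "\0": "\\0"}


def encode_tsv_field(value: Any) -> str:
    """Encode one value as a TSV field (hypothetical helper)."""
    if value is None:
        return "\\N"  # LOAD DATA reads \N as SQL NULL
    if isinstance(value, bool):
        return "1" if value else "0"
    return "".join(_ESCAPES.get(ch, ch) for ch in str(value))


def encode_tsv_rows(rows: Iterable[Sequence[Any]]) -> str:
    """Encode rows as newline-terminated, tab-separated lines."""
    return "".join(
        "\t".join(encode_tsv_field(v) for v in row) + "\n" for row in rows
    )
```

Binary columns and non-default FIELDS/LINES clauses would need extra handling that this sketch leaves out.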
execute_many now reads oracle_batch_errors / oracle_array_dml_row_counts from statement_config.execution_args (asyncpg COPY precedent). Errors surface on SQLResult.metadata[oracle_batch_errors]; array DML counts on [oracle_dml_row_counts] with rowcount_override = sum(counts). Default path passes batcherrors=False/arraydmlrowcounts=False (unchanged).
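To illustrate the shape of this change: python-oracledb's Cursor.executemany accepts batcherrors and arraydmlrowcounts, and the results are read back with getbatcherrors() and getarraydmlrowcounts(). The sketch below is an assumption about how the metadata and row-count override could be assembled. The function run_batch and its keyword flags are hypothetical (the commit says the real flags come from statement_config.execution_args), and the error-dict layout is illustrative.

```python
from typing import Any


def run_batch(
    cursor: Any,
    sql: str,
    rows: list,
    *,
    batch_errors: bool = False,
    row_counts: bool = False,
) -> dict:
    """Run executemany and collect optional Oracle batch metadata (sketch)."""
    cursor.executemany(
        sql, rows, batcherrors=batch_errors, arraydmlrowcounts=row_counts
    )
    metadata: dict = {}
    rowcount = cursor.rowcount
    if batch_errors:
        # Each batch error reports the offset of the failing row.
        metadata["oracle_batch_errors"] = [
            {"offset": err.offset, "message": err.message}
            for err in cursor.getbatcherrors()
        ]
    if row_counts:
        counts = list(cursor.getarraydmlrowcounts())
        metadata["oracle_dml_row_counts"] = counts
        rowcount = sum(counts)  # mirrors rowcount_override = sum(counts)
    return {"rowcount": rowcount, "metadata": metadata}
```

With both flags left at False the call matches the default path described above and the metadata stays empty.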
Add enable_direct_path_load driver feature routing load_from_arrow through Connection.direct_path_load (Thin-mode only). Runtime guard (hasattr + thin) falls back to executemany for Thick mode or older drivers. Schema resolved from qualified table or connection.username.
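The runtime guard and schema resolution described here could be sketched as below. This is an illustration under assumptions: the function names are hypothetical, and the table split is a naive partition on the first dot that ignores quoted identifiers.

```python
from typing import Any


def can_use_direct_path_load(connection: Any) -> bool:
    """True only for Thin-mode connections whose driver exposes the method."""
    return bool(getattr(connection, "thin", False)) and hasattr(
        connection, "direct_path_load"
    )


def resolve_schema_and_table(table: str, connection: Any) -> tuple[str, str]:
    """Take the schema from a qualified name, else from connection.username."""
    if "." in table:
        schema, _, name = table.partition(".")
        return schema, name
    return connection.username, table
```

When can_use_direct_path_load returns False, the caller would stay on the executemany path, as the commit describes for Thick mode and older drivers.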
…ction When the driver owns the transaction (connection not already in one), overwrite-DELETE and executemany run inside one BEGIN IMMEDIATE -> commit, with rollback on sqlite3.Error. Caller-managed transactions are left untouched (no commit/rollback). Makes Arrow ingest atomic.
…nsaction Async mirror of the sqlite transaction wrap: overwrite-DELETE and executemany run inside one BEGIN IMMEDIATE -> commit when the driver owns the transaction, with rollback on aiosqlite/sqlite3 errors. Caller-owned transactions are passed through untouched.
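The transaction wrap in these two commits can be sketched with the standard-library sqlite3 module. This is a minimal illustration, not the adapter's code: ingest_rows is a hypothetical name, and the table and column names are interpolated without quoting, so it assumes trusted identifiers.

```python
import sqlite3
from collections.abc import Sequence
from typing import Any


def ingest_rows(
    conn: sqlite3.Connection,
    table: str,
    columns: Sequence[str],
    rows: Sequence[Sequence[Any]],
    *,
    overwrite: bool = False,
) -> int:
    """Insert rows atomically when no caller transaction is open."""
    owns_transaction = not conn.in_transaction
    placeholders = ", ".join("?" for _ in columns)
    insert_sql = f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})'
    try:
        if owns_transaction:
            conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
        if overwrite:
            conn.execute(f"DELETE FROM {table}")
        conn.executemany(insert_sql, rows)
        if owns_transaction:
            conn.commit()
    except sqlite3.Error:
        if owns_transaction:
            conn.rollback()  # undo both the DELETE and any partial insert
        raise
    return len(rows)
```

If the caller already has a transaction open, the function neither commits nor rolls back, so the caller keeps full control over the outcome.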
Lock _coerce_bulk_copy_result stat passthrough (incl. unknown future keys) and rowcount fallback, and assert bulk_copy() defaults match upstream cursor.bulkcopy() for both sync and async wrappers. Tests only; no production change (behavior already shipped).
Extend StorageFormat with avro/orc, map csv/avro/orc in _map_bigquery_source_format, and drop the stale parquet-only guard in load_from_storage so the generic storage bridge routes every BigQuery load_table_from_uri source format. arrow-ipc still raises. No new public surface.
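A sketch of what the broadened mapping might look like, assuming plain string keys and the BigQuery source-format names PARQUET, CSV, AVRO, ORC and NEWLINE_DELIMITED_JSON. The function name map_source_format and the ValueError are stand-ins, not the actual _map_bigquery_source_format implementation.

```python
# Assumed mapping from storage-bridge format names to BigQuery load-job
# source formats; arrow-ipc is deliberately absent.
_SOURCE_FORMATS = {
    "parquet": "PARQUET",
    "csv": "CSV",
    "avro": "AVRO",
    "orc": "ORC",
    "json": "NEWLINE_DELIMITED_JSON",
    "jsonl": "NEWLINE_DELIMITED_JSON",
}


def map_source_format(file_format: str) -> str:
    """Return the BigQuery source format name, or raise if unsupported."""
    try:
        return _SOURCE_FORMATS[file_format.lower()]
    except KeyError:
        raise ValueError(f"Unsupported BigQuery load format: {file_format}") from None
```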
…tations Replace the per-row INSERT...VALUES batch_update construction in load_from_arrow with chunked Transaction.insert_or_update mutations (<=20k cells per flush). Upsert semantics make re-runs idempotent; no database handle, no new public surface. overwrite still DELETEs first. Update _SpannerWriteProtocol.
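The cell-budget chunking can be illustrated in isolation. In this sketch, one cell is one column value, so a chunk holds at most 20,000 // column_count rows. The helper name chunk_rows_by_cells is hypothetical. In the real path each chunk would presumably be handed to Transaction.insert_or_update(table, columns, values).

```python
from collections.abc import Iterator, Sequence
from typing import Any

MAX_CELLS_PER_FLUSH = 20_000  # budget quoted in the commit message


def chunk_rows_by_cells(
    rows: Sequence[Sequence[Any]],
    column_count: int,
    max_cells: int = MAX_CELLS_PER_FLUSH,
) -> Iterator[Sequence[Sequence[Any]]]:
    """Yield slices of rows whose total cell count stays within max_cells."""
    if column_count <= 0:
        raise ValueError("column_count must be positive")
    # Always send at least one row, even for extremely wide tables.
    rows_per_chunk = max(1, max_cells // column_count)
    for start in range(0, len(rows), rows_per_chunk):
        yield rows[start : start + rows_per_chunk]
```

For a two-column table this yields chunks of up to 10,000 rows each.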
Add supports_native_bulk_ingest DriverCase flag (True for the six native-COPY/Arrow adapters: asyncpg, psycopg sync/async, psqlpy, adbc, duckdb) plus paired sync/async behavior asserts covering row-count fidelity, overwrite, and append. Non-flagged adapters skip. Error-surfacing is intentionally not asserted: duckdb/adbc propagate raw driver exceptions rather than SQLSpecError, so a portable assertion is not possible without driver-side exception mapping (out of scope here).
…m_arrow Add enable_storage_write_api driver feature. When enabled (and not overwrite), load_from_arrow ingests via a PENDING BigQuery Storage write stream using native arrow_rows (no Parquet round-trip): create stream -> append serialized Arrow batches -> finalize -> batch-commit. Falls back to the Parquet load job on ImportError; overwrite always uses Parquet WRITE_TRUNCATE. Default-off, so existing behavior is unchanged. The streaming/serialization path is covered by unit tests for call orchestration and fallbacks; it needs validation against a live BigQuery instance (the emulator does not implement the Storage Write API).
Add load_from_records(table, records, *, columns=None, overwrite=False) to the sync and async driver base classes. Records (dict or positional + columns) are normalized via _records_to_arrow_table and routed through each adapter's native load_from_arrow path (COPY, executemany, Spanner mutations, etc.), so every adapter with load_from_arrow gains it for free and mssql_python/arrow_odbc keep raising. Empty/mismatched input raises ImproperConfigurationError. Add supports_load_from_records contract coverage (dict + positional + input validation).
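The normalization step can be sketched without pyarrow. The function below is a hypothetical stand-in for _records_to_arrow_table that returns column names and row tuples instead of an Arrow table, and raises ValueError where the real code raises ImproperConfigurationError. The exact validation rules are assumptions.

```python
from collections.abc import Iterable, Mapping, Sequence
from typing import Any, Optional


def normalize_records(
    records: Iterable[Any], columns: Optional[Sequence[str]] = None
) -> tuple[list[str], list[tuple]]:
    """Normalize dict or positional records into (column names, row tuples)."""
    items = list(records)
    if not items:
        raise ValueError("records must not be empty")
    if isinstance(items[0], Mapping):
        # Dict records: column order comes from `columns` or the first record.
        names = list(columns) if columns is not None else list(items[0].keys())
        rows = []
        for record in items:
            missing = [n for n in names if n not in record]
            if missing:
                raise ValueError(f"record is missing columns: {missing}")
            rows.append(tuple(record[n] for n in names))
        return names, rows
    if columns is None:
        raise ValueError("positional records require columns")
    names = list(columns)
    for record in items:
        if len(record) != len(names):
            raise ValueError("record length does not match columns")
    return names, [tuple(record) for record in items]
```

The resulting names and rows could then be turned into an Arrow table and passed to the adapter's load_from_arrow, which is the routing the commit describes.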
New docs/usage/bulk_ingest.rst documents load_from_arrow / load_from_storage / load_from_records, a per-adapter capability matrix, the security/opt-in gates (MySQL local infile, Oracle direct path load, BigQuery Storage Write API), and MySQL/Oracle examples. Wired into the usage toctree with a seealso cross-link from the ETL page.
491f90c to a4a6e8d
_coerce_bulk_copy_result expects MssqlPythonRawCursor; cast the test's _FakeCursor at the call sites so pyright (make type-check) passes.
Add load_from_arrow / load_from_storage overrides (sync + async) that ingest Arrow data through cursor.bulkcopy(), and flip supports_native_arrow_import. overwrite issues DELETE first. This also lights up the generic load_from_records for SQL Server via the base delegation path.
…rt_arrow Wire arrow-odbc's bulk_insert_arrow into the storage-bridge load_from_arrow / load_from_storage methods (overwrite issues DELETE first). This also enables the generic load_from_records for arrow-odbc through the base delegation path.
Add enable_batch_write_api driver feature. When set, load_from_arrow routes through Database.mutation_groups().batch_write() (one mutation group per <=20k-cell chunk) for high-throughput, independently committed insert_or_update groups; the Database is reached from the connection's session. Default path (single in-transaction insert_or_update) is unchanged. Upsert semantics keep replay idempotent. Docs: refresh the bulk-ingest matrix for the mssql/arrow_odbc load_from_arrow wiring and the Spanner Batch Write opt-in.
BigQuery Storage Write API now splits AppendRowsRequest payloads to stay under the 20MB append limit, surfacing an oversized single row as a storage error, and the load-job source-format mapping is broadened. Spanner insert_or_update mutation batching is tidied. Records-to-Arrow normalization moves into the shared arrow/storage helpers so compiled drivers reuse it. Adds unit coverage for the MySQL family LOAD DATA LOCAL INFILE and executemany paths (pymysql, aiomysql, asyncmy, mysql-connector), the Oracle async direct-path-load branch, the sqlite/aiosqlite overwrite and rollback transaction paths, and the BigQuery, Spanner, and load_from_records helpers.
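The payload splitting described for the Storage Write API can be sketched as a greedy size-based grouping. This is an illustration only: split_by_size is a hypothetical name, the payloads stand for already-serialized rows or batches, the 20MB figure is taken from the commit message, and ValueError stands in for the storage error the adapter raises.

```python
from collections.abc import Sequence

MAX_APPEND_BYTES = 20 * 1024 * 1024  # append limit quoted in the commit message


def split_by_size(
    payloads: Sequence[bytes], max_bytes: int = MAX_APPEND_BYTES
) -> list[list[bytes]]:
    """Greedily group payloads so each group's total size fits max_bytes."""
    groups: list[list[bytes]] = []
    current: list[bytes] = []
    current_size = 0
    for payload in payloads:
        size = len(payload)
        if size > max_bytes:
            # A single oversized payload can never fit in one request.
            raise ValueError(f"payload of {size} bytes exceeds {max_bytes}")
        if current and current_size + size > max_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(payload)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

Each returned group would map to one append request. A real implementation would also need to budget for request envelope overhead, which this sketch ignores.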
Summary
Expose native bulk-ingest fast paths across every adapter behind the existing storage-bridge surface (load_from_arrow / load_from_storage / new load_from_records), with no new database-specific public APIs. High-volume writes use each driver's primitive (COPY, LOAD DATA, direct path load, BulkCopy, Arrow ingest, load jobs, mutations) instead of generic row-by-row execution. Builds on the cloud job session controls already on main.
Each change is landed test-first (unit + integration) and is independently revertable.
What's included
PostgreSQL / SQLite already-native paths: covered by a new supports_native_bulk_ingest contract family (row-count fidelity, overwrite, append) for asyncpg, psycopg (sync/async), psqlpy, adbc, duckdb.
MySQL family: opt-in LOAD DATA LOCAL INFILE (pymysql, asyncmy, aiomysql, mysql-connector sync+async) via enable_local_infile_bulk_load, gated on the connection's local-infile setting (raises at config construction otherwise).
Oracle: per-call oracle_batch_errors / oracle_array_dml_row_counts on execute_many (surfaced on SQLResult.metadata); opt-in enable_direct_path_load (Thin mode) for load_from_arrow.
SQLite / aiosqlite: load_from_arrow wrapped in one BEGIN IMMEDIATE transaction when the driver owns it (atomic ingest; caller transactions pass through).
BigQuery: load_from_storage honors Parquet/JSON/JSONL/CSV/Avro/ORC; opt-in enable_storage_write_api Arrow Storage Write API transport for load_from_arrow (Parquet fallback).
Spanner: load_from_arrow uses chunked Transaction.insert_or_update mutations (idempotent upsert); opt-in enable_batch_write_api routes through Database.mutation_groups().batch_write() for high-throughput groups.
SQL Server (mssql-python): load_from_arrow / load_from_storage wired to cursor.bulkcopy() (sync + async); BulkCopy stats regression-locked.
arrow-odbc: load_from_arrow / load_from_storage wired to bulk_insert_arrow.
Cross-adapter: new generic load_from_records(table, records, *, columns=None, overwrite=False) on the sync/async base classes. Dict or positional records are normalized and routed through each adapter's native load_from_arrow, so every Arrow-capable adapter (now including mssql-python and arrow-odbc) supports it. Invalid input raises ImproperConfigurationError. Covered by a supports_load_from_records contract family.
Docs: docs/usage/bulk_ingest.rst capability matrix + opt-in/security gates + examples.
Notes for review
UNIMPLEMENTED.
Testing
All new unit tests pass locally; SQLite/aiosqlite atomicity, load_from_records, and the duckdb/adbc-duckdb contract cases run containerless and pass. Docs build clean under sphinx-build -W. make type-check (mypy + pyright) is clean on the changed modules. Container/cloud integration tests run in CI.