Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c6f4586
feat(pymysql): add LOAD DATA LOCAL INFILE bulk-load fast path
cofin Jun 25, 2026
3fd1b4e
feat(asyncmy): add LOAD DATA LOCAL INFILE bulk-load fast path
cofin Jun 25, 2026
c49828b
feat(aiomysql): add LOAD DATA LOCAL INFILE bulk-load fast path
cofin Jun 25, 2026
5b1fe9a
feat(mysqlconnector): add LOAD DATA LOCAL INFILE bulk-load fast path
cofin Jun 25, 2026
fddab5c
feat(oracledb): surface batcherrors/arraydmlrowcounts via execution_args
cofin Jun 25, 2026
192fee6
feat(oracledb): add opt-in direct path load in load_from_arrow
cofin Jun 25, 2026
0aee686
feat(sqlite): wrap load_from_arrow in explicit BEGIN IMMEDIATE transa…
cofin Jun 25, 2026
c4a7617
feat(aiosqlite): wrap load_from_arrow in explicit BEGIN IMMEDIATE tra…
cofin Jun 25, 2026
140326e
test(mssql_python): regression-lock BulkCopy stats and wrapper defaults
cofin Jun 25, 2026
c3cd9ef
feat(bigquery): load_from_storage honors JSON/JSONL/CSV/Avro/ORC
cofin Jun 25, 2026
b7ffe1f
feat(spanner): swap load_from_arrow Batch-DML for insert_or_update mu…
cofin Jun 25, 2026
66274b7
test(contracts): add native bulk-ingest contract family
cofin Jun 25, 2026
8053ccc
feat(bigquery): opt-in Storage Write API Arrow transport for load_fro…
cofin Jun 25, 2026
399fe28
feat(driver): add generic load_from_records storage-bridge method
cofin Jun 25, 2026
a4a6e8d
docs(usage): add bulk ingest capability matrix
cofin Jun 25, 2026
9aebcf2
test(mssql_python): cast fake cursor to satisfy pyright
cofin Jun 25, 2026
515741f
feat(mssql_python): wire bulk_copy into load_from_arrow storage bridge
cofin Jun 25, 2026
9a2bc61
feat(arrow_odbc): add load_from_arrow/load_from_storage via bulk_inse…
cofin Jun 25, 2026
bd61e25
feat(spanner): add opt-in Batch Write API ingest transport
cofin Jun 25, 2026
e9bf844
Add initial test suite for Arrow ODBC adapter
cofin Jun 25, 2026
365a301
feat(adapters): refine native bulk-ingest transports and expand coverage
cofin Jun 25, 2026
df8f6b1
fix(oracledb): execute overwrite truncate via cursor
cofin Jun 25, 2026
a656c43
feat(oracledb): default to direct path bulk loads
cofin Jun 25, 2026
4109491
perf(arrow): use pyarrow constructors for record coercion
cofin Jun 25, 2026
a15186e
feat(bigquery): add load_from_records method and enable storage write…
cofin Jun 25, 2026
0b68195
fix: remove unsupported asyncmy local infile path
cofin Jun 25, 2026
6eeca5b
fix: commit spanner mutation-only write sessions
cofin Jun 25, 2026
2abbfa4
fix: parse quoted table identifiers for bulk ingest
cofin Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ repos:
- id: mixed-line-ending
- id: trailing-whitespace
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: "v0.15.18"
rev: "v0.15.20"
hooks:
- id: ruff
args: ["--fix"]
Expand Down
157 changes: 157 additions & 0 deletions docs/usage/bulk_ingest.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
Bulk Ingest
===========

SQLSpec exposes native bulk-ingest fast paths through a small, adapter-agnostic
storage-bridge API. High-volume writes use each driver's database primitive --
``COPY``, ``LOAD DATA LOCAL INFILE``, direct path load, ``BulkCopy``, Arrow
ingest, load jobs, or mutations -- instead of generic row-by-row execution.

The API
-------

Three methods cover the common shapes. They share return type
``StorageBridgeJob`` (its ``telemetry`` dict reports ``rows_processed``):

- ``load_from_arrow(table, source, *, overwrite=False)`` -- load an Arrow table
(or anything coercible to one) using the adapter's native ingest path.
- ``load_from_storage(table, source, *, file_format, overwrite=False)`` -- load
a staged artifact (a local path or cloud URI) into a table.
- ``load_from_records(table, records, *, columns=None, overwrite=False)`` --
load in-memory rows. ``records`` may be mappings (columns derived from the
keys) or positional sequences (``columns`` required). Records are normalized
and routed through the adapter's native ``load_from_arrow`` path, so every
adapter that supports Arrow ingest supports records too.

.. code-block:: python

# dict records -- columns inferred from keys
driver.load_from_records("orders", [{"id": 1, "total": 9.99}, {"id": 2, "total": 4.50}])

# positional records -- columns required
driver.load_from_records("orders", [(3, 1.0), (4, 2.0)], columns=["id", "total"])

Empty input, mismatched mapping keys, or a positional/column width mismatch
raise :class:`~sqlspec.exceptions.ImproperConfigurationError`.

Capability matrix
-----------------

.. list-table::
:header-rows: 1
:widths: 18 42 22 18

* - Adapter
- Native ingest path
- Transactionality
- Gate / opt-in
* - asyncpg
- ``COPY`` (``copy_records_to_table``)
- Atomic; exact row counts
- Always on
* - psycopg (sync/async)
- ``COPY`` streaming ``write_row``
- Atomic; exact row counts
- Always on
* - psqlpy
- Binary ``COPY`` with ``INSERT`` fallback
- Atomic
- Always on
* - adbc
- ``adbc_ingest`` (append/replace)
- Driver-dependent; FlightSQL falls back to per-row
- Always on
* - duckdb
- ``register`` + ``INSERT ... SELECT``
- Single connection transaction
- Always on
* - sqlite / aiosqlite
- ``executemany`` inside one ``BEGIN IMMEDIATE``
- Atomic when the driver owns the transaction; rolls back on error
- Always on
* - oracledb
- direct path load (Thin mode, default); ``executemany`` fallback
- Per ``execute_many``; array-DML row counts available
- ``enable_direct_path_load=False`` to force fallback; ``oracle_batch_errors`` /
``oracle_array_dml_row_counts`` execution args
* - MySQL family (pymysql, asyncmy, aiomysql, mysql-connector)
- ``executemany`` (default); ``LOAD DATA LOCAL INFILE`` (opt-in)
- Server-managed
- ``enable_local_infile_bulk_load`` + connection ``local_infile`` /
``allow_local_infile``
* - bigquery
- Parquet load job (default); Arrow Storage Write API (opt-in)
- All-or-nothing load job / PENDING write stream
- ``enable_storage_write_api``; load retry/timeout via job-control features
* - spanner
- ``Transaction.insert_or_update`` mutations (upsert); Batch Write API (opt-in)
- In-transaction (default); independently committed groups (Batch Write)
- Always on; ``enable_batch_write_api`` for high-throughput groups
* - mssql-python
- ``cursor.bulkcopy()`` via ``load_from_arrow``
- Driver-managed
- Always on
* - arrow_odbc
- ``bulk_insert_arrow`` via ``load_from_arrow``
- Driver-managed
- Always on

Security and opt-in paths
-------------------------

Some fast paths are opt-in because they read local files or change semantics:

- **MySQL ``LOAD DATA LOCAL INFILE``** requires both the adapter feature
``enable_local_infile_bulk_load`` and the connection's local-infile setting
(``local_infile=True`` for pymysql/aiomysql/asyncmy, ``allow_local_infile=True``
for mysql-connector). Enabling the feature without the connection gate raises
:class:`~sqlspec.exceptions.ImproperConfigurationError` at config construction.
The MySQL server must also have ``local_infile`` enabled. mysql-connector
additionally honors ``allow_local_infile_in_path`` -- the staged temp file must
live under that directory when it is set.
- **Oracle direct path load** is the default bulk-ingest transport in Thin mode.
Set ``enable_direct_path_load=False`` to force ``executemany``. Connections
that do not expose the Direct Path Load API, including Thick-mode connections,
silently fall back to ``executemany``.
- **BigQuery Storage Write API** (``enable_storage_write_api``) streams Arrow
rows for ``load_from_arrow`` appends and falls back to the Parquet load job
when the Storage client is unavailable; ``overwrite=True`` always uses a
Parquet ``WRITE_TRUNCATE`` load job.
- **Spanner Batch Write API** (``enable_batch_write_api``) routes
``load_from_arrow`` through ``Database.mutation_groups().batch_write()`` for
high-throughput, independently committed ``insert_or_update`` groups instead
of a single in-transaction flush. The upsert semantics keep each group
idempotent on replay.

Examples
--------

MySQL ``LOAD DATA LOCAL INFILE``:

.. code-block:: python

from sqlspec.adapters.pymysql import PyMysqlConfig

config = PyMysqlConfig(
connection_config={"host": "localhost", "local_infile": True},
driver_features={"enable_local_infile_bulk_load": True},
)
with config.provide_session() as driver:
driver.load_from_arrow("orders", arrow_table)

Oracle per-call batch error and array-DML row-count reporting:

.. code-block:: python

statement_config = driver.statement_config.replace(
execution_args={"oracle_batch_errors": True, "oracle_array_dml_row_counts": True}
)
result = driver.execute_many(
"INSERT INTO orders (id, total) VALUES (:1, :2)", rows, statement_config=statement_config
)
failures = result.metadata["oracle_batch_errors"] # list of {offset, code, message}
row_counts = result.metadata["oracle_dml_row_counts"] # per-statement affected rows

.. note::

Spanner ``load_from_arrow`` uses ``insert_or_update`` mutations, so re-running
the same rows is an idempotent upsert rather than a primary-key collision.
5 changes: 5 additions & 0 deletions docs/usage/etl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ Supported ``return_format`` values:
- ``"batches"`` -- iterator of ``RecordBatch`` objects
- ``"reader"`` -- ``RecordBatchReader`` for streaming

.. seealso::

:doc:`bulk_ingest` for the inbound side -- loading Arrow tables, staged
files, and in-memory records into a table via native driver primitives.

DuckDB as Staging Layer
-----------------------

Expand Down
1 change: 1 addition & 0 deletions docs/usage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,6 @@ Recommended Path
filtering
testing
etl
bulk_ingest
performance
../extensions/index
8 changes: 8 additions & 0 deletions sqlspec/adapters/aiomysql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from sqlspec.adapters.aiomysql.driver import AiomysqlDriver, AiomysqlExceptionHandler
from sqlspec.config import AsyncDatabaseConfig, ExtensionConfigs
from sqlspec.driver._async import AsyncPoolConnectionContext, AsyncPoolSessionFactory
from sqlspec.exceptions import ImproperConfigurationError
from sqlspec.extensions.events import EventRuntimeHints
from sqlspec.utils.config_tools import normalize_connection_config

Expand Down Expand Up @@ -138,6 +139,7 @@ class AiomysqlDriverFeatures(TypedDict):
on_connection_create: "NotRequired[Callable[[AiomysqlConnection], Awaitable[None]]]"
enable_events: NotRequired[bool]
events_backend: NotRequired[str]
enable_local_infile_bulk_load: NotRequired[bool]


class _AiomysqlSessionFactory(AsyncPoolSessionFactory):
Expand Down Expand Up @@ -251,6 +253,12 @@ def __init__(
# Track initialized connections to ensure callback runs exactly once per physical connection
self._initialized_connections: WeakSet[Any] = WeakSet()

if features_dict.get("enable_local_infile_bulk_load") and not (
connection_config.get("enable_local_infile") or connection_config.get("local_infile")
):
msg = "enable_local_infile_bulk_load requires enable_local_infile=True (or local_infile=True) in connection_config."
raise ImproperConfigurationError(msg)

super().__init__(
connection_config=connection_config,
connection_instance=connection_instance,
Expand Down
32 changes: 30 additions & 2 deletions sqlspec/adapters/aiomysql/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
UniqueViolationError,
)
from sqlspec.utils.serializers import from_json, to_json
from sqlspec.utils.text import quote_backtick_identifier
from sqlspec.utils.text import quote_backtick_identifier, split_qualified_identifier
from sqlspec.utils.type_converters import build_uuid_coercions
from sqlspec.utils.type_guards import has_cursor_metadata, has_lastrowid, has_rowcount

Expand All @@ -36,6 +36,7 @@
"AiomysqlStreamSource",
"apply_driver_features",
"build_insert_statement",
"build_load_data_statement",
"build_profile",
"build_statement_config",
"collect_rows",
Expand All @@ -44,6 +45,7 @@
"detect_json_columns",
"detect_json_columns_from_description",
"driver_profile",
"encode_records_for_local_infile",
"format_identifier",
"normalize_execute_many_parameters",
"normalize_execute_parameters",
Expand Down Expand Up @@ -124,7 +126,7 @@ def format_identifier(identifier: str) -> str:
if not cleaned:
msg = "Table name must not be empty"
raise SQLSpecError(msg)
parts = [part for part in cleaned.split(".") if part]
parts = split_qualified_identifier(cleaned, quote_chars="`", allow_bracket_quotes=False)
formatted = ".".join(quote_backtick_identifier(part) for part in parts)
return formatted or quote_backtick_identifier(cleaned)

Expand All @@ -135,6 +137,32 @@ def build_insert_statement(table: str, columns: "list[str]") -> str:
return f"INSERT INTO {format_identifier(table)} ({column_clause}) VALUES ({placeholders})"


def encode_records_for_local_infile(records: "list[tuple[Any, ...]]") -> bytes:
lines: list[str] = []
for record in records:
fields: list[str] = []
for value in record:
if value is None:
fields.append("\\N")
continue
if isinstance(value, bool):
value = int(value)
text = value if isinstance(value, str) else str(value)
text = text.replace("\\", "\\\\").replace("\t", "\\t").replace("\n", "\\n").replace("\r", "\\r")
fields.append(text)
lines.append("\t".join(fields))
return ("\n".join(lines) + "\n").encode("utf-8")


def build_load_data_statement(table: str, columns: "list[str]", file_path: str) -> str:
column_list = ", ".join(format_identifier(column) for column in columns)
return (
f"LOAD DATA LOCAL INFILE '{file_path}' INTO TABLE {format_identifier(table)} "
"CHARACTER SET utf8mb4 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' "
f"LINES TERMINATED BY '\\n' ({column_list})"
)


def normalize_execute_parameters(parameters: Any) -> Any:
"""Normalize parameters for aiomysql execute calls.

Expand Down
43 changes: 32 additions & 11 deletions sqlspec/adapters/aiomysql/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
rather than a driver-specific error module.
"""

import tempfile
from collections.abc import Sized
from pathlib import Path
from typing import TYPE_CHECKING, Any, Final, cast

import pymysql.err # pyright: ignore
Expand All @@ -17,11 +19,13 @@
from sqlspec.adapters.aiomysql.core import (
AiomysqlStreamSource,
build_insert_statement,
build_load_data_statement,
collect_rows,
create_mapped_exception,
default_statement_config,
detect_json_columns_from_description,
driver_profile,
encode_records_for_local_infile,
format_identifier,
normalize_execute_many_parameters,
normalize_execute_parameters,
Expand Down Expand Up @@ -314,17 +318,34 @@ async def load_from_arrow(

columns, records = self._arrow_table_to_rows(arrow_table)
if records:
insert_sql = build_insert_statement(table, columns)
prepared_records = (
self.prepare_driver_parameters(records, self.statement_config, is_many=True)
if self._arrow_table_needs_parameter_preparation(arrow_table)
else records
)
exc_handler = self.handle_database_exceptions()
async with exc_handler, self.with_cursor(self.connection) as cursor:
await cursor.executemany(insert_sql, prepared_records)
if exc_handler.pending_exception is not None:
raise exc_handler.pending_exception from None
needs_preparation = self._arrow_table_needs_parameter_preparation(arrow_table)
use_infile = bool(self.driver_features.get("enable_local_infile_bulk_load")) and not needs_preparation
if use_infile:
payload = encode_records_for_local_infile(records)
with tempfile.NamedTemporaryFile(mode="wb", suffix=".tsv", delete=False) as tmp:
tmp.write(payload)
tmp_name = tmp.name
try:
load_sql = build_load_data_statement(table, columns, tmp_name)
exc_handler = self.handle_database_exceptions()
async with exc_handler, self.with_cursor(self.connection) as cursor:
await cursor.execute(load_sql)
if exc_handler.pending_exception is not None:
raise exc_handler.pending_exception from None
finally:
Path(tmp_name).unlink(missing_ok=True) # noqa: ASYNC240
else:
insert_sql = build_insert_statement(table, columns)
prepared_records = (
self.prepare_driver_parameters(records, self.statement_config, is_many=True)
if needs_preparation
else records
)
exc_handler = self.handle_database_exceptions()
async with exc_handler, self.with_cursor(self.connection) as cursor:
await cursor.executemany(insert_sql, prepared_records)
if exc_handler.pending_exception is not None:
raise exc_handler.pending_exception from None

telemetry_payload = self._build_ingest_telemetry(arrow_table)
telemetry_payload["destination"] = table
Expand Down
7 changes: 3 additions & 4 deletions sqlspec/adapters/aiosqlite/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
UniqueViolationError,
)
from sqlspec.utils.serializers import from_json, to_json
from sqlspec.utils.text import quote_identifier
from sqlspec.utils.text import quote_identifier, split_qualified_identifier
from sqlspec.utils.type_converters import build_decimal_converter, build_uuid_coercions, time_iso_convert
from sqlspec.utils.type_guards import has_rowcount, has_sqlite_error

Expand Down Expand Up @@ -90,9 +90,8 @@ def format_identifier(identifier: str) -> str:
if not cleaned:
msg = "Table name must not be empty"
raise SQLSpecError(msg)
if "." not in cleaned:
return quote_identifier(cleaned)
return ".".join(quote_identifier(part) for part in cleaned.split(".") if part)
parts = split_qualified_identifier(cleaned, quote_chars='"')
return ".".join(quote_identifier(part) for part in parts)


def build_insert_statement(table: str, columns: "list[str]") -> str:
Expand Down
Loading
Loading