Merged
45 changes: 45 additions & 0 deletions docs/reference/adapters/bigquery.rst
@@ -4,6 +4,51 @@ BigQuery

Google BigQuery adapter with Arrow result support.

Job Controls
============

BigQuery job behavior is configured on ``BigQueryConfig.driver_features`` and
then applied by the normal driver methods. SQLSpec does not expose a separate
``execute_with_job()`` method, and per-call job options are not threaded through
generic ``execute(**kwargs)``.

``job_retry_deadline`` controls retry construction for BigQuery query, load, and
export jobs. The default is ``60.0`` seconds. Values less than or equal to zero
disable the BigQuery job retry wrapper by passing ``job_retry=None`` to the
client call.
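
The deadline rule above can be sketched as follows. This is a minimal illustration, not SQLSpec's implementation: ``build_job_retry`` is a hypothetical helper, and ``RetryPolicy`` is a stand-in for ``google.api_core.retry.Retry`` so the example runs without the Google client installed.

```python
from dataclasses import dataclass
from typing import Any, Optional

DEFAULT_JOB_RETRY_DEADLINE = 60.0  # documented default, in seconds


@dataclass(frozen=True)
class RetryPolicy:
    """Stand-in for google.api_core.retry.Retry (assumption, illustration only)."""

    deadline: float


def build_job_retry(driver_features: dict[str, Any]) -> Optional[RetryPolicy]:
    """Return a retry policy, or None when the configured deadline disables retries."""
    deadline = float(driver_features.get("job_retry_deadline", DEFAULT_JOB_RETRY_DEADLINE))
    if deadline <= 0:
        # Zero or negative values disable the wrapper: the client call gets job_retry=None.
        return None
    return RetryPolicy(deadline=deadline)
```

With no setting the sketch yields a 60-second policy; ``job_retry_deadline=0`` yields ``None``.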

``job_result_timeout`` bounds waits on ``QueryJob.result()`` and load job
completion. ``request_timeout`` bounds the initial BigQuery API request; when it
is omitted, SQLSpec derives a request timeout from ``job_result_timeout`` when
that value is numeric.
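
One way to picture the fallback between the two timeouts is the sketch below. It assumes the derived request timeout equals ``job_result_timeout`` (as the config docstring suggests), and ``resolve_request_timeout`` is a hypothetical name, not part of SQLSpec.

```python
from typing import Any, Optional


def resolve_request_timeout(driver_features: dict[str, Any]) -> Optional[float]:
    """Pick the per-request timeout: explicit value first, then a numeric result timeout."""
    explicit = driver_features.get("request_timeout")
    if explicit is not None:
        return float(explicit)
    result_timeout = driver_features.get("job_result_timeout")
    # bool is a subclass of int, so it is excluded from "numeric" here.
    if isinstance(result_timeout, (int, float)) and not isinstance(result_timeout, bool):
        return float(result_timeout)
    # Nothing usable configured: leave the request timeout unset.
    return None
```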

``use_query_and_wait=True`` switches simple query execution from
``Client.query()`` plus ``QueryJob.result()`` to ``Client.query_and_wait()``.
The public SQLSpec call remains ``execute()``, ``select_*()``, or
``select_to_arrow()``; configured retry and timeout values are applied
transparently.
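
The switch can be illustrated with a small dispatch sketch. ``FakeClient``, ``FakeJob``, and ``run_simple_query`` are hypothetical stand-ins that only record which client method would be called; they are not the BigQuery client or SQLSpec's driver code.

```python
from typing import Any


class FakeClient:
    """Stand-in client that records the call path (assumption, illustration only)."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def query(self, sql: str) -> "FakeJob":
        self.calls.append("query")
        return FakeJob(self)

    def query_and_wait(self, sql: str) -> list[dict[str, Any]]:
        self.calls.append("query_and_wait")
        return [{"ok": 1}]


class FakeJob:
    """Stand-in for a query job whose result() waits for rows."""

    def __init__(self, client: FakeClient) -> None:
        self._client = client

    def result(self) -> list[dict[str, Any]]:
        self._client.calls.append("result")
        return [{"ok": 1}]


def run_simple_query(client: FakeClient, sql: str, driver_features: dict[str, Any]) -> list[dict[str, Any]]:
    """Choose the execution path from the use_query_and_wait feature flag."""
    if driver_features.get("use_query_and_wait", False):
        return client.query_and_wait(sql)
    return client.query(sql).result()
```

In both branches the caller sees the same rows, which mirrors how the public SQLSpec call stays unchanged.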

Load Jobs
=========

``load_from_arrow()`` and ``load_from_storage()`` use the configured BigQuery
job retry and timeout controls for the load request and load-job completion.
Use ``job_retry_deadline=0`` when running against emulators or other endpoints
where retrying invalid or unsupported jobs would only extend failures.
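
As a hedged illustration, an emulator-oriented feature set might look like the dictionary below. The keys are the documented ``driver_features`` names; the specific timeout values are assumptions chosen for the example, and how the dictionary is handed to ``BigQueryConfig`` is not shown here.

```python
# Feature values for an emulator run (illustrative numbers, not recommendations).
driver_features = {
    "job_retry_deadline": 0,      # disable the job retry wrapper
    "job_result_timeout": 30.0,   # bound waits on load-job completion
    "request_timeout": 10.0,      # bound the initial API request
}
```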

Result Exports
==============

BigQuery result export uses the existing ``select_to_storage()`` method. There
is no public ``export_table_to_storage()`` API. ``select_to_storage()`` runs the
query with the configured job controls, waits for the query job according to the
configured result timeout, and then writes through the selected storage bridge
destination.

Billing follows BigQuery query-job behavior: exporting query results still runs
the SQL statement, so the query can scan and bill data before SQLSpec writes the
result to local or object storage.

Configuration
=============

63 changes: 63 additions & 0 deletions docs/reference/adapters/spanner.rst
@@ -5,6 +5,69 @@ Spanner
Google Cloud Spanner adapter using the Spanner client library with session
pool management.

Request And Session Controls
============================

Spanner request behavior stays on the existing execution APIs. SQLSpec does not
expose public ``execute_with_options()``, ``execute_partitioned_dml()``,
``apply_mutations()``, or ``provide_batch_snapshot()`` methods.

Default request controls can be configured through
``SpannerSyncConfig.driver_features``:

``request_options``
    Forwarded to Spanner ``execute_sql()``, ``execute_update()``, and
    ``batch_update()`` calls. Use this for request tags, transaction tags, and
    priority options supported by the Google Cloud Spanner client.

``directed_read_options``
    Forwarded only to read calls that use ``execute_sql()``. Directed reads are
    not forwarded to DML calls.

``retry`` and ``timeout``
    Forwarded to Spanner statement execution calls when provided.

Per-call overrides use the existing ``execute()``, ``execute_many()``, and
``execute_script()`` methods:

.. code-block:: python

    result = driver.execute(
        "SELECT id FROM users WHERE id = @id",
        {"id": "u-1"},
        request_options={"request_tag": "users.lookup"},
        directed_read_options=directed_read_options,
        timeout=10.0,
    )

``directed_read_options`` only applies to read statements. The driver accepts
the argument for a DML statement so call sites can share option plumbing, but it
does not forward directed-read options to ``execute_update()`` or
``batch_update()``.
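
The filtering described above can be sketched as a small helper. ``select_forwarded_options`` is a hypothetical name used for illustration; the real driver may structure this differently.

```python
from typing import Any


def select_forwarded_options(is_read: bool, **options: Any) -> dict[str, Any]:
    """Return the per-call options that would be forwarded for this statement kind."""
    # Options left as None are treated as "not provided".
    forwarded = {name: value for name, value in options.items() if value is not None}
    if not is_read:
        # DML paths accept the argument but never forward directed-read options.
        forwarded.pop("directed_read_options", None)
    return forwarded
```

A read keeps all three options, while a DML call with the same arguments keeps only ``request_options`` and ``timeout``.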

Session-Scoped Controls
=======================

``SpannerSyncConfig.provide_session()`` also accepts explicit Spanner controls
for the returned session context:

.. code-block:: python

    with config.provide_session(
        request_options={"transaction_tag": "orders.write"},
        retry=retry,
        timeout=20.0,
    ) as driver:
        driver.execute("UPDATE orders SET status = @status WHERE id = @id", params)

The explicit ``provide_session()`` arguments are copied into the returned
driver's feature set and do not mutate ``config.driver_features``. They also do
not hide a ``database_provider`` feature for unrelated database-level methods.
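
The copy-not-mutate behavior can be pictured with the sketch below. ``merge_session_features`` is a hypothetical helper, and the feature values are placeholders; it only shows the merge semantics the paragraph describes.

```python
from typing import Any


def merge_session_features(config_features: dict[str, Any], **overrides: Any) -> dict[str, Any]:
    """Build a per-session feature set without touching the config's own mapping."""
    merged = dict(config_features)  # shallow copy, so the original stays unchanged
    merged.update({name: value for name, value in overrides.items() if value is not None})
    return merged
```

Keys that are not overridden, such as ``database_provider``, remain visible in the merged result.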

``provide_read_session()`` is the read-only helper for single-use snapshot
reads. For DDL, DML, and write-capable transactions, use ``provide_session()``
or ``provide_write_session()``.

Configuration
=============

4 changes: 4 additions & 0 deletions sqlspec/adapters/bigquery/config.py
@@ -94,6 +94,9 @@ class BigQueryDriverFeatures(TypedDict):
job_result_timeout: Timeout (seconds) for polling ``QueryJob.result()``. Defaults to the
client polling default (waits indefinitely for the job using the API's per-call default
timeouts). Also used as the per-request HTTP timeout when ``request_timeout`` is unset.
use_query_and_wait: Use ``Client.query_and_wait()`` for single-statement queries.
Pair with ``connection_config["default_job_creation_mode"]="JOB_CREATION_OPTIONAL"``
to allow short queries to run without creating a job. Defaults to False.
request_timeout: Per-request HTTP transport timeout (seconds) for the API calls that start
query jobs. Bounds each request so a server that accepts the request but never responds
(e.g. a wedged emulator) raises instead of blocking indefinitely. Defaults to
@@ -111,6 +114,7 @@ class BigQueryDriverFeatures(TypedDict):
events_backend: NotRequired[str]
job_retry_deadline: NotRequired[float]
job_result_timeout: NotRequired[float]
use_query_and_wait: NotRequired[bool]
request_timeout: NotRequired[float]
query_page_size: NotRequired[int]
query_max_results: NotRequired[int]
57 changes: 55 additions & 2 deletions sqlspec/adapters/bigquery/core.py
@@ -170,6 +170,7 @@ def try_bulk_insert(
expression: "exp.Expr | None" = None,
*,
allow_parse: bool = True,
result_timeout: float | None = None,
) -> "int | None":
"""Attempt bulk insert via Parquet load.

@@ -179,6 +180,7 @@ def try_bulk_insert(
parameters: Parameter dictionaries for the insert.
expression: Optional parsed expression to reuse.
allow_parse: Whether to parse SQL when expression is unavailable.
result_timeout: Timeout forwarded to the load job request and result wait.

Returns:
Inserted row count if bulk insert succeeds, otherwise None.
@@ -204,8 +206,8 @@ def try_bulk_insert(
buffer.seek(0)

job_config = build_load_job_config("parquet", overwrite=False)
-job = connection.load_table_from_file(buffer, table_name, job_config=job_config)
-job.result()
+job = connection.load_table_from_file(buffer, table_name, job_config=job_config, timeout=result_timeout)
+job.result(timeout=result_timeout)
return len(parameters)
except ImportError:
logger.debug("pyarrow not available, falling back to literal inlining")
@@ -535,6 +537,10 @@ def run_query_job(
retry: Retry | None = None,
timeout: float | None = None,
job_retry: Retry | None = None,
api_method: str | None = None,
timestamp_precision: Any | None = None,
job_id: str | None = None,
job_id_prefix: str | None = None,
) -> QueryJob:
"""Execute a BigQuery query job with merged configuration.

@@ -552,6 +558,11 @@ def run_query_job(
job_retry: Retry policy attached to the returned query job. ``None``
disables job retries and the client's built-in ``jobs.insert``
retry wrapper (which carries a fixed 600s deadline).
api_method: Optional query API method override.
timestamp_precision: Optional timestamp precision override.
job_id: Explicit BigQuery job ID.
job_id_prefix: Prefix used by BigQuery to generate a job ID when
``job_id`` is not provided.

Returns:
QueryJob object representing the executed job.
@@ -569,9 +580,51 @@ def run_query_job(
"timeout": timeout,
"job_retry": job_retry,
}
if api_method is not None:
query_kwargs["api_method"] = api_method
if timestamp_precision is not None:
query_kwargs["timestamp_precision"] = timestamp_precision
if job_id is not None:
query_kwargs["job_id"] = job_id
elif job_id_prefix is not None:
query_kwargs["job_id_prefix"] = job_id_prefix
return connection.query(sql, **query_kwargs)


def _run_query_and_wait(
    connection: "BigQueryConnection",
    sql: str,
    parameters: Any,
    *,
    default_job_config: QueryJobConfig | None,
    json_serializer: "Callable[[Any], str]",
    retry: Retry | None = None,
    wait_timeout: float | None = None,
    job_retry: Retry | None = None,
    page_size: int | None = None,
    max_results: int | None = None,
) -> Any:
    """Execute a BigQuery query via query_and_wait and return the row iterator."""
    final_job_config = QueryJobConfig()
    if default_job_config:
        copy_job_config(default_job_config, final_job_config)
    final_job_config.query_parameters = create_parameters(parameters, json_serializer)

    query_kwargs: dict[str, Any] = {"job_config": final_job_config}
    if retry is not None:
        query_kwargs["retry"] = retry
    if wait_timeout is not None:
        query_kwargs["api_timeout"] = wait_timeout
        query_kwargs["wait_timeout"] = wait_timeout
    if job_retry is not None:
        query_kwargs["job_retry"] = job_retry
    if page_size is not None:
        query_kwargs["page_size"] = page_size
    if max_results is not None:
        query_kwargs["max_results"] = max_results
    return connection.query_and_wait(sql, **query_kwargs)


def build_load_job_config(file_format: "StorageFormat", overwrite: bool) -> "LoadJobConfig":
job_config = LoadJobConfig()
job_config.source_format = _map_bigquery_source_format(file_format)