feat: Add InferredSchemaLoader for runtime schema inference #831
base: main
New file (+112 lines):

```python
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from collections.abc import Mapping as ABCMapping
from collections.abc import Sequence
from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer


def _to_builtin_types(value: Any) -> Any:
    """
    Recursively convert Mapping-like and Sequence-like objects to plain Python types.

    This is necessary because genson's schema inference doesn't handle custom Mapping
    or Sequence implementations properly. We need to convert everything to plain dicts,
    lists, and primitive types.

    Args:
        value: The value to convert

    Returns:
        The value converted to plain Python types
    """
    if isinstance(value, ABCMapping):
        return {k: _to_builtin_types(v) for k, v in value.items()}
    elif isinstance(value, (list, tuple)):
        return [_to_builtin_types(item) for item in value]
    elif isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
        return [_to_builtin_types(item) for item in value]
    else:
        return value


@dataclass
class InferredSchemaLoader(SchemaLoader):
    """
    Infers a JSON Schema by reading a sample of records from the stream at discover time.

    This schema loader reads up to `record_sample_size` records from the stream and uses
    the SchemaInferrer to generate a JSON schema based on the structure of those records.
    This is useful for streams where the schema is not known in advance or changes dynamically.

    Attributes:
        retriever (Retriever): The retriever used to fetch records from the stream
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
        record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
        stream_name (str): The name of the stream for which to infer the schema
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_sample_size: int = 100
    stream_name: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        if not self.stream_name:
            self.stream_name = parameters.get("name", "")
        if not self.stream_name:
            raise ValueError(
                "stream_name must be provided either directly or via the 'name' parameter"
            )

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Infers and returns a JSON schema by reading a sample of records from the stream.

        This method reads up to `record_sample_size` records from the stream and uses
        the SchemaInferrer to generate a JSON schema. If no records are available,
        it returns an empty schema.

        Returns:
            A mapping representing the inferred JSON schema for the stream
        """
        schema_inferrer = SchemaInferrer()

        record_count = 0
        for stream_slice in self.retriever.stream_slices():
            for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
                if record_count >= self.record_sample_size:
                    break

                # Convert all Mapping-like and Sequence-like objects to plain Python types
                # This is necessary because genson doesn't handle custom implementations properly
                record = _to_builtin_types(record)

                airbyte_record = AirbyteRecordMessage(
                    stream=self.stream_name,
                    data=record,  # type: ignore[arg-type]
                    emitted_at=0,
                )

                schema_inferrer.accumulate(airbyte_record)
                record_count += 1

            if record_count >= self.record_sample_size:
                break

        inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
            self.stream_name
        )

        return inferred_schema if inferred_schema else {}
```
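A note for reviewers on the normalization step: the sketch below pairs a toy custom `Mapping` with a standalone copy of the `_to_builtin_types` logic (duplicated here only so the snippet runs without the new module), showing how wrapper types collapse into plain dicts and lists before genson sees them. `FrozenRecord` and `to_builtin_types` are illustrative names, not part of this PR.

```python
from collections.abc import Mapping, Sequence
from typing import Any


class FrozenRecord(Mapping):
    """A custom Mapping, standing in for the wrapper types some retrievers return."""

    def __init__(self, data: dict) -> None:
        self._data = data

    def __getitem__(self, key: Any) -> Any:
        return self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)


def to_builtin_types(value: Any) -> Any:
    """Standalone replica of the PR's _to_builtin_types helper, for illustration only."""
    if isinstance(value, Mapping):
        return {k: to_builtin_types(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_builtin_types(item) for item in value]
    if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
        return [to_builtin_types(item) for item in value]
    return value


record = FrozenRecord({"id": 1, "nested": FrozenRecord({"tags": ("a", "b")})})
print(to_builtin_types(record))
# {'id': 1, 'nested': {'tags': ['a', 'b']}}
```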
Uh oh!
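The core of `get_json_schema` is: wrap each sampled record in an `AirbyteRecordMessage`, feed it to `SchemaInferrer.accumulate`, then ask for the stream's schema. A minimal sketch of that flow with two hand-written records; the `customers` stream name and record contents are made up, and the exact keywords in the returned schema depend on `SchemaInferrer` itself:

```python
from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer

# Two already-plain records standing in for what the retriever would yield.
records = [
    {"id": 1, "email": "a@example.com", "profile": {"plan": "free"}},
    {"id": 2, "email": "b@example.com", "profile": {"plan": "pro", "seats": 5}},
]

inferrer = SchemaInferrer()
for record in records:
    inferrer.accumulate(
        AirbyteRecordMessage(stream="customers", data=record, emitted_at=0)
    )

# A JSON-schema-like mapping describing the union of the sampled records,
# or None if nothing was accumulated (the loader maps None to {}).
print(inferrer.get_stream_schema("customers"))
```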
There was an error while loading. Please reload this page.
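End to end, `get_json_schema` only calls `stream_slices()` and `read_records()` on the retriever, so a duck-typed stand-in is enough to exercise the sampling cutoff. This is a rough sketch under two assumptions worth flagging: the import path below is a guess (this excerpt doesn't show where the new file lives), and `FakeRetriever` is purely illustrative rather than a real `Retriever` subclass.

```python
# Hypothetical import path; the actual location of the new module isn't shown in this diff.
from airbyte_cdk.sources.declarative.schema.inferred_schema_loader import InferredSchemaLoader


class FakeRetriever:
    """Duck-typed stand-in implementing only the two methods get_json_schema() uses."""

    def stream_slices(self):
        yield {}  # a single empty slice

    def read_records(self, records_schema, stream_slice):
        yield {"id": 1, "name": "Alice", "tags": ["a", "b"]}
        yield {"id": 2, "name": "Bob", "tags": []}
        yield {"id": 3, "name": "Carol", "tags": ["c"]}  # never reached: beyond the sample size


loader = InferredSchemaLoader(
    retriever=FakeRetriever(),  # type: ignore[arg-type]  # not a real Retriever subclass
    config={},
    parameters={"name": "users"},  # stream_name falls back to parameters["name"]
    record_sample_size=2,
)
print(loader.get_json_schema())
```

With `record_sample_size=2`, only the first two records reach the inferrer, matching the early-break checks in both the inner and outer loops.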