36 changes: 36 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1548,13 +1548,15 @@ definitions:
        loaders defined first taking precedence in the event of a conflict.
      anyOf:
        - "$ref": "#/definitions/InlineSchemaLoader"
        - "$ref": "#/definitions/InferredSchemaLoader"
        - "$ref": "#/definitions/DynamicSchemaLoader"
        - "$ref": "#/definitions/JsonFileSchemaLoader"
        - title: Multiple Schema Loaders
          type: array
          items:
            anyOf:
              - "$ref": "#/definitions/InlineSchemaLoader"
              - "$ref": "#/definitions/InferredSchemaLoader"
              - "$ref": "#/definitions/DynamicSchemaLoader"
              - "$ref": "#/definitions/JsonFileSchemaLoader"
              - "$ref": "#/definitions/CustomSchemaLoader"
@@ -2462,6 +2464,40 @@ definitions:
      $parameters:
        type: object
        additionalProperties: true
  InferredSchemaLoader:
    title: Inferred Schema Loader
    description: Infers a JSON Schema by reading a sample of records from the stream at discover time. This is useful for streams where the schema is not known in advance or changes dynamically.
    type: object
    required:
      - type
      - retriever
    properties:
      type:
        type: string
        enum: [InferredSchemaLoader]
      retriever:
        title: Retriever
        description: Component used to coordinate how records are extracted across stream slices and request pages.
        anyOf:
          - "$ref": "#/definitions/SimpleRetriever"
          - "$ref": "#/definitions/AsyncRetriever"
          - "$ref": "#/definitions/CustomRetriever"
      record_sample_size:
        title: Record Sample Size
        description: The maximum number of records to read for schema inference. Defaults to 100.
        type: integer
        default: 100
        example:
          - 100
          - 500
          - 1000
      stream_name:
        title: Stream Name
        description: The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.
        type: string
      $parameters:
        type: object
        additionalProperties: true
  InlineSchemaLoader:
    title: Inline Schema Loader
    description: Loads a schema that is defined directly in the manifest file.
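For context, here is what a stream's schema_loader section using the new component might look like, written as the Python dict form of a manifest. The component types follow the definition above; the URL, path, and extractor values are invented for illustration and are not part of this PR.

# Hypothetical stream configuration using the new InferredSchemaLoader.
# Component names follow the schema definition above; url_base, path, and
# the record selector values are made-up examples.
schema_loader_config = {
    "type": "InferredSchemaLoader",
    "record_sample_size": 500,
    "retriever": {
        "type": "SimpleRetriever",
        "requester": {
            "type": "HttpRequester",
            "url_base": "https://api.example.com",
            "path": "/items",
            "http_method": "GET",
        },
        "record_selector": {
            "type": "RecordSelector",
            "extractor": {"type": "DpathExtractor", "field_path": []},
        },
    },
}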
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

@@ -2483,11 +2481,13 @@ class Config:
    schema_loader: Optional[
        Union[
            InlineSchemaLoader,
            InferredSchemaLoader,
            DynamicSchemaLoader,
            JsonFileSchemaLoader,
            List[
                Union[
                    InlineSchemaLoader,
                    InferredSchemaLoader,
                    DynamicSchemaLoader,
                    JsonFileSchemaLoader,
                    CustomSchemaLoader,
@@ -2753,6 +2753,27 @@ class DynamicSchemaLoader(BaseModel):
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InferredSchemaLoader(BaseModel):
    type: Literal["InferredSchemaLoader"]
    retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
        ...,
        description="Component used to coordinate how records are extracted across stream slices and request pages.",
        title="Retriever",
    )
    record_sample_size: Optional[int] = Field(
        100,
        description="The maximum number of records to read for schema inference. Defaults to 100.",
        example=[100, 500, 1000],
        title="Record Sample Size",
    )
    stream_name: Optional[str] = Field(
        None,
        description="The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.",
        title="Stream Name",
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ParentStreamConfig(BaseModel):
    type: Literal["ParentStreamConfig"]
    stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
@@ -3093,6 +3114,7 @@ class DynamicDeclarativeStream(BaseModel):
SessionTokenAuthenticator.update_forward_refs()
HttpRequester.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
InferredSchemaLoader.update_forward_refs()
ParentStreamConfig.update_forward_refs()
PropertiesFromEndpoint.update_forward_refs()
SimpleRetriever.update_forward_refs()
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -301,6 +301,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    InferredSchemaLoader as InferredSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    InlineSchemaLoader as InlineSchemaLoaderModel,
)
@@ -549,6 +552,7 @@
    ComplexFieldType,
    DefaultSchemaLoader,
    DynamicSchemaLoader,
    InferredSchemaLoader,
    InlineSchemaLoader,
    JsonFileSchemaLoader,
    SchemaLoader,
@@ -748,6 +752,7 @@ def _init_mappings(self) -> None:
            HttpRequesterModel: self.create_http_requester,
            HttpResponseFilterModel: self.create_http_response_filter,
            InlineSchemaLoaderModel: self.create_inline_schema_loader,
            InferredSchemaLoaderModel: self.create_inferred_schema_loader,
            JsonDecoderModel: self.create_json_decoder,
            JsonlDecoderModel: self.create_jsonl_decoder,
            JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
@@ -2500,6 +2505,39 @@ def create_inline_schema_loader(
    ) -> InlineSchemaLoader:
        return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

    def create_inferred_schema_loader(
        self, model: InferredSchemaLoaderModel, config: Config, **kwargs: Any
    ) -> InferredSchemaLoader:
        name = kwargs.get("name", "inferred_schema")
        retriever = self._create_component_from_model(
            model=model.retriever,
            config=config,
            name=name,
            primary_key=None,
            partition_router=self._build_stream_slicer_from_partition_router(
                model.retriever, config
            ),
            transformations=[],
            use_cache=True,
            log_formatter=(
                lambda response: format_http_message(
                    response,
                    f"Schema loader '{name}' request",
                    "Request performed in order to infer schema.",
                    name,
                    is_auxiliary=True,
                )
            ),
        )

        return InferredSchemaLoader(
            retriever=retriever,
            config=config,
            record_sample_size=model.record_sample_size or 100,
            stream_name=model.stream_name or name,
            parameters=model.parameters or {},
        )

    def create_complex_field_type(
        self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any
    ) -> ComplexFieldType:
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/schema/__init__.py
@@ -9,6 +9,7 @@
    SchemaTypeIdentifier,
    TypesMap,
)
from airbyte_cdk.sources.declarative.schema.inferred_schema_loader import InferredSchemaLoader
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
@@ -18,6 +19,7 @@
"DefaultSchemaLoader",
"SchemaLoader",
"InlineSchemaLoader",
"InferredSchemaLoader",
"DynamicSchemaLoader",
"ComplexFieldType",
"TypesMap",
112 changes: 112 additions & 0 deletions airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
@@ -0,0 +1,112 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from collections.abc import Mapping as ABCMapping
from collections.abc import Sequence
from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer


def _to_builtin_types(value: Any) -> Any:
    """
    Recursively convert Mapping-like and Sequence-like objects to plain Python types.

    This is necessary because genson's schema inference doesn't handle custom Mapping
    or Sequence implementations properly. We need to convert everything to plain dicts,
    lists, and primitive types.

    Args:
        value: The value to convert

    Returns:
        The value converted to plain Python types
    """
    if isinstance(value, ABCMapping):
        return {k: _to_builtin_types(v) for k, v in value.items()}
    elif isinstance(value, (list, tuple)):
        return [_to_builtin_types(item) for item in value]
    elif isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
        return [_to_builtin_types(item) for item in value]
    else:
        return value


@dataclass
class InferredSchemaLoader(SchemaLoader):
    """
    Infers a JSON Schema by reading a sample of records from the stream at discover time.

    This schema loader reads up to `record_sample_size` records from the stream and uses
    the SchemaInferrer to generate a JSON schema based on the structure of those records.
    This is useful for streams where the schema is not known in advance or changes dynamically.

    Attributes:
        retriever (Retriever): The retriever used to fetch records from the stream
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
        record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
        stream_name (str): The name of the stream for which to infer the schema
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_sample_size: int = 100
    stream_name: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        if not self.stream_name:
            self.stream_name = parameters.get("name", "")
        if not self.stream_name:
            raise ValueError(
                "stream_name must be provided either directly or via the 'name' parameter"
            )

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Infers and returns a JSON schema by reading a sample of records from the stream.

        This method reads up to `record_sample_size` records from the stream and uses
        the SchemaInferrer to generate a JSON schema. If no records are available,
        it returns an empty schema.

        Returns:
            A mapping representing the inferred JSON schema for the stream
        """
        schema_inferrer = SchemaInferrer()

        record_count = 0
        for stream_slice in self.retriever.stream_slices():
[Review thread]

Contributor:
Does that mean that during a read, we will read some records twice?

Contributor:
Good question! The InferredSchemaLoader reads records during discover time (when get_json_schema() is called), not during the read operation. So the flow is:

  1. Discover phase: InferredSchemaLoader reads up to record_sample_size records to infer the schema
  2. Read phase: The stream reads records normally (potentially including the same records from discover)

So yes, some records may be read twice - once during discover and once during read. This is intentional and consistent with how schema inference works. The alternative would be to cache records from discover and replay them during read, but that would add complexity and memory overhead.

This behavior is similar to how test_read works in the CDK - it also reads sample records separately from the main read operation.

Is this acceptable, or would you prefer a different approach (e.g., caching records from discover)?

Contributor:
Isn't the schema loader called during read here which would mean that the HTTP requests are done once for self._schema_loader.get_json_schema() and another time for self._retriever.read_records(...)?

Contributor:
Excellent catch! You're absolutely right - get_json_schema() is called during read operations (in DeclarativePartition.read() line 92), not just during discover. This means InferredSchemaLoader would read up to record_sample_size records for every partition/slice during a sync, which is extremely inefficient.

I checked other schema loaders:

  • InlineSchemaLoader: Returns static schema (no issue)
  • DynamicSchemaLoader: Also reads records on every call to get_json_schema() without caching, but only reads 1 record so the impact is minimal

Proposed Solution:
Add internal memoization to InferredSchemaLoader with thread-safe caching:

  • Cache the inferred schema after first call
  • Use threading.Lock to prevent duplicate work under concurrent partition reads
  • Ensure the schema loader uses its own retriever instance (not shared with the stream) to avoid state side-effects

This will ensure schema inference happens only once per sync job, not once per partition.

I'll implement this fix now. Does this approach sound good?

Contributor:
I've implemented the caching solution! 🎉

Changes made:

  • Added threading.Lock and _cached_schema instance variables in __post_init__
  • Implemented double-checked locking pattern in get_json_schema() to prevent duplicate work under concurrent calls
  • Added unit test test_inferred_schema_loader_caches_schema() to verify the retriever is only called once even when get_json_schema() is called multiple times

Result:
Schema inference now happens only once per sync job (on first call to get_json_schema()), not once per partition. This prevents the performance issue where InferredSchemaLoader would read up to record_sample_size records for every partition/slice during a sync.

The implementation follows standard thread-safe memoization patterns and ensures the cached schema is returned on all subsequent calls.
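
For illustration, here is a minimal, self-contained sketch of the double-checked locking memoization described in this thread. It is not the code committed in this PR; the class and method names below are invented.

import threading
from typing import Any, Mapping, Optional


class CachedSchemaLoaderSketch:
    """Sketch only: run an expensive schema inference at most once per instance."""

    def __init__(self) -> None:
        self._cached_schema: Optional[Mapping[str, Any]] = None
        self._schema_lock = threading.Lock()

    def get_json_schema(self) -> Mapping[str, Any]:
        # Fast path: once the cache is populated, no lock is taken.
        if self._cached_schema is None:
            with self._schema_lock:
                # Re-check inside the lock: another thread may have filled
                # the cache while this one was waiting.
                if self._cached_schema is None:
                    self._cached_schema = self._infer_schema()
        return self._cached_schema

    def _infer_schema(self) -> Mapping[str, Any]:
        # Stand-in for the record-sampling loop in get_json_schema below.
        return {"type": "object", "properties": {}}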
[End review thread]

            for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
                if record_count >= self.record_sample_size:
                    break

                # Convert all Mapping-like and Sequence-like objects to plain Python types
                # This is necessary because genson doesn't handle custom implementations properly
                record = _to_builtin_types(record)

                airbyte_record = AirbyteRecordMessage(
                    stream=self.stream_name,
                    data=record,  # type: ignore[arg-type]
                    emitted_at=0,
                )

                schema_inferrer.accumulate(airbyte_record)
                record_count += 1

            if record_count >= self.record_sample_size:
                break

        inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
            self.stream_name
        )

        return inferred_schema if inferred_schema else {}
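
As a closing illustration, the inference primitives used above can be exercised in isolation. This sketch reuses the same SchemaInferrer and AirbyteRecordMessage calls that appear in get_json_schema; the sample records are invented.

from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer

inferrer = SchemaInferrer()
for data in [
    {"id": 1, "name": "a"},
    {"id": 2, "name": "b", "tags": ["x"]},
]:
    # Mirrors schema_inferrer.accumulate(...) in the loop above.
    inferrer.accumulate(AirbyteRecordMessage(stream="items", data=data, emitted_at=0))

# Produces a JSON schema describing the union of the accumulated records.
print(inferrer.get_stream_schema("items"))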