Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class FileBasedStreamConfig(BaseModel):
default=None,
gt=0,
)
# Opt-in flag (off by default): when set, schema discovery is based on the
# first file found instead of the full list of matching files.
use_first_found_file_for_schema_discovery: Optional[bool] = Field(
    title="Use first found file for schema discovery",
    description="When enabled, the source will use the first found file for schema discovery. Helps to avoid a long discovery step.",
    default=False,
)

@validator("input_schema", pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ def _get_raw_json_schema(self) -> JsonSchema:
return self.config.get_input_schema() # type: ignore
elif self.config.schemaless:
return schemaless_schema
elif self.config.use_first_found_file_for_schema_discovery:
self.logger.info(msg=f"Using only first found file for schema discovery.")
files = [next(iter(self.get_files()))]
first_n_files = len(files)
else:
files = self.list_files()
first_n_files = len(files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,28 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None:
}
assert self._parser.infer_schema.call_count == 3

def test_use_first_found_file_for_schema_discovery(self) -> None:
    """Check that only the first found file is used for schema discovery.

    Ten matching files are offered by the stream reader while the
    ``use_first_found_file_for_schema_discovery`` flag is enabled. The
    resulting schema must be correct, and the parser must have been asked
    to infer a schema exactly once.
    """
    self._stream.config.use_first_found_file_for_schema_discovery = True

    self._discovery_policy.get_max_n_files_for_schema_inference.return_value = 3
    self._discovery_policy.n_concurrent_requests = 1
    self._stream.config.input_schema = None
    self._stream.config.schemaless = None
    self._stream.config.recent_n_files_to_read_for_schema_discovery = None
    self._parser.infer_schema.return_value = {"data": {"type": "string"}}
    files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)]
    self._stream_reader.get_matching_files.return_value = files

    schema = self._stream.get_json_schema()

    assert schema == {
        "properties": {
            "_ab_source_file_last_modified": {"type": "string"},
            "_ab_source_file_url": {"type": "string"},
            "data": {"type": ["null", "string"]},
        },
        "type": "object",
    }
    # The schema alone is identical however many files are inspected, so also
    # pin the number of inference calls: one file means one call.
    assert self._parser.infer_schema.call_count == 1

def _iter(self, x: Iterable[Any]) -> Iterator[Any]:
for item in x:
if isinstance(item, Exception):
Expand Down
Loading