diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py
index eb592a4aa..4946fb626 100644
--- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py
+++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py
@@ -3,9 +3,9 @@
 #
 
 from enum import Enum
-from typing import Any, List, Mapping, Optional, Union
+from typing import Any, Dict, List, Mapping, Optional, Union
 
-from pydantic.v1 import BaseModel, Field, validator
+from pydantic.v1 import BaseModel, Field, root_validator, validator
 
 from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
 from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
@@ -74,6 +74,11 @@ class FileBasedStreamConfig(BaseModel):
         default=None,
         gt=0,
     )
+    use_first_found_file_for_schema_discovery: bool = Field(
+        title="Use First Found File For Schema Discovery",
+        description="When enabled, the source will use the first found file for schema discovery. This helps to avoid a long discovery step.",
+        default=False,
+    )
 
     @validator("input_schema", pre=True)
     def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
@@ -84,6 +89,35 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
                 raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
         return None
 
+    @root_validator
+    def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Validates that the schema discovery options are compatible with each other.
+        Please update this validation whenever a new schema-discovery-related field is added.
+        Note that recent_n_files_to_read_for_schema_discovery was originally added without validation
+        against schemaless and input_schema, so this method does not check those combinations in order
+        not to break already created connections: if recent_n_files_to_read_for_schema_discovery is
+        combined with schemaless or input_schema, it is ignored and the other option takes precedence.
+        """
+        input_schema = values["input_schema"] is not None
+        schemaless = values["schemaless"]
+        recent_n_files_to_read_for_schema_discovery = (
+            values["recent_n_files_to_read_for_schema_discovery"] is not None
+        )
+        use_first_found_file_for_schema_discovery = values[
+            "use_first_found_file_for_schema_discovery"
+        ]
+
+        if (
+            recent_n_files_to_read_for_schema_discovery
+            and use_first_found_file_for_schema_discovery
+        ) or [schemaless, input_schema, use_first_found_file_for_schema_discovery].count(True) > 1:
+            raise ConfigValidationError(
+                FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
+            )
+
+        return values
+
     def get_input_schema(self) -> Optional[Mapping[str, Any]]:
         """
         User defined input_schema is defined as a string in the config. This method takes the string representation
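To make the validator's behavior concrete, here is a minimal sketch of one accepted and one rejected combination (the stream values are illustrative; per the connector spec expectations below, only `name` and `format` are required):

```python
from airbyte_cdk.sources.file_based.config.file_based_stream_config import (
    FileBasedStreamConfig,
)
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError

# Tolerated for backward compatibility: recent_n_files_to_read_for_schema_discovery
# combined with schemaless is not rejected; the schemaless option takes precedence.
FileBasedStreamConfig(
    name="stream1",
    format={"filetype": "csv"},
    schemaless=True,
    recent_n_files_to_read_for_schema_discovery=5,
)

# Rejected: the new flag conflicts with every other schema discovery option.
try:
    FileBasedStreamConfig(
        name="stream1",
        format={"filetype": "csv"},
        schemaless=True,
        use_first_found_file_for_schema_discovery=True,
    )
except ConfigValidationError as exc:
    print(exc)  # the ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS message
```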
diff --git a/airbyte_cdk/sources/file_based/exceptions.py b/airbyte_cdk/sources/file_based/exceptions.py
index 75f7d3f83..c75f3257f 100644
--- a/airbyte_cdk/sources/file_based/exceptions.py
+++ b/airbyte_cdk/sources/file_based/exceptions.py
@@ -23,6 +23,7 @@ class FileBasedSourceError(Enum):
         "The provided schema could not be transformed into valid JSON Schema."
     )
     ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
+    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of the options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discovery' or 'Use First Found File For Schema Discovery' can be provided at the same time."
     ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
     ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
     STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py
index 2d34fe5dc..df469e834 100644
--- a/airbyte_cdk/sources/file_based/file_based_source.py
+++ b/airbyte_cdk/sources/file_based/file_based_source.py
@@ -156,13 +156,20 @@ def check_connection(
         """
         try:
             streams = self.streams(config)
-        except Exception as config_exception:
+        except ConfigValidationError as config_exception:
             raise AirbyteTracedException(
                 internal_message="Please check the logged errors for more information.",
-                message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
+                message=str(config_exception),
                 exception=AirbyteTracedException(exception=config_exception),
                 failure_type=FailureType.config_error,
             )
+        except Exception as exp:
+            raise AirbyteTracedException(
+                internal_message="Please check the logged errors for more information.",
+                message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
+                exception=AirbyteTracedException(exception=exp),
+                failure_type=FailureType.config_error,
+            )
         if len(streams) == 0:
             return (
                 False,
@@ -250,7 +257,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
                     if (state_manager and catalog_stream)
                     else None
                 )
-                self._validate_input_schema(stream_config)
 
                 sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
 
@@ -457,10 +463,3 @@ def _validate_and_get_validation_policy(
                 model=FileBasedStreamConfig,
             )
         return self.validation_policies[stream_config.validation_policy]
-
-    def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
-        if stream_config.schemaless and stream_config.input_schema:
-            raise ValidationError(
-                "`input_schema` and `schemaless` options cannot both be set",
-                model=FileBasedStreamConfig,
-            )
diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
index 3053a74d2..588c4a18e 100644
--- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
+++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
@@ -273,6 +273,12 @@ def _get_raw_json_schema(self) -> JsonSchema:
             return self.config.get_input_schema()  # type: ignore
         elif self.config.schemaless:
             return schemaless_schema
+        elif self.config.use_first_found_file_for_schema_discovery:
+            self.logger.info(
+                msg=f"Using only the first found file for schema discovery for stream {self.name} because use_first_found_file_for_schema_discovery is enabled."
+            )
+            files = list(itertools.islice(self.get_files(), 1))
+            first_n_files = len(files)
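The `itertools.islice` call is what keeps this branch cheap: assuming `get_files()` yields matching files lazily (which is why `islice` is used at all), slicing to one element stops the listing after the first match instead of exhausting it. A self-contained sketch of that behavior, with a hypothetical stand-in generator:

```python
import itertools

def get_files():
    # Hypothetical stand-in for the stream's lazy file listing.
    for i in range(1_000_000):
        yield f"file{i}.csv"

# Only the first element is produced; the remaining 999,999 are never generated.
files = list(itertools.islice(get_files(), 1))
print(files)  # ['file0.csv']
```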
+            )
+            files = list(itertools.islice(self.get_files(), 1))
+            first_n_files = len(files)
         else:
             files = self.list_files()
             first_n_files = len(files)
diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py
index e67365ae3..f31585412 100644
--- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py
+++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py
@@ -485,6 +485,12 @@
                                 "exclusiveMinimum": 0,
                                 "type": "integer",
                             },
+                            "use_first_found_file_for_schema_discovery": {
+                                "default": False,
+                                "description": "When enabled, the source will use the first found file for schema discovery. This helps to avoid a long discovery step.",
+                                "title": "Use First Found File For Schema Discovery",
+                                "type": "boolean",
+                            },
                         },
                         "required": ["name", "format"],
                     },
@@ -2114,12 +2120,14 @@
         }
     )
     .set_expected_check_status("FAILED")
-    .set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
+    .set_expected_check_error(
+        None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
+    )
     .set_expected_discover_error(
-        ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
+        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
     )
     .set_expected_read_error(
-        ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
+        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
     )
 ).build()
 
@@ -2217,12 +2225,229 @@
         }
     )
     .set_expected_check_status("FAILED")
-    .set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
+    .set_expected_check_error(
+        None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
+    )
+    .set_expected_discover_error(
+        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
+    )
+    .set_expected_read_error(
+        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
+    )
+).build()
+
+recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[
+    InMemoryFilesSource
+] = (
+    TestScenarioBuilder[InMemoryFilesSource]()
+    .set_name(
+        "recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario"
+    )
+    .set_config(
+        {
+            "streams": [
+                {
+                    "name": "stream1",
+                    "format": {"filetype": "csv"},
+                    "globs": ["a.csv"],
+                    "validation_policy": "Skip Record",
+                    "recent_n_files_to_read_for_schema_discovery": 5,
+                    "use_first_found_file_for_schema_discovery": True,
+                },
+                {
+                    "name": "stream2",
+                    "format": {"filetype": "csv"},
+                    "globs": ["b.csv"],
+                    "validation_policy": "Skip Record",
+                },
+            ]
+        }
+    )
+    .set_source_builder(
+        FileBasedSourceBuilder()
+        .set_files(
+            {
+                "a.csv": {
+                    "contents": [
+                        ("col1", "col2"),
+                        ("val11a", "val12a"),
+                        ("val21a", "val22a"),
+                    ],
+                    "last_modified": "2023-06-05T03:54:07.000Z",
+                },
+                "b.csv": {
+                    "contents": [
+                        ("col3",),
+                        ("val13b",),
+                        ("val23b",),
+                    ],
+                    "last_modified": "2023-06-05T03:54:07.000Z",
+                },
+            }
+        )
+        .set_file_type("csv")
+    )
+    .set_catalog(
+        CatalogBuilder()
+        .with_stream("stream1", SyncMode.full_refresh)
+        .with_stream("stream2", SyncMode.full_refresh)
+        .build()
+    )
+    .set_expected_catalog(
+        {
+            "streams": [
+                {
+                    "json_schema": {
+                        "type": "object",
+                        "properties": {
+                            "data": {"type": "object"},
+                            "_ab_source_file_last_modified": {"type": "string"},
+                            "_ab_source_file_url": {"type": "string"},
{"type": "string"}, + }, + }, + "name": "stream1", + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + }, + { + "json_schema": { + "type": "object", + "properties": { + "col3": {"type": ["null", "string"]}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream2", + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + }, + ] + } + ) + .set_expected_check_status("FAILED") + .set_expected_check_error( + None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) + .set_expected_discover_error( + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) + .set_expected_read_error( + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) +).build() + + +schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[ + InMemoryFilesSource +] = ( + TestScenarioBuilder[InMemoryFilesSource]() + .set_name( + "schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario" + ) + .set_config( + { + "streams": [ + { + "name": "stream1", + "format": {"filetype": "csv"}, + "globs": ["a.csv"], + "validation_policy": "Skip Record", + "schemaless": True, + "use_first_found_file_for_schema_discovery": True, + }, + { + "name": "stream2", + "format": {"filetype": "csv"}, + "globs": ["b.csv"], + "validation_policy": "Skip Record", + }, + ] + } + ) + .set_source_builder( + FileBasedSourceBuilder() + .set_files( + { + "a.csv": { + "contents": [ + ("col1", "col2"), + ("val11a", "val12a"), + ("val21a", "val22a"), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + "b.csv": { + "contents": [ + ("col3",), + ("val13b",), + ("val23b",), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + } + ) + .set_file_type("csv") + ) + .set_catalog( + CatalogBuilder() + .with_stream("stream1", SyncMode.full_refresh) + .with_stream("stream2", SyncMode.full_refresh) + .build() + ) + .set_expected_catalog( + { + "streams": [ + { + "json_schema": { + "type": "object", + "properties": { + "data": {"type": "object"}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream1", + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + }, + { + "json_schema": { + "type": "object", + "properties": { + "col3": {"type": ["null", "string"]}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream2", + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + }, + ] + } + ) + .set_expected_check_status("FAILED") + .set_expected_check_error( + None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) .set_expected_discover_error( - ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value + 
+        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
     )
     .set_expected_read_error(
-        ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
+        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
     )
 ).build()
diff --git a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py
index 60716b771..54394a36d 100644
--- a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py
+++ b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py
@@ -209,6 +209,7 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None:
         self._discovery_policy.get_max_n_files_for_schema_inference.return_value = 3
         self._stream.config.input_schema = None
         self._stream.config.schemaless = None
+        self._stream.config.use_first_found_file_for_schema_discovery = False
         self._parser.infer_schema.return_value = {"data": {"type": "string"}}
         files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)]
         self._stream_reader.get_matching_files.return_value = files
@@ -226,6 +227,30 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None:
         }
         assert self._parser.infer_schema.call_count == 3
 
+    def test_use_first_found_file_for_schema_discovery(self) -> None:
+        self._stream.config.use_first_found_file_for_schema_discovery = True
+
+        self._discovery_policy.get_max_n_files_for_schema_inference.return_value = 3
+        self._discovery_policy.n_concurrent_requests = 1
+        self._stream.config.input_schema = None
+        self._stream.config.schemaless = None
+        self._stream.config.recent_n_files_to_read_for_schema_discovery = None
+        self._parser.infer_schema.return_value = {"data": {"type": "string"}}
+        files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)]
+        self._stream_reader.get_matching_files.return_value = files
+
+        schema = self._stream.get_json_schema()
+        assert self._parser.infer_schema.call_count == 1
+        assert self._parser.infer_schema.call_args[0][1].uri == "file0"
+        assert schema == {
+            "properties": {
+                "_ab_source_file_last_modified": {"type": "string"},
+                "_ab_source_file_url": {"type": "string"},
+                "data": {"type": ["null", "string"]},
+            },
+            "type": "object",
+        }
+
     def _iter(self, x: Iterable[Any]) -> Iterator[Any]:
         for item in x:
             if isinstance(item, Exception):
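The `call_args[0][1]` assertion in the new test relies on `unittest.mock` semantics: `call_args[0]` is the tuple of positional arguments from the most recent call, so index 1 is the second positional argument, the file handed to `infer_schema`. A standalone sketch (mock names are illustrative):

```python
from unittest.mock import Mock

parser = Mock()
parser.infer_schema("some_config", Mock(uri="file0"))

# call_args[0] holds the positional arguments of the most recent call;
# call_args[0][1] is therefore the second positional argument.
assert parser.infer_schema.call_args[0][0] == "some_config"
assert parser.infer_schema.call_args[0][1].uri == "file0"
```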
diff --git a/unit_tests/sources/file_based/test_file_based_scenarios.py b/unit_tests/sources/file_based/test_file_based_scenarios.py
index 278dcf1ac..36c9f42fc 100644
--- a/unit_tests/sources/file_based/test_file_based_scenarios.py
+++ b/unit_tests/sources/file_based/test_file_based_scenarios.py
@@ -81,8 +81,10 @@
     multi_csv_stream_n_file_exceeds_config_limit_for_inference,
     multi_csv_stream_n_file_exceeds_limit_for_inference,
     multi_stream_custom_format,
+    recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario,
     schemaless_csv_multi_stream_scenario,
     schemaless_csv_scenario,
+    schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario,
     schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario,
     schemaless_with_user_input_schema_fails_connection_check_scenario,
     single_csv_scenario,
@@ -207,6 +209,8 @@
     schemaless_csv_scenario,
     schemaless_csv_multi_stream_scenario,
     schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario,
+    recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario,
+    schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario,
     schemaless_with_user_input_schema_fails_connection_check_scenario,
     single_stream_user_input_schema_scenario_schema_is_invalid,
     single_stream_user_input_schema_scenario_emit_nonconforming_records,
@@ -312,6 +316,8 @@
     success_multi_stream_scenario,
     success_user_provided_schema_scenario,
     schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario,
+    recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario,
+    schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario,
     schemaless_with_user_input_schema_fails_connection_check_scenario,
     valid_single_stream_user_input_schema_scenario,
     single_avro_scenario,