Skip to content

Commit 02246dc

Browse files
maxi297 and octavia-squidington-iii authored
chore: non-incremental stream instantiated as defaultstream (airbytehq#691)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 3244eec commit 02246dc

File tree

16 files changed

+419
-476
lines changed

16 files changed

+419
-476
lines changed

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ def run_test_read(
120120
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
121121

122122
schema_inferrer = SchemaInferrer(
123-
self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
123+
self._pk_to_nested_and_composite_field(
124+
stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol
125+
)
126+
if stream
127+
else None,
124128
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
125129
if stream
126130
else None,

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
import logging
6-
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
6+
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
77

88
from airbyte_cdk.models import (
99
AirbyteCatalog,
@@ -15,10 +15,6 @@
1515
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1616
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
1717
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
18-
from airbyte_cdk.sources.declarative.extractors import RecordSelector
19-
from airbyte_cdk.sources.declarative.extractors.record_filter import (
20-
ClientSideIncrementalRecordFilterDecorator,
21-
)
2218
from airbyte_cdk.sources.declarative.incremental import (
2319
ConcurrentPerPartitionCursor,
2420
GlobalSubstreamCursor,
@@ -28,7 +24,6 @@
2824
PerPartitionWithGlobalCursor,
2925
)
3026
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
31-
from airbyte_cdk.sources.declarative.models import FileUploader
3227
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3328
ConcurrencyLevel as ConcurrencyLevelModel,
3429
)
@@ -84,7 +79,6 @@ def __init__(
8479
# incremental streams running in full refresh.
8580
component_factory = component_factory or ModelToComponentFactory(
8681
emit_connector_builder_messages=emit_connector_builder_messages,
87-
disable_resumable_full_refresh=True,
8882
connector_state_manager=self._connector_state_manager,
8983
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
9084
)
@@ -180,7 +174,7 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> Airbyte
180174
]
181175
)
182176

183-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
177+
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
184178
"""
185179
The `streams` method is used as part of the AbstractSource in the following cases:
186180
* ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
@@ -210,6 +204,10 @@ def _group_streams(
210204
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
211205
# so we need to treat them as synchronous
212206

207+
if isinstance(declarative_stream, AbstractStream):
208+
concurrent_streams.append(declarative_stream)
209+
continue
210+
213211
supports_file_transfer = (
214212
isinstance(declarative_stream, DeclarativeStream)
215213
and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
@@ -278,10 +276,10 @@ def _group_streams(
278276

279277
partition_generator = StreamSlicerPartitionGenerator(
280278
partition_factory=DeclarativePartitionFactory(
281-
declarative_stream.name,
282-
declarative_stream.get_json_schema(),
283-
retriever,
284-
self.message_repository,
279+
stream_name=declarative_stream.name,
280+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
281+
retriever=retriever,
282+
message_repository=self.message_repository,
285283
),
286284
stream_slicer=declarative_stream.retriever.stream_slicer,
287285
)
@@ -309,10 +307,10 @@ def _group_streams(
309307
)
310308
partition_generator = StreamSlicerPartitionGenerator(
311309
partition_factory=DeclarativePartitionFactory(
312-
declarative_stream.name,
313-
declarative_stream.get_json_schema(),
314-
retriever,
315-
self.message_repository,
310+
stream_name=declarative_stream.name,
311+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
312+
retriever=retriever,
313+
message_repository=self.message_repository,
316314
),
317315
stream_slicer=cursor,
318316
)
@@ -339,10 +337,10 @@ def _group_streams(
339337
) and hasattr(declarative_stream.retriever, "stream_slicer"):
340338
partition_generator = StreamSlicerPartitionGenerator(
341339
DeclarativePartitionFactory(
342-
declarative_stream.name,
343-
declarative_stream.get_json_schema(),
344-
declarative_stream.retriever,
345-
self.message_repository,
340+
stream_name=declarative_stream.name,
341+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
342+
retriever=declarative_stream.retriever,
343+
message_repository=self.message_repository,
346344
),
347345
declarative_stream.retriever.stream_slicer,
348346
)
@@ -399,10 +397,10 @@ def _group_streams(
399397

400398
partition_generator = StreamSlicerPartitionGenerator(
401399
DeclarativePartitionFactory(
402-
declarative_stream.name,
403-
declarative_stream.get_json_schema(),
404-
retriever,
405-
self.message_repository,
400+
stream_name=declarative_stream.name,
401+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
402+
retriever=retriever,
403+
message_repository=self.message_repository,
406404
),
407405
perpartition_cursor,
408406
)

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from copy import deepcopy
99
from importlib import metadata
1010
from types import ModuleType
11-
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
11+
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union
1212

1313
import orjson
1414
import yaml
@@ -66,6 +66,7 @@
6666
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
6767
from airbyte_cdk.sources.declarative.spec.spec import Spec
6868
from airbyte_cdk.sources.message import MessageRepository
69+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
6970
from airbyte_cdk.sources.streams.core import Stream
7071
from airbyte_cdk.sources.types import Config, ConnectionDefinition
7172
from airbyte_cdk.sources.utils.slice_logger import (
@@ -297,7 +298,12 @@ def connection_checker(self) -> ConnectionChecker:
297298
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
298299
)
299300

300-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
301+
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
302+
"""
303+
As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
304+
Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
305+
fully decouple this from the AbstractSource.
306+
"""
301307
if self._spec_component:
302308
self._spec_component.validate_config(config)
303309

0 commit comments

Comments (0)