Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
38d13d3
feat: Add fetch_one/fetch_record functionality to SimpleRetriever
devin-ai-integration[bot] Nov 12, 2025
052eb81
update type hints
aaronsteers Nov 12, 2025
c581827
Change fetch_one to raise RecordNotFoundException instead of returnin…
devin-ai-integration[bot] Nov 12, 2025
bab2781
Fix ruff format issues
devin-ai-integration[bot] Nov 12, 2025
a2e8e55
Improve path construction robustness in fetch_one
devin-ai-integration[bot] Nov 12, 2025
c0719e2
refactor: Move fetch_record to concurrent declarative classes only
devin-ai-integration[bot] Nov 12, 2025
38f6dc3
fix: Address PR review comments
devin-ai-integration[bot] Nov 13, 2025
0a71543
fix: Simplify error handling in fetch_one
devin-ai-integration[bot] Nov 13, 2025
626848b
fix: Address PR review comments - add type hints and clarifying comments
devin-ai-integration[bot] Nov 13, 2025
bf83bf1
refactor: Use iterator instead of list for fetch_one record parsing
devin-ai-integration[bot] Nov 13, 2025
691927b
refactor: Simplify fetch_one/fetch_record to only accept str for pk_v…
devin-ai-integration[bot] Nov 13, 2025
8a52390
refactor: Rename fetch_one to _fetch_one to mark as internal API
devin-ai-integration[bot] Nov 13, 2025
4386a35
fix: Address PR feedback from brianjlai on fetch_one implementation
devin-ai-integration[bot] Nov 13, 2025
28b9286
Update unit_tests/sources/declarative/retrievers/test_simple_retrieve…
aaronsteers Nov 13, 2025
dfeeec2
Update unit_tests/sources/declarative/retrievers/test_simple_retrieve…
aaronsteers Nov 13, 2025
82add1b
Update unit_tests/sources/declarative/retrievers/test_simple_retrieve…
aaronsteers Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
Expand Down Expand Up @@ -507,6 +508,62 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def fetch_record(
    self,
    stream_name: str,
    pk_value: str,
    config: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Fetch a single record from a stream by primary key.

    The stream's declarative config is looked up by name among the static and dynamic
    stream configs, the stream component is built from it, and the fetch is delegated to
    the stream retriever's `_fetch_one`.

    Args:
        stream_name: Name of the stream to fetch from
        pk_value: Primary key value to fetch as a string (e.g., "123")
        config: Source configuration (optional, uses instance config if not provided)

    Returns:
        The fetched record as a dict

    Raises:
        ValueError: If the stream name is not found in the source
        NotImplementedError: If the stream doesn't support fetching individual records
        RecordNotFoundException: If the record is not found
    """
    config = config or self._config

    stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

    # First config whose "name" matches, or None when the stream is unknown.
    stream_config = next(
        (candidate for candidate in stream_configs if candidate.get("name") == stream_name),
        None,
    )

    if not stream_config:
        # Stringify each name: a config without a "name" key yields None, and joining
        # None would raise TypeError and hide the ValueError we actually want to report.
        available_streams = ", ".join(str(c.get("name")) for c in stream_configs)
        raise ValueError(
            f"Stream '{stream_name}' not found in source. "
            f"Available streams: {available_streams}"
        )

    declarative_stream = self._constructor.create_component(
        DeclarativeStreamModel,
        stream_config,
        config,
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )

    # Only SimpleRetriever implements the single-record fetch.
    if not isinstance(declarative_stream.retriever, SimpleRetriever):
        raise NotImplementedError(
            f"Stream '{stream_name}' does not support fetching individual records. "
            "Only streams with SimpleRetriever currently support this operation."
        )

    return declarative_stream.retriever._fetch_one(
        pk_value=pk_value, records_schema=declarative_stream.get_json_schema()
    )

@property
def dynamic_streams(self) -> List[Dict[str, Any]]:
return self._dynamic_stream_configs(
Expand Down
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ class ReadException(Exception):
"""
Raise when there is an error reading data from an API Source
"""


class RecordNotFoundException(ReadException):
    """Raised when a single-record fetch does not produce a record.

    `SimpleRetriever._fetch_one` raises it when the requester returns no response or when
    parsing the response yields no records. HTTP errors raised by the requester itself
    (including those for 404 responses) are not converted into this exception there.
    """
97 changes: 97 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor
from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
Expand Down Expand Up @@ -626,6 +627,102 @@ def _to_partition_key(to_serialize: Any) -> str:
# separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

def _fetch_one(
    self,
    pk_value: str,
    records_schema: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Retrieve exactly one record, addressed by its primary key.

    The request path is built by convention: `/{pk_value}` is appended to the requester's
    base path, which suits REST APIs of the form `GET /resource/{id}`. The response then
    goes through the retriever's regular response parsing and the first parsed record is
    returned.

    Args:
        pk_value: Primary key of the record to fetch, as a string (e.g., "123").
        records_schema: JSON schema describing the record structure.

    Returns:
        The data of the first parsed record, as a dict.

    Raises:
        RecordNotFoundException: If the requester returns no usable response, or if
            parsing the response yields no record.
        Exception: Errors raised by the requester (including those for 404 responses) are
            not caught here and propagate to the caller.

    Example:
        record = retriever._fetch_one("123", schema)

    Note:
        Possible future alternatives to the convention-based path:
        - Path template interpolation: a configurable template like "{base_path}/{id}".
          See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a
        - Field path configuration: let the manifest say which response field holds the
          record, for APIs that wrap single records in envelopes like {"data": {...}}.
    """
    # NOTE(review): a reviewer asked for this method to be public, since it is called
    # from outside SimpleRetriever; even though it is not part of the protocol interface,
    # it feels like bad practice for this to be a private method.

    # A single-record fetch involves no partitioning, hence the empty StreamSlice.
    no_partition = StreamSlice(partition={}, cursor_slice={})

    def _request_context() -> Mapping[str, Any]:
        # Build a fresh mapping (and a fresh empty state dict) for every call so that no
        # callee can observe another callee's mutations.
        return {
            "stream_state": {},
            "stream_slice": no_partition,
            "next_page_token": None,
        }

    base_path = self.requester.get_path(**_request_context())
    record_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"

    # Resolve the request options in the same order as they are passed below.
    headers = self._request_headers(**_request_context())
    params = self._request_params(**_request_context())
    body_data = self._request_body_data(**_request_context())
    body_json = self._request_body_json(**_request_context())

    # send_request() may return None when the error handler chooses to IGNORE a response.
    response: requests.Response | None = self.requester.send_request(
        path=record_path,
        request_headers=headers,
        request_params=params,
        request_body_data=body_data,
        request_body_json=body_json,
        log_formatter=self.log_formatter,
        **_request_context(),
    )

    # NOTE: requests.Response truthiness follows Response.ok, so an error response that
    # was returned rather than raised is also treated as "no response" here.
    if not response:
        raise RecordNotFoundException(
            f"Record with primary key {pk_value} not found (no response)"
        )

    # NOTE(review): a reviewer pointed out that this may not work for many APIs. A list
    # endpoint commonly returns a top-level array (or an array under a field) that the
    # configured extractor unpacks, whereas the single-record endpoint returns one
    # top-level object. Reusing the list-oriented extractor would then extract nothing,
    # so the underlying extractor may not be suitable for individual records.
    parsed_records: Iterable[Record] = self._parse_response(
        response=response,
        records_schema=records_schema,
        **_request_context(),
    )

    # Only the first record is consumed; the rest of the iterable is never advanced.
    first_record: Record | None = next(iter(parsed_records), None)
    if first_record:
        return dict(first_record.data)

    raise RecordNotFoundException(
        f"Record with primary key {pk_value} not found (empty response)"
    )


def _deep_merge(
target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]
Expand Down
19 changes: 19 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,22 @@ def check_availability(self) -> StreamAvailability:
"""
:return: If the stream is available and if not, why
"""

def fetch_record(self, pk_value: str) -> Mapping[str, Any]:
    """
    Retrieve one record identified by its primary key value.

    This base implementation does not fetch anything: it always raises
    NotImplementedError, naming the stream in the message.

    Args:
        pk_value: The primary key value as a string (e.g., "123")

    Returns:
        The fetched record as a dict (only for implementations that support it)

    Raises:
        NotImplementedError: If the stream doesn't support fetching individual records
        RecordNotFoundException: If the record is not found
    """
    unsupported_message = (
        f"Stream {self.name} does not support fetching individual records. "
        "Only declarative streams with SimpleRetriever currently support this operation."
    )
    raise NotImplementedError(unsupported_message)
160 changes: 160 additions & 0 deletions unit_tests/sources/declarative/retrievers/test_simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector
from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator
Expand Down Expand Up @@ -1644,3 +1645,162 @@ def _mock_paginator():
paginator.get_request_body_data.__name__ = "get_request_body_data"
paginator.get_request_body_json.__name__ = "get_request_body_json"
return paginator


def test_fetch_one_simple_pk():
    """Check that _fetch_one requests `<base path>/<pk>` and returns the selected record."""
    expected_record = {"id": "123", "title": "Test Post"}

    http_response = requests.Response()
    http_response.status_code = 200
    http_response._content = json.dumps(expected_record).encode("utf-8")

    requester = MagicMock()
    requester.get_path.return_value = "posts"
    requester.send_request.return_value = http_response

    # NOTE(review): a reviewer pointed out that this test does not exercise the real
    # selector/extractor logic. Most extractors/selectors are defined to extract a list of
    # objects, but here select_records() is mocked to return the single record, so a
    # real-world failure to extract from a single-object response would go unnoticed.
    record_selector = MagicMock()
    record_selector.select_records.return_value = [
        Record(data=dict(expected_record), stream_name="posts", associated_slice=None)
    ]

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    fetched = retriever._fetch_one("123", records_schema={})

    assert fetched == expected_record
    requester.send_request.assert_called_once()
    # call_args unpacks into (positional args, keyword args).
    _, sent_kwargs = requester.send_request.call_args
    assert sent_kwargs["path"] == "posts/123"


def test_fetch_one_not_found():
    """Test fetch_one propagates a 404 error raised by the requester.

    _fetch_one does not wrap send_request() in a try/except, so an exception raised for a
    404 reaches the caller unchanged and is not converted to RecordNotFoundException.
    """
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    error = Exception("Not found")
    error.response = MagicMock()
    error.response.status_code = 404
    requester.send_request.side_effect = error

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    with pytest.raises(Exception) as exc_info:
        retriever._fetch_one("999", records_schema={})

    # The very same exception object must surface, not a translated one.
    assert exc_info.value is error
    assert not isinstance(exc_info.value, RecordNotFoundException)


def test_fetch_one_server_error():
    """Test fetch_one propagates non-404 errors."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    error = Exception("Server error")
    error.response = MagicMock()
    error.response.status_code = 500
    requester.send_request.side_effect = error

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    with pytest.raises(Exception) as exc_info:
        # The method is named _fetch_one; calling `fetch_one` would raise AttributeError,
        # which the broad pytest.raises(Exception) above would catch in place of the
        # mocked server error.
        retriever._fetch_one("123", records_schema={})

    # Identity check guards against any unrelated exception satisfying the context manager.
    assert exc_info.value is error
    assert "Server error" in str(exc_info.value)


def test_fetch_one_invalid_pk_type():
    """Check the runtime behavior of _fetch_one when pk_value is not a string.

    Type checking should reject this call; at runtime the path construction calls
    `pk_value.lstrip(...)`, which an int does not have, hence the AttributeError.
    """
    mock_requester = MagicMock()
    mock_requester.get_path.return_value = "posts"
    mock_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=mock_requester,
        record_selector=mock_selector,
        parameters={},
        config={},
    )

    non_string_pk = 123
    with pytest.raises(AttributeError):
        retriever._fetch_one(non_string_pk, records_schema={})  # type: ignore


def test_fetch_one_no_response():
    """Test fetch_one raises RecordNotFoundException when response is None."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"
    # send_request() returning None models the requester producing no response.
    requester.send_request.return_value = None

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    with pytest.raises(RecordNotFoundException) as exc_info:
        # The method is named _fetch_one; `fetch_one` does not exist on SimpleRetriever.
        retriever._fetch_one("123", records_schema={})

    assert "123" in str(exc_info.value)


def test_fetch_one_empty_records():
    """Test fetch_one raises RecordNotFoundException when no records are returned."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    response = requests.Response()
    response.status_code = 200
    response._content = json.dumps({}).encode("utf-8")

    requester.send_request.return_value = response

    # The selector yields nothing, so there is no first record to return.
    record_selector = MagicMock()
    record_selector.select_records.return_value = []

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    with pytest.raises(RecordNotFoundException) as exc_info:
        # The method is named _fetch_one; `fetch_one` does not exist on SimpleRetriever.
        retriever._fetch_one("123", records_schema={})

    assert "123" in str(exc_info.value)