-
Notifications
You must be signed in to change notification settings - Fork 30
feat: Add Source.fetch_record() and Stream.fetch_record() backed by SimpleRetriever._fetch_one()
#846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Add Source.fetch_record() and Stream.fetch_record() backed by SimpleRetriever._fetch_one()
#846
Changes from 12 commits
38d13d3
052eb81
c581827
bab2781
a2e8e55
c0719e2
38f6dc3
0a71543
626848b
bf83bf1
691927b
8a52390
4386a35
28b9286
dfeeec2
82add1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor | ||
| from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor | ||
| from airbyte_cdk.models import AirbyteMessage | ||
| from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException | ||
| from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector | ||
| from airbyte_cdk.sources.declarative.interpolation import InterpolatedString | ||
| from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( | ||
|
|
@@ -626,6 +627,102 @@ def _to_partition_key(to_serialize: Any) -> str: | |
| # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value | ||
| return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True) | ||
|
|
||
def _fetch_one(
    self,
    pk_value: str,
    records_schema: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Fetch a single record by primary key value.

    Builds the request path by appending the primary key value to the requester's
    base path and sends one request through the requester. Intended for REST APIs
    that follow the convention: GET /resource/{id}

    Args:
        pk_value: The primary key value to fetch as a string (e.g., "123"). Leading
            slashes are stripped before it is appended to the base path.
        records_schema: JSON schema describing the record structure; forwarded to
            response parsing.

    Returns:
        The first parsed record, as a dict.

    Raises:
        RecordNotFoundException: If the response is empty/ignored or parsing yields no records.
        Exception: HTTP errors (including 404) are propagated from requester's error handling;
            nothing raised by the requester is caught here.

    Example:
        record = retriever._fetch_one("123", schema)

    Note:
        This implementation uses convention-based path construction, appending /{pk_value} to the base path. (important-comment)

        Alternative approaches that could be implemented in the future: (important-comment)
        - Path template interpolation: Use a configurable template like "{base_path}/{id}" (important-comment)
          See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a (important-comment)
        - Field path configuration: Allow specifying which response field contains the record (important-comment)
          for APIs that wrap single records in envelopes like {"data": {...}} (important-comment)
    """
    # A single-record fetch involves no partitioning, so every call gets an empty slice.
    no_slice = StreamSlice(partition={}, cursor_slice={})

    # Base path of the stream as reported by the requester.
    list_path = self.requester.get_path(
        stream_state={},
        stream_slice=no_slice,
        next_page_token=None,
    )

    # Join base path and key with exactly one "/" between them.
    path_prefix = list_path.rstrip("/")
    path_suffix = pk_value.lstrip("/")
    fetch_path = f"{path_prefix}/{path_suffix}"

    # Resolve the request options up front, in the same order they are passed below.
    headers = self._request_headers(
        stream_state={},
        stream_slice=no_slice,
        next_page_token=None,
    )
    params = self._request_params(
        stream_state={},
        stream_slice=no_slice,
        next_page_token=None,
    )
    body_data = self._request_body_data(
        stream_state={},
        stream_slice=no_slice,
        next_page_token=None,
    )
    body_json = self._request_body_json(
        stream_state={},
        stream_slice=no_slice,
        next_page_token=None,
    )

    # send_request() may return None when the error handler chooses to IGNORE a response.
    # NOTE(review): a requests.Response is itself falsy for 4xx/5xx statuses (per the
    # Requests docs), so the guard below presumably also covers error responses that
    # the handler let through - confirm this is intended.
    response: requests.Response | None = self.requester.send_request(
        path=fetch_path,
        stream_state={},
        stream_slice=no_slice,
        next_page_token=None,
        request_headers=headers,
        request_params=params,
        request_body_data=body_data,
        request_body_json=body_json,
        log_formatter=self.log_formatter,
    )
    if not response:
        raise RecordNotFoundException(
            f"Record with primary key {pk_value} not found (no response)"
        )

    # NOTE(review): a reviewer on this change pointed out that the stream's
    # selector/extractor is usually configured for the list endpoint's response shape
    # and may not suit a single-record response - TODO confirm per API before relying
    # on this for streams whose list responses are wrapped in an envelope.
    parsed_records: Iterable[Record] = self._parse_response(
        response=response,
        stream_state={},
        records_schema=records_schema,
        stream_slice=no_slice,
        next_page_token=None,
    )

    # Only the first parsed record is relevant; anything after it is ignored.
    head: Record | None = next(iter(parsed_records), None)
    if head:
        return dict(head.data)

    raise RecordNotFoundException(
        f"Record with primary key {pk_value} not found (empty response)"
    )
|
|
||
|
|
||
| def _deep_merge( | ||
| target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| ) | ||
| from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth | ||
| from airbyte_cdk.sources.declarative.decoders import JsonDecoder | ||
| from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException | ||
| from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector | ||
| from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter | ||
| from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator | ||
|
|
@@ -1644,3 +1645,162 @@ def _mock_paginator(): | |
| paginator.get_request_body_data.__name__ = "get_request_body_data" | ||
| paginator.get_request_body_json.__name__ = "get_request_body_json" | ||
| return paginator | ||
|
|
||
|
|
||
def test_fetch_one_simple_pk():
    """Test fetch_one with a simple string primary key."""
    expected_record = {"id": "123", "title": "Test Post"}

    # A 200 response whose body is the bare record object.
    api_response = requests.Response()
    api_response.status_code = 200
    api_response._content = json.dumps(expected_record).encode("utf-8")

    mock_requester = MagicMock()
    mock_requester.get_path.return_value = "posts"
    mock_requester.send_request.return_value = api_response

    # NOTE(review): the selector is fully mocked, so this test does not exercise real
    # selector/extractor logic against the single-record response shape (raised in
    # review of this change).
    mock_selector = MagicMock()
    mock_selector.select_records.return_value = [
        Record(data=dict(expected_record), stream_name="posts", associated_slice=None)
    ]

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=mock_requester,
        record_selector=mock_selector,
        parameters={},
        config={},
    )

    fetched = retriever._fetch_one("123", records_schema={})

    assert fetched == expected_record
    # Exactly one request, aimed at "<base path>/<pk>".
    mock_requester.send_request.assert_called_once()
    sent_kwargs = mock_requester.send_request.call_args.kwargs
    assert sent_kwargs["path"] == "posts/123"
|
|
||
|
|
||
def test_fetch_one_not_found():
    """Test fetch_one propagates a 404 error raised by the requester.

    ``_fetch_one`` does not catch exceptions coming out of ``send_request``; it only
    raises ``RecordNotFoundException`` itself when there is no response or no parsed
    record. A 404 surfaced as an exception by the requester must therefore reach the
    caller unchanged rather than being converted.
    """
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    # Simulate the requester's error handling raising for a 404 response.
    error = Exception("Not found")
    error.response = MagicMock()
    error.response.status_code = 404
    requester.send_request.side_effect = error

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    with pytest.raises(Exception) as exc_info:
        retriever._fetch_one("999", records_schema={})

    # The very same exception object must propagate (no wrapping or conversion).
    assert exc_info.value is error
    assert exc_info.value.response.status_code == 404
|
|
||
|
|
||
def test_fetch_one_server_error():
    """Test fetch_one propagates non-404 errors."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    # Simulate the requester's error handling raising for a 500 response.
    error = Exception("Server error")
    error.response = MagicMock()
    error.response.status_code = 500
    requester.send_request.side_effect = error

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    # Call the method under its actual name, `_fetch_one`. A misspelled attribute
    # would raise AttributeError, which `pytest.raises(Exception)` would also accept.
    with pytest.raises(Exception) as exc_info:
        retriever._fetch_one("123", records_schema={})

    # Identity check rules out any unrelated exception being mistaken for the error.
    assert exc_info.value is error
    assert "Server error" in str(exc_info.value)
|
|
||
|
|
||
def test_fetch_one_invalid_pk_type():
    """Test fetch_one with non-string pk_value (should fail type checking but test runtime behavior)."""
    mock_requester = MagicMock()
    mock_requester.get_path.return_value = "posts"

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=mock_requester,
        record_selector=MagicMock(),
        parameters={},
        config={},
    )

    # `_fetch_one` calls `pk_value.lstrip("/")`, which an int does not provide.
    non_string_pk = 123
    with pytest.raises(AttributeError):
        retriever._fetch_one(non_string_pk, records_schema={})  # type: ignore
|
|
||
|
|
||
def test_fetch_one_no_response():
    """Test fetch_one raises RecordNotFoundException when response is None."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"
    # A None return models the requester producing no response at all.
    requester.send_request.return_value = None

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    # Call the method under its actual name, `_fetch_one`; `fetch_one` is not
    # defined in this change and would raise AttributeError instead.
    with pytest.raises(RecordNotFoundException) as exc_info:
        retriever._fetch_one("123", records_schema={})

    # The error message must identify the requested primary key.
    assert "123" in str(exc_info.value)
|
|
||
|
|
||
def test_fetch_one_empty_records():
    """Test fetch_one raises RecordNotFoundException when no records are returned."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    # A successful response with an empty JSON object as its body.
    response = requests.Response()
    response.status_code = 200
    response._content = json.dumps({}).encode("utf-8")

    requester.send_request.return_value = response

    # The selector yields nothing, modelling a response that parses to zero records.
    record_selector = MagicMock()
    record_selector.select_records.return_value = []

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    # Call the method under its actual name, `_fetch_one`; `fetch_one` is not
    # defined in this change and would raise AttributeError instead.
    with pytest.raises(RecordNotFoundException) as exc_info:
        retriever._fetch_one("123", records_schema={})

    # The error message must identify the requested primary key.
    assert "123" in str(exc_info.value)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that this is presumably going to be called from somewhere else outside of
SimpleRetriever, we should make this public. While I understand this is not meant to be public-facing or used by the protocol interface, it still feels like bad practice for this to be a private method.