-
Notifications
You must be signed in to change notification settings - Fork 30
feat: Add Source.fetch_record() and Stream.fetch_record() backed by SimpleRetriever._fetch_one()
#846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Add Source.fetch_record() and Stream.fetch_record() backed by SimpleRetriever._fetch_one()
#846
Changes from 10 commits
38d13d3
052eb81
c581827
bab2781
a2e8e55
c0719e2
38f6dc3
0a71543
626848b
bf83bf1
691927b
8a52390
4386a35
28b9286
dfeeec2
82add1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor | ||
| from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor | ||
| from airbyte_cdk.models import AirbyteMessage | ||
| from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException | ||
| from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector | ||
| from airbyte_cdk.sources.declarative.interpolation import InterpolatedString | ||
| from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( | ||
|
|
@@ -626,6 +627,116 @@ def _to_partition_key(to_serialize: Any) -> str: | |
| # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value | ||
| return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True) | ||
|
|
||
| def fetch_one( | ||
| self, | ||
| pk_value: Union[str, Mapping[str, Any]], | ||
| records_schema: Mapping[str, Any], | ||
| ) -> Mapping[str, Any]: | ||
| """Fetch a single record by primary key value. | ||
|
|
||
| This method constructs a path by appending the primary key value to the base path | ||
| and sends a GET request to fetch a single record. It's designed for REST APIs that | ||
| follow the convention: GET /resource/{id} | ||
|
|
||
| Args: | ||
| pk_value: The primary key value to fetch. Can be: | ||
| - str: For simple single-field primary keys (e.g., "123") | ||
| - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) | ||
| records_schema: JSON schema describing the record structure | ||
|
|
||
| Returns: | ||
| The fetched record as a dict. | ||
|
|
||
| Raises: | ||
| RecordNotFoundException: If the response is empty/ignored or parsing yields no records. | ||
| ValueError: If pk_value is not a string or dict. | ||
| Exception: HTTP errors (including 404) are propagated from requester's error handling. | ||
|
|
||
| Example: | ||
| record = retriever.fetch_one("123", schema) | ||
|
|
||
| record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema) | ||
|
|
||
| Note: | ||
| This implementation uses convention-based path construction (Option B from design). (important-comment) | ||
| For simple PKs: appends /{pk_value} to base path (important-comment) | ||
| For composite PKs: appends /{value1}/{value2}/... in key order (important-comment) | ||
|
|
||
| Alternative approaches that could be implemented in the future: (important-comment) | ||
| - Path template interpolation: Use a configurable template like "{base_path}/{id}" (important-comment) | ||
| See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a (important-comment) | ||
| - Field path configuration: Allow specifying which response field contains the record (important-comment) | ||
| for APIs that wrap single records in envelopes like {"data": {...}} (important-comment) | ||
| """ | ||
| # Get the base path from the requester | ||
| base_path = self.requester.get_path( | ||
| stream_state={}, | ||
| stream_slice=StreamSlice(partition={}, cursor_slice={}), | ||
| next_page_token=None, | ||
| ) | ||
|
|
||
| if isinstance(pk_value, str): | ||
| fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}" | ||
| elif isinstance(pk_value, Mapping): | ||
| sorted_values = [str(pk_value[key]).lstrip("/") for key in sorted(pk_value.keys())] | ||
| pk_path_segment = "/".join(sorted_values) | ||
| fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}" | ||
| else: | ||
| raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}") | ||
|
|
||
| # Single-record fetch doesn't involve partitioning, so we pass an empty StreamSlice | ||
| stream_slice = StreamSlice(partition={}, cursor_slice={}) | ||
aaronsteers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # send_request() may return None when the error handler chooses to IGNORE a response | ||
| response: requests.Response | None = self.requester.send_request( | ||
| path=fetch_path, | ||
| stream_state={}, | ||
| stream_slice=stream_slice, | ||
| next_page_token=None, | ||
| request_headers=self._request_headers( | ||
| stream_state={}, | ||
| stream_slice=stream_slice, | ||
| next_page_token=None, | ||
| ), | ||
| request_params=self._request_params( | ||
| stream_state={}, | ||
| stream_slice=stream_slice, | ||
| next_page_token=None, | ||
| ), | ||
| request_body_data=self._request_body_data( | ||
| stream_state={}, | ||
| stream_slice=stream_slice, | ||
| next_page_token=None, | ||
| ), | ||
| request_body_json=self._request_body_json( | ||
| stream_state={}, | ||
| stream_slice=stream_slice, | ||
| next_page_token=None, | ||
| ), | ||
| log_formatter=self.log_formatter, | ||
| ) | ||
|
|
||
| if not response: | ||
| raise RecordNotFoundException( | ||
| f"Record with primary key {pk_value} not found (no response)" | ||
| ) | ||
aaronsteers marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| records_iter: Iterable[Record] = self._parse_response( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This section doesn't seem like it will work for a lot of cases. I feel like a common pattern is that for retrieving all objects like The way this is written assumes that hitting the single record Unless you think I'm missing something, it seems like we shouldn't be using the underlying extractor which is no longer suitable for individual records |
||
| response=response, | ||
| stream_state={}, | ||
| records_schema=records_schema, | ||
| stream_slice=stream_slice, | ||
| next_page_token=None, | ||
| ) | ||
|
|
||
| first_record: Record | None = next(iter(records_iter), None) | ||
| if not first_record: | ||
| raise RecordNotFoundException( | ||
| f"Record with primary key {pk_value} not found (empty response)" | ||
| ) | ||
|
|
||
| return dict(first_record.data) | ||
|
|
||
|
|
||
| def _deep_merge( | ||
| target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.