Commits (16):
38d13d3  feat: Add fetch_one/fetch_record functionality to SimpleRetriever (devin-ai-integration[bot], Nov 12, 2025)
052eb81  update type hints (aaronsteers, Nov 12, 2025)
c581827  Change fetch_one to raise RecordNotFoundException instead of returnin… (devin-ai-integration[bot], Nov 12, 2025)
bab2781  Fix ruff format issues (devin-ai-integration[bot], Nov 12, 2025)
a2e8e55  Improve path construction robustness in fetch_one (devin-ai-integration[bot], Nov 12, 2025)
c0719e2  refactor: Move fetch_record to concurrent declarative classes only (devin-ai-integration[bot], Nov 12, 2025)
38f6dc3  fix: Address PR review comments (devin-ai-integration[bot], Nov 13, 2025)
0a71543  fix: Simplify error handling in fetch_one (devin-ai-integration[bot], Nov 13, 2025)
626848b  fix: Address PR review comments - add type hints and clarifying comments (devin-ai-integration[bot], Nov 13, 2025)
bf83bf1  refactor: Use iterator instead of list for fetch_one record parsing (devin-ai-integration[bot], Nov 13, 2025)
691927b  refactor: Simplify fetch_one/fetch_record to only accept str for pk_v… (devin-ai-integration[bot], Nov 13, 2025)
8a52390  refactor: Rename fetch_one to _fetch_one to mark as internal API (devin-ai-integration[bot], Nov 13, 2025)
4386a35  fix: Address PR feedback from brianjlai on fetch_one implementation (devin-ai-integration[bot], Nov 13, 2025)
28b9286  Update unit_tests/sources/declarative/retrievers/test_simple_retrieve… (aaronsteers, Nov 13, 2025)
dfeeec2  Update unit_tests/sources/declarative/retrievers/test_simple_retrieve… (aaronsteers, Nov 13, 2025)
82add1b  Update unit_tests/sources/declarative/retrievers/test_simple_retrieve… (aaronsteers, Nov 14, 2025)
23 changes: 23 additions & 0 deletions airbyte_cdk/legacy/sources/declarative/declarative_stream.py
@@ -202,6 +202,29 @@ def get_cursor(self) -> Optional[Cursor]:
return self.retriever.cursor
return None

def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
"""
Fetch a single record by primary key value.

Args:
pk_value: The primary key value. Can be:
- str: For simple single-field primary keys (e.g., "123")
- Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"})

Returns:
The fetched record as a dict, or None if not found

Raises:
NotImplementedError: If the stream's retriever doesn't support fetching individual records
"""
if not isinstance(self.retriever, SimpleRetriever):
raise NotImplementedError(
f"Stream {self.name} does not support fetching individual records. "
"Only streams with SimpleRetriever currently support this operation."
)

return self.retriever.fetch_one(pk_value=pk_value, records_schema=self.get_json_schema())

def _get_checkpoint_reader(
self,
logger: logging.Logger,
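For readers skimming the diff, here is a minimal, runnable sketch of the guard-and-delegate pattern that fetch_record adds above. ToyRetriever and ToyStream are illustrative stand-ins, not CDK classes, and the returned record is faked rather than fetched over HTTP.

from typing import Any, Mapping, Optional

class ToyRetriever:
    def fetch_one(self, pk_value: str, records_schema: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
        # Pretend this issued GET /resource/{pk_value} and parsed the response.
        return {"id": pk_value}


class ToyStream:
    name = "users"

    def __init__(self, retriever: Any) -> None:
        self.retriever = retriever

    def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
        # Mirror of the guard above: only retrievers that expose fetch_one are supported.
        if not isinstance(self.retriever, ToyRetriever):
            raise NotImplementedError(
                f"Stream {self.name} does not support fetching individual records."
            )
        return self.retriever.fetch_one(pk_value=pk_value, records_schema={})


print(ToyStream(ToyRetriever()).fetch_record("123"))  # {'id': '123'}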
31 changes: 31 additions & 0 deletions airbyte_cdk/sources/abstract_source.py
@@ -324,3 +324,34 @@ def stop_sync_on_stream_failure(self) -> bool:
on the first error seen and emit a single error trace message for that stream.
"""
return False

def fetch_record(
self, stream_name: str, pk_value: Any, config: Mapping[str, Any]
) -> Optional[Mapping[str, Any]]:
"""
Fetch a single record from a stream by primary key.

Args:
stream_name: Name of the stream to fetch from
pk_value: Primary key value to fetch. Can be:
- str: For simple single-field primary keys (e.g., "123")
- Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"})
config: Source configuration

Returns:
The fetched record as a dict, or None if not found

Raises:
ValueError: If the stream name is not found in the source
NotImplementedError: If the stream doesn't support fetching individual records
"""
stream_instances = {s.name: s for s in self.streams(config)}
stream = stream_instances.get(stream_name)

if not stream:
raise ValueError(
f"Stream '{stream_name}' not found in source. "
f"Available streams: {', '.join(stream_instances.keys())}"
)

return stream.fetch_record(pk_value)
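A standalone sketch of the lookup-and-delegate logic in AbstractSource.fetch_record above, assuming a toy stream class; FakeStream and the sample records are illustrative, not part of the CDK.

from typing import Any, Iterable, Mapping, Optional

class FakeStream:
    # Minimal stand-in for a stream that supports point lookups.
    def __init__(self, name: str, records: Mapping[str, Mapping[str, Any]]):
        self.name = name
        self._records = records

    def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
        return self._records.get(pk_value)


def fetch_record(streams: Iterable[FakeStream], stream_name: str, pk_value: Any) -> Optional[Mapping[str, Any]]:
    # Mirror of the method above: index streams by name, fail loudly if the name is unknown.
    stream_instances = {s.name: s for s in streams}
    stream = stream_instances.get(stream_name)
    if not stream:
        raise ValueError(
            f"Stream '{stream_name}' not found in source. "
            f"Available streams: {', '.join(stream_instances.keys())}"
        )
    return stream.fetch_record(pk_value)


streams = [FakeStream("users", {"123": {"id": "123", "name": "Ada"}})]
print(fetch_record(streams, "users", "123"))  # {'id': '123', 'name': 'Ada'}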
117 changes: 117 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -626,6 +626,123 @@ def _to_partition_key(to_serialize: Any) -> str:
# separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

def fetch_one(
self,
pk_value: Union[str, Mapping[str, Any]],
records_schema: Mapping[str, Any],
) -> Optional[Mapping[str, Any]]:
"""
Fetch a single record by primary key value.

This method constructs a path by appending the primary key value to the base path
and sends a GET request to fetch a single record. It's designed for REST APIs that
follow the convention: GET /resource/{id}

Args:
pk_value: The primary key value to fetch. Can be:
- str: For simple single-field primary keys (e.g., "123")
- Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"})
records_schema: JSON schema describing the record structure

Returns:
The fetched record as a dict, or None if not found (404 response)

Raises:
Exception: For non-404 HTTP errors (propagated from requester's error handling)

Example:
record = retriever.fetch_one("123", schema)

record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema)

Note:
This implementation uses convention-based path construction (Option B from design).
For simple PKs: appends /{pk_value} to the base path.
For composite PKs: appends /{value1}/{value2}/... in sorted key order.

Alternative approaches that could be implemented in the future:
- Path template interpolation: Use a configurable template like "{base_path}/{id}".
  See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a
- Field path configuration: Allow specifying which response field contains the record,
  for APIs that wrap single records in envelopes like {"data": {...}}
"""
# Get the base path from the requester
base_path = self.requester.get_path(
stream_state={},
stream_slice=StreamSlice(partition={}, cursor_slice={}),
next_page_token=None,
)

if isinstance(pk_value, str):
fetch_path = f"{base_path}/{pk_value}".replace("//", "/")
elif isinstance(pk_value, Mapping):
sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())]
pk_path_segment = "/".join(sorted_values)
fetch_path = f"{base_path}/{pk_path_segment}".replace("//", "/")
else:
raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}")

stream_slice = StreamSlice(partition={}, cursor_slice={})

try:
response = self.requester.send_request(
path=fetch_path,
stream_state={},
stream_slice=stream_slice,
next_page_token=None,
request_headers=self._request_headers(
stream_state={},
stream_slice=stream_slice,
next_page_token=None,
),
request_params=self._request_params(
stream_state={},
stream_slice=stream_slice,
next_page_token=None,
),
request_body_data=self._request_body_data(
stream_state={},
stream_slice=stream_slice,
next_page_token=None,
),
request_body_json=self._request_body_json(
stream_state={},
stream_slice=stream_slice,
next_page_token=None,
),
log_formatter=self.log_formatter,
)
except Exception as e:
# Check if this is a 404 (record not found) - return None
if hasattr(e, "response") and hasattr(e.response, "status_code"):
if e.response.status_code == 404:
return None
raise

if not response:
return None

records = list(
self._parse_response(
response=response,
stream_state={},
records_schema=records_schema,
stream_slice=stream_slice,
next_page_token=None,
)
)

# Return the first record if found, None otherwise
if records:
first_record = records[0]
if isinstance(first_record, Record):
return dict(first_record.data)
elif isinstance(first_record, Mapping):
return dict(first_record)
else:
return None
return None


def _deep_merge(
target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]
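The convention-based path construction described in the docstring's Note can be illustrated in isolation. build_fetch_path below is a hypothetical helper, not part of the PR; it mirrors the str/Mapping branches of fetch_one above.

from typing import Any, Mapping, Union

def build_fetch_path(base_path: str, pk_value: Union[str, Mapping[str, Any]]) -> str:
    # Simple PK: append /{pk_value}; composite PK: append the values in sorted key order.
    if isinstance(pk_value, str):
        return f"{base_path}/{pk_value}".replace("//", "/")
    if isinstance(pk_value, Mapping):
        segment = "/".join(str(pk_value[key]) for key in sorted(pk_value))
        return f"{base_path}/{segment}".replace("//", "/")
    raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}")


print(build_fetch_path("companies/", "123"))
# companies/123
print(build_fetch_path("properties", {"company_id": "123", "property": "status"}))
# properties/123/status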
20 changes: 20 additions & 0 deletions airbyte_cdk/sources/streams/core.py
@@ -463,6 +463,26 @@ def get_cursor(self) -> Optional[Cursor]:
"""
return self.cursor

def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
"""
Fetch a single record by primary key value.

Args:
pk_value: The primary key value. Can be:
- str: For simple single-field primary keys (e.g., "123")
- Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"})

Returns:
The fetched record as a dict, or None if not found

Raises:
NotImplementedError: If the stream doesn't support fetching individual records
"""
raise NotImplementedError(
f"Stream {self.name} does not support fetching individual records. "
"Only declarative streams with SimpleRetriever currently support this operation."
)

def _get_checkpoint_reader(
self,
logger: logging.Logger,
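A hypothetical caller-side sketch of the base-class contract above: streams that do not override fetch_record raise NotImplementedError, so a caller can probe for support with try/except. BaseStream here is a stand-in, not the CDK Stream class.

from typing import Any, Mapping, Optional

class BaseStream:
    # Stand-in mirroring the default behavior added above.
    name = "unsupported_stream"

    def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
        raise NotImplementedError(
            f"Stream {self.name} does not support fetching individual records."
        )


def try_fetch(stream: BaseStream, pk_value: str) -> Optional[Mapping[str, Any]]:
    # Probe for point-lookup support; fall back to None when the stream does not implement it.
    try:
        return stream.fetch_record(pk_value)
    except NotImplementedError:
        return None


print(try_fetch(BaseStream(), "123"))  # None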