Skip to content

Commit 8dabe3d

Browse files
feat: Add get_record() method to fetch individual records by primary key
- Add LookupValue type alias for primary key values - Add get_record() method to Source class (keyword-only pk_value arg) - Add helper methods _get_stream_primary_key() and _normalize_and_validate_pk() - Add fetch_record() method to DeclarativeExecutor - Support both direct PK values and dict input with validation - Dict validation ensures single entry and key matches stream's primary key - Only supported for declarative (YAML-based) sources - Add comprehensive unit tests (16 tests, all passing) This builds on airbytehq/airbyte-python-cdk#846 Co-Authored-By: AJ Steers <[email protected]>
1 parent 7eb746b commit 8dabe3d

File tree

3 files changed

+489
-0
lines changed

3 files changed

+489
-0
lines changed

airbyte/_executors/declarative.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,34 @@ def install(self) -> None:
140140
def uninstall(self) -> None:
141141
"""No-op. The declarative source is included with PyAirbyte."""
142142
pass
143+
144+
def fetch_record(
145+
self,
146+
stream_name: str,
147+
pk_value: str,
148+
config: dict[str, Any] | None = None,
149+
) -> dict[str, Any]:
150+
"""Fetch a single record from a stream by primary key.
151+
152+
Args:
153+
stream_name: Name of the stream to fetch from
154+
pk_value: Primary key value as a string
155+
config: Source configuration (optional, uses instance config if not provided)
156+
157+
Returns:
158+
The fetched record as a dict
159+
160+
Raises:
161+
ValueError: If the stream name is not found
162+
NotImplementedError: If the stream doesn't support fetching individual records
163+
RecordNotFoundException: If the record is not found
164+
"""
165+
merged_config = {**self._config_dict}
166+
if config:
167+
merged_config.update(config)
168+
169+
return self.declarative_source.fetch_record(
170+
stream_name=stream_name,
171+
pk_value=pk_value,
172+
config=merged_config,
173+
)

airbyte/sources/base.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@
6464
)
6565

6666

67+
LookupValue = Any
68+
69+
6770
class Source(ConnectorBase): # noqa: PLR0904
6871
"""A class representing a source that can be called."""
6972

@@ -577,6 +580,154 @@ def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]
577580
progress_tracker=progress_tracker,
578581
)
579582

583+
def _get_stream_primary_key(self, stream_name: str) -> list[str]:
584+
"""Get the primary key for a stream.
585+
586+
Returns the primary key from the configured catalog, which includes user overrides.
587+
The Airbyte protocol represents primary keys as a list of field paths (list of lists).
588+
This method flattens single-field paths to simple strings.
589+
590+
Args:
591+
stream_name: Name of the stream
592+
593+
Returns:
594+
List of primary key field names (flattened from field paths)
595+
596+
Raises:
597+
PyAirbyteInputError: If the stream doesn't exist
598+
"""
599+
configured_catalog = self.get_configured_catalog(streams=[stream_name])
600+
601+
if len(configured_catalog.streams) == 0:
602+
raise exc.PyAirbyteInputError(
603+
message="Stream name does not exist in catalog.",
604+
input_value=stream_name,
605+
)
606+
607+
configured_stream = configured_catalog.streams[0]
608+
pk_field_paths = configured_stream.primary_key or []
609+
610+
return [field_path[0] for field_path in pk_field_paths if field_path]
611+
612+
def _normalize_and_validate_pk(
613+
self,
614+
stream_name: str,
615+
pk_value: LookupValue | dict[str, LookupValue],
616+
) -> str:
617+
"""Normalize and validate primary key input.
618+
619+
Accepts either a direct primary key value or a dict with a single entry
620+
where the key matches the stream's primary key field name.
621+
622+
Args:
623+
stream_name: Name of the stream
624+
pk_value: Either a direct PK value or a dict with single entry
625+
626+
Returns:
627+
The primary key value as a string
628+
629+
Raises:
630+
PyAirbyteInputError: If validation fails
631+
NotImplementedError: If the stream has composite or no primary key
632+
"""
633+
primary_key = self._get_stream_primary_key(stream_name)
634+
635+
if len(primary_key) == 0:
636+
raise NotImplementedError(
637+
f"Stream '{stream_name}' does not have a primary key defined. "
638+
"Cannot fetch individual records without a primary key."
639+
)
640+
641+
if len(primary_key) > 1:
642+
raise NotImplementedError(
643+
f"Stream '{stream_name}' has a composite primary key {primary_key}. "
644+
"Fetching by composite primary key is not yet supported."
645+
)
646+
647+
pk_field = primary_key[0]
648+
649+
if isinstance(pk_value, dict):
650+
if len(pk_value) != 1:
651+
raise exc.PyAirbyteInputError(
652+
message="When passing a dict for pk_value, it must contain exactly one entry.",
653+
input_value=pk_value,
654+
context={
655+
"stream_name": stream_name,
656+
"expected_entries": 1,
657+
"actual_entries": len(pk_value),
658+
},
659+
)
660+
661+
dict_key = next(iter(pk_value.keys()))
662+
dict_value = pk_value[dict_key]
663+
664+
if dict_key != pk_field:
665+
raise exc.PyAirbyteInputError(
666+
message="The key in the pk_value dict does not match the stream's primary key.",
667+
input_value=dict_key,
668+
context={
669+
"stream_name": stream_name,
670+
"expected_key": pk_field,
671+
"actual_key": dict_key,
672+
},
673+
)
674+
675+
return str(dict_value)
676+
677+
return str(pk_value)
678+
679+
def get_record(
680+
self,
681+
stream_name: str,
682+
*,
683+
pk_value: LookupValue | dict[str, LookupValue],
684+
) -> dict[str, Any]:
685+
"""Fetch a single record from a stream by primary key.
686+
687+
This method is only supported for declarative (YAML-based) sources.
688+
689+
Args:
690+
stream_name: Name of the stream to fetch from
691+
pk_value: Either a direct primary key value (e.g., "123") or a dict
692+
with a single entry where the key is the primary key field name
693+
and the value is the primary key value (e.g., {"id": "123"}).
694+
The dict form provides explicit validation that you're using
695+
the correct primary key field.
696+
697+
Returns:
698+
The fetched record as a dict
699+
700+
Raises:
701+
NotImplementedError: If the source is not a declarative source, or if
702+
the stream has a composite primary key or no primary key
703+
PyAirbyteInputError: If the stream doesn't exist or pk_value validation fails
704+
RecordNotFoundException: If the record is not found (from CDK)
705+
706+
Example:
707+
```python
708+
source = get_source("source-rest-api-tutorial", config=config)
709+
710+
record = source.get_record("users", pk_value="123")
711+
712+
record = source.get_record("users", pk_value={"id": "123"})
713+
```
714+
"""
715+
from airbyte._executors.declarative import DeclarativeExecutor
716+
717+
if not isinstance(self.executor, DeclarativeExecutor):
718+
raise NotImplementedError(
719+
f"get_record() is only supported for declarative sources. "
720+
f"This source uses {type(self.executor).__name__}."
721+
)
722+
723+
validated_pk_str = self._normalize_and_validate_pk(stream_name, pk_value)
724+
725+
return self.executor.fetch_record(
726+
stream_name=stream_name,
727+
pk_value=validated_pk_str,
728+
config=self._config_dict,
729+
)
730+
580731
def get_documents(
581732
self,
582733
stream: str,

0 commit comments

Comments
 (0)