

@aaronsteers aaronsteers commented Nov 12, 2025

Summary

Adds the ability to fetch individual records by primary key from declarative concurrent connectors. This implements Phase 1a of #833 for PyAirbyte's use case with ConcurrentDeclarativeSource.

What's being added to main:

  • ConcurrentDeclarativeSource.fetch_record(stream_name, pk_value, config) - Primary API for fetching single records
  • SimpleRetriever.fetch_one(pk_value, records_schema) - Underlying HTTP GET mechanism
  • AbstractStream.fetch_record(pk_value) - Interface method (raises NotImplementedError by default)
  • RecordNotFoundException - Exception raised when record is not found
  • 6 unit tests covering happy path, 404s, server errors, invalid inputs, and empty responses

Key implementation details:

  • Accepts string primary keys only (e.g., "123")
  • Uses convention-based path construction: appends /{pk_value} to base path
  • Raises RecordNotFoundException for empty/ignored responses
  • HTTP errors (including 404) propagate from requester's error handling
  • Only supports streams with SimpleRetriever (raises NotImplementedError for others)
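
The behaviour listed above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the CDK implementation: `fetch_one_sketch`, the `send` callable, and the local `RecordNotFoundException` class are hypothetical stand-ins for SimpleRetriever.fetch_one, the requester, and the exception this PR adds. The trailing-slash handling is also an assumption.

```python
from typing import Any, Callable, Mapping, Optional, Sequence


class RecordNotFoundException(Exception):
    """Local stand-in for the exception this PR adds (hypothetical here)."""


def fetch_one_sketch(
    base_path: str,
    pk_value: str,
    send: Callable[[str], Optional[Sequence[Mapping[str, Any]]]],
) -> Mapping[str, Any]:
    """Fetch one record by appending /{pk_value} to the base path."""
    # Only string primary keys are accepted.
    if not isinstance(pk_value, str):
        raise ValueError(f"pk_value must be a string, got {type(pk_value).__name__}")

    # Convention-based path; stripping a trailing slash is an assumption.
    path = f"{base_path.rstrip('/')}/{pk_value}"

    # Any HTTP error raised by `send` (including a 404) propagates unchanged.
    records = send(path)

    # None (ignored response) or an empty list both mean "not found".
    if not records:
        raise RecordNotFoundException(f"No record found at {path!r}")
    return records[0]


# Usage with an in-memory fake instead of a real HTTP call.
fake_api = {"posts/1": [{"id": "1", "title": "hello"}], "posts/2": []}
record = fetch_one_sketch("posts", "1", lambda p: fake_api.get(p))
```

Keeping the "empty response" case separate from HTTP errors means a caller can tell "the API answered but had nothing" apart from "the API rejected the request".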

Review & Testing Checklist for Human

⚠️ CRITICAL - This PR has NOT been tested end-to-end with a real connector:

  • Test with a real declarative connector - The ConcurrentDeclarativeSource.fetch_record() method reconstructs components from the manifest but has never been tested with an actual connector. Test with a simple declarative connector (e.g., JSONPlaceholder, PokeAPI) to verify it works.

  • Verify component reconstruction - The logic that creates declarative_stream from stream_config (lines 552-557 in concurrent_declarative_source.py) has never been exercised. Verify it correctly handles:

    • Standard streams with SimpleRetriever
    • Dynamic streams
    • Streams with custom schema loaders
    • Streams without SimpleRetriever (should raise NotImplementedError)
  • Test error handling - Verify that:

    • 404 responses propagate as exceptions (not caught as RecordNotFoundException)
    • Empty responses raise RecordNotFoundException
    • Server errors (5xx) propagate correctly
    • Invalid stream names raise ValueError with helpful message
  • Verify path construction - Test that the simple base_path + "/" + pk_value pattern works for your target use cases. This may not work for:

    • APIs that use query parameters for IDs (e.g., ?id=123)
    • Nested resources (e.g., /companies/{id}/properties/{prop})
    • APIs with non-standard path patterns

Recommended Test Plan

from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource

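# Load a simple declarative manifest.
# manifest_dict is assumed to be the connector's parsed manifest (a dict), loaded beforehand.
config = {"api_key": "..."}
source = ConcurrentDeclarativeSource(
    config=config,
    source_config=manifest_dict,
)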

# Test happy path
record = source.fetch_record("posts", "1", config)
assert record["id"] == "1"

# Test 404 (should propagate as exception, not RecordNotFoundException)
try:
    source.fetch_record("posts", "999999", config)
except Exception as e:
    print(f"404 error: {e}")

# Test invalid stream
try:
    source.fetch_record("nonexistent", "1", config)
except ValueError as e:
    assert "not found in source" in str(e)

Notes

  • All 6 unit tests pass (covering SimpleRetriever.fetch_one scenarios)
  • Ruff linter and formatter pass
  • No integration tests exist for ConcurrentDeclarativeSource.fetch_record
  • Only supports SimpleRetriever - other retriever types will raise NotImplementedError

Requested by: AJ Steers ([email protected], @aaronsteers)
Devin Session: https://app.devin.ai/sessions/e0d627ac7e89447a9ebddd515104e2c3

Summary by CodeRabbit

  • New Features

    • Added capability to fetch individual records by primary key from declarative data sources, with built-in error handling for not-found and server error scenarios.
  • Tests

    • Added comprehensive unit test suite covering record retrieval, error handling, and edge cases.

Implements Phase 1 of GitHub issue #833

Changes:
- Add fetch_one() method to SimpleRetriever for fetching single records by PK
- Add fetch_record() base method to Stream class
- Implement fetch_record() in DeclarativeStream to delegate to retriever
- Add fetch_record() helper method to AbstractSource
- Add comprehensive unit tests for fetch_one functionality

The implementation uses convention-based path construction (appending PK value
to base path) and supports both simple string PKs and composite dict PKs.
Handles 404 responses gracefully by returning None.

Co-Authored-By: AJ Steers <[email protected]>
Copilot AI review requested due to automatic review settings November 12, 2025 19:46
@devin-ai-integration

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - Let's create a plan of action for the first phase of the project defined here: <https://github.com/airbytehq/airbyte-python-cdk/issues/833>

Specifically, for now we ONLY want to add a "get_one" implementation to the existing SimpleRetriever implementation - ignoring all other requirements listed there and not (yet) implementing a new retriever class.
We don't care yet about CLI-based invocation but we need a "fetch_record(pk_value: Any) -> dict:" API method on the Source or the Stream.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1762969856048879

@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions github-actions bot added the enhancement New feature or request label Nov 12, 2025
@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1762976446-add-fetch-one-to-simple-retriever#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1762976446-add-fetch-one-to-simple-retriever

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment



Copilot AI left a comment


Pull Request Overview

This PR implements Phase 1 of the "get_one" functionality to fetch individual records by primary key from declarative streams using SimpleRetriever. The implementation adds a convention-based approach where the primary key value is appended to the base path (e.g., /posts/123 for simple PKs, /companies/123/status for composite PKs sorted alphabetically).

Key changes:

  • Added fetch_one() method to SimpleRetriever with convention-based path construction
  • Added fetch_record() methods to Stream and DeclarativeStream classes to expose the functionality
  • Added helper method in AbstractSource for convenient access across streams

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Summary per file:

  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py: Comprehensive test coverage for fetch_one() including simple PKs, composite PKs, 404 handling, error propagation, and edge cases
  • airbyte_cdk/sources/streams/core.py: Base fetch_record() method added to Stream class that raises NotImplementedError by default
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py: Core fetch_one() implementation with convention-based path construction and error handling
  • airbyte_cdk/sources/abstract_source.py: Convenience method for fetching records by stream name
  • airbyte_cdk/legacy/sources/declarative/declarative_stream.py: DeclarativeStream implementation that delegates to SimpleRetriever's fetch_one()


@coderabbitai

coderabbitai bot commented Nov 12, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds single-record fetch support: a _fetch_one method on SimpleRetriever, a RecordNotFoundException, ConcurrentDeclarativeSource.fetch_record() to expose the capability, and an AbstractStream.fetch_record() interface with a NotImplementedError default.

Changes

  • Exception Definition (airbyte_cdk/sources/declarative/exceptions.py): Adds RecordNotFoundException (subclass of ReadException) for not-found cases.
  • Core Retriever Implementation (airbyte_cdk/sources/declarative/retrievers/simple_retriever.py): Adds _fetch_one(self, pk_value: str, records_schema: Mapping[str, Any]) -> Mapping[str, Any]: constructs fetch path by appending primary key to base path, requests the single record via requester, raises RecordNotFoundException on 404/None/empty results, parses response and returns the first record; imports new exception.
  • Public API Surface (airbyte_cdk/sources/declarative/concurrent_declarative_source.py, airbyte_cdk/sources/streams/concurrent/abstract_stream.py): Adds ConcurrentDeclarativeSource.fetch_record(stream_name, pk_value, config=None) which resolves the stream and delegates to SimpleRetriever._fetch_one; adds AbstractStream.fetch_record(pk_value) default raising NotImplementedError.
  • Unit Tests (unit_tests/sources/declarative/retrievers/test_simple_retriever.py): Adds tests covering successful fetch, 404 -> RecordNotFoundException, server error propagation, invalid PK type, None response, and empty-records handling.

Sequence Diagram

sequenceDiagram
    participant User
    participant CDS as ConcurrentDeclarativeSource
    participant SR as SimpleRetriever
    participant HTTP as HTTP Client

    User->>CDS: fetch_record(stream_name, pk_value)
    activate CDS
    CDS->>CDS: resolve stream, retriever & schema
    CDS->>SR: _fetch_one(pk_value, schema)
    deactivate CDS

    activate SR
    SR->>SR: build path = base_path + "/" + pk_value
    SR->>HTTP: send request -> GET /{path}
    activate HTTP
    HTTP-->>SR: 200 / 404 / error
    deactivate HTTP

    alt 404 / no response / empty list
        SR->>SR: raise RecordNotFoundException
    else 200 Success
        SR->>SR: parse response -> records list
        SR->>SR: return first record (mapping)
    else other error
        SR->>SR: propagate error
    end
    deactivate SR

    SR-->>CDS: record or exception
    CDS-->>User: record or exception

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Pay attention to path construction and URL-escaping when concatenating pk_value.
  • Verify 404 / None / empty-list consistently map to RecordNotFoundException while other HTTP errors propagate.
  • Confirm unit tests exercise parsing and all error branches—wdyt?
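
The URL-escaping concern in the first bullet can be illustrated with the standard library. This is a sketch only: `build_record_path` is a hypothetical helper, not something the PR defines, and whether the retriever should percent-encode keys at all is an open design question.

```python
from urllib.parse import quote


def build_record_path(base_path: str, pk_value: str) -> str:
    """Join base_path and pk_value with one slash, percent-encoding the key."""
    # safe="" also encodes "/", so a key cannot introduce extra path segments.
    return f"{base_path.rstrip('/')}/{quote(pk_value, safe='')}"


plain = build_record_path("posts", "123")
escaped = build_record_path("posts/", "a/b c")
```

Here `plain` is `posts/123`, while the slash and space in the second key are encoded as `%2F` and `%20`, so the request still targets a single path segment under `posts/`.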

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 73.91% which is insufficient. The required threshold is 80.00%. Resolution: You can run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (2 passed)

  • Title check: ✅ Passed. The title clearly summarizes the main change: adding fetch_record() methods to Source and Stream classes, backed by SimpleRetriever._fetch_one().
  • Description Check: ✅ Passed. Check skipped - CodeRabbit’s high-level summary is enabled.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)

205-227: Consider validating pk_value format against the stream's primary_key definition?

The method accepts both string and dict pk_value but doesn't validate whether the format matches this stream's actual self.primary_key. For instance, if a stream has primary_key = ["company_id", "property"] but a caller passes a simple string "123", this will proceed to call retriever.fetch_one() which might construct an incorrect path.

Should we add a check here to ensure pk_value format aligns with the primary_key definition? For example:

  • If self.primary_key is a string or single-element list, require pk_value to be a string
  • If self.primary_key is a multi-element list, require pk_value to be a dict with matching keys

This would catch mismatches early with a clearer error message. Wdyt?
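
The check proposed above could look roughly like the following. It is a sketch under assumptions: `validate_pk_value` is a hypothetical helper, and it only handles a primary key given as a string or a flat list of field names, not nested key paths.

```python
from typing import Any, List, Mapping, Union

PrimaryKey = Union[str, List[str], None]


def validate_pk_value(primary_key: PrimaryKey, pk_value: Any) -> None:
    """Raise ValueError when pk_value does not match the primary_key shape."""
    if not primary_key:
        raise ValueError("Stream has no primary key; cannot fetch by pk_value")
    keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)

    # Single-field primary key: the value must be a plain string.
    if len(keys) == 1:
        if not isinstance(pk_value, str):
            raise ValueError(f"Expected a string for primary key {keys[0]!r}")
        return

    # Composite primary key: the value must be a mapping with exactly these keys.
    if not isinstance(pk_value, Mapping):
        raise ValueError(f"Expected a mapping with keys {sorted(keys)}")
    if set(pk_value) != set(keys):
        raise ValueError(
            f"pk_value keys {sorted(pk_value)} do not match primary key {sorted(keys)}"
        )


# Both calls pass silently; a mismatch would raise ValueError instead.
validate_pk_value("id", "123")
validate_pk_value(["company_id", "property"], {"company_id": "123", "property": "status"})
```

Failing before any request is built gives the caller an error that names the expected keys, rather than an HTTP error from a malformed path.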

Additionally, consider adding debug logging when fetching records to aid troubleshooting:

 def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
     """..."""
     if not isinstance(self.retriever, SimpleRetriever):
         raise NotImplementedError(...)
     
+    self.logger.debug(f"Fetching record from stream {self.name} with pk_value: {pk_value}")
     return self.retriever.fetch_one(pk_value=pk_value, records_schema=self.get_json_schema())
airbyte_cdk/sources/abstract_source.py (1)

328-357: Consider caching stream instances to avoid repeated initialization?

The method calls self.streams(config) on every invocation (line 348), which could be expensive if stream initialization involves API calls or heavy computation. Since this is a new "fetch by PK" use case that might be called frequently (e.g., in a loop fetching related records), this could become a performance bottleneck.

Would it make sense to cache the stream instances, perhaps using an LRU cache keyed by a hash of the config? Something like:

@lru_cache(maxsize=8)
def _get_stream_instances(self, config_hash: str) -> Dict[str, Stream]:
    # config_hash would be a frozen/hashable representation of config
    return {s.name: s for s in self.streams(config)}

This would maintain the existing behavior for the first call but speed up subsequent calls. Wdyt?

Alternatively, if the current behavior is intentional (always get fresh stream instances), that's fine too - just wanted to flag the potential performance consideration for Phase 2 optimizations.
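
One caveat with the snippet above is that a cache keyed only on a hash still needs the config itself to build the streams. A minimal sketch of one way around that is shown here, assuming the config is JSON-serializable. `StreamCache`, `FakeStream`, and `build` are hypothetical names for illustration, not CDK APIs.

```python
import json
from typing import Any, Callable, Dict, List, Mapping


class StreamCache:
    """Caches name -> stream lookups per distinct config (hypothetical helper)."""

    def __init__(self, build_streams: Callable[[Mapping[str, Any]], List[Any]]) -> None:
        self._build_streams = build_streams
        self._cache: Dict[str, Dict[str, Any]] = {}

    def get(self, config: Mapping[str, Any]) -> Dict[str, Any]:
        # Mappings are not hashable, so use a canonical JSON string as the key.
        key = json.dumps(dict(config), sort_keys=True, default=str)
        if key not in self._cache:
            self._cache[key] = {s.name: s for s in self._build_streams(config)}
        return self._cache[key]


class FakeStream:
    """Stand-in for a stream object; only the name attribute matters here."""

    def __init__(self, name: str) -> None:
        self.name = name


calls: List[int] = []


def build(config: Mapping[str, Any]) -> List[FakeStream]:
    calls.append(1)  # count how often streams are actually rebuilt
    return [FakeStream("posts"), FakeStream("users")]


cache = StreamCache(build)
cache.get({"api_key": "x"})
cache.get({"api_key": "x"})  # served from the cache, build() is not called again
```

The trade-off is staleness: cached stream instances would not reflect changes that happen outside the config, so this only suits repeated lookups within one session.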

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

676-683: Path construction for composite PKs: Document the key ordering assumption?

For composite primary keys (lines 679-681), the implementation sorts keys alphabetically and joins their values with "/". For example, {"company_id": "123", "property": "status"} becomes the path segment "123/status" (because "company_id" < "property" alphabetically).

This convention-based approach is pragmatic for Phase 1, and the inline comments (lines 659-667) appropriately acknowledge that future phases might need configurable path templates. However, I'm wondering if we should make the key ordering even more explicit in the docstring examples?

Currently the docstring shows:

record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema)

Could we add a note like: "Note: For composite PKs, values are ordered alphabetically by key name" to make this behavior crystal clear for API developers? Wdyt?
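
The ordering rule described above is easy to demonstrate on its own. This sketch mirrors the sort-then-join logic the review describes; `composite_pk_segment` is a hypothetical standalone function, not the retriever's method.

```python
from typing import Mapping


def composite_pk_segment(pk_value: Mapping[str, object]) -> str:
    """Join composite PK values with "/", ordered alphabetically by key name."""
    return "/".join(str(pk_value[key]) for key in sorted(pk_value))


# Dict insertion order does not matter; only the key names do.
a = composite_pk_segment({"company_id": "123", "property": "status"})
b = composite_pk_segment({"property": "status", "company_id": "123"})

# Pitfall: "id" sorts before "org", so the id comes first in the path.
c = composite_pk_segment({"org": "acme", "id": "7"})
```

The last case yields `7/acme`, which may not match an API whose route is `/orgs/{org}/items/{id}`. That is the kind of surprise an explicit docstring note would help callers anticipate.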

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0443aa and 38d13d3.

📒 Files selected for processing (5)
  • airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1 hunks)
  • airbyte_cdk/sources/abstract_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1 hunks)
  • airbyte_cdk/sources/streams/core.py (1 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (3)
  • Record (21-72)
  • data (35-36)
  • associated_slice (39-40)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
  • name (118-126)
  • name (129-131)
  • fetch_one (629-744)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
airbyte_cdk/sources/abstract_source.py (1)
  • fetch_record (328-357)
airbyte_cdk/sources/streams/core.py (2)
  • fetch_record (466-484)
  • name (139-143)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
  • SimpleRetriever (59-744)
  • name (118-126)
  • name (129-131)
  • fetch_one (629-744)
airbyte_cdk/sources/abstract_source.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
  • fetch_record (205-226)
  • name (93-97)
  • name (100-102)
airbyte_cdk/sources/streams/core.py (2)
  • fetch_record (466-484)
  • name (139-143)
airbyte_cdk/sources/streams/core.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
  • fetch_record (205-226)
  • name (93-97)
  • name (100-102)
airbyte_cdk/sources/abstract_source.py (1)
  • fetch_record (328-357)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

1649-1843: Excellent test coverage!

The seven test cases provide comprehensive coverage of the fetch_one() functionality:

  • Simple and composite primary keys with proper path construction validation
  • 404 handling (returns None as expected)
  • Non-404 error propagation
  • Type validation for pk_value
  • Edge cases (None response, empty records)

The tests clearly document the expected behavior and will catch regressions. Nice work!

airbyte_cdk/sources/streams/core.py (1)

466-484: LGTM - Clean base implementation!

The default fetch_record() implementation in the Stream base class is well-designed:

  • Clear NotImplementedError with an informative message indicating which stream types support this operation
  • Comprehensive docstring with examples for both simple and composite primary keys
  • Appropriate signature accepting Any for pk_value to accommodate different PK formats

This establishes a good foundation for the new functionality.

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

715-720: Verify 404 detection robustness across different exception types?

The 404 detection logic (lines 717-719) checks hasattr(e, "response") and hasattr(e.response, "status_code") before accessing e.response.status_code. This works well for requests library exceptions, but could potentially miss 404s from other HTTP client implementations or custom error handlers.

Have you verified this works correctly with the requester's error handling? Specifically:

  1. Does self.requester.send_request() always wrap HTTP errors in exceptions with a .response attribute?
  2. Are there any custom error handlers in the declarative framework that might transform exceptions?

If we're confident the requester always uses requests library exceptions (or compatible ones), this is fine. Otherwise, we might want to be more defensive:

except Exception as e:
    # Check if this is a 404 (record not found) - return None
    status_code = None
    if hasattr(e, "response"):
        status_code = getattr(e.response, "status_code", None)
    elif hasattr(e, "status_code"):
        status_code = e.status_code
    
    if status_code == 404:
        return None
    raise

Wdyt?


629-744: Overall: Solid Phase 1 implementation with clear documentation!

The fetch_one() implementation is well-structured and includes:

  • ✅ Clear path construction logic for simple and composite PKs
  • ✅ Appropriate 404 handling (returns None) vs. error propagation
  • ✅ Reuse of existing request options and response parsing machinery
  • ✅ Good type checking with ValueError for invalid pk_value types
  • ✅ Comprehensive docstring with examples and future considerations
  • ✅ Inline comments documenting alternative approaches for future phases

The convention-based approach (appending PK values to base path) is reasonable for Phase 1, and you've appropriately documented the limitations and future enhancement possibilities.

One additional thought: Since this is Phase 1 and you mention testing against real declarative connectors in the PR description, it would be valuable to document any connectors you've tested this with (either in the PR description or as a comment). This helps future developers understand which path construction patterns have been validated in practice.

@github-actions

github-actions bot commented Nov 12, 2025

PyTest Results (Fast)

3 818 tests  +7   3 806 ✅ +6   6m 28s ⏱️ -44s
    1 suites ±0      12 💤 +1 
    1 files   ±0       0 ❌ ±0 

Results for commit a2e8e55. ± Comparison against base commit f0443aa.

This pull request skips 1 test.
unit_tests.sources.declarative.test_concurrent_declarative_source ‑ test_read_with_concurrent_and_synchronous_streams

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Nov 12, 2025

PyTest Results (Full)

3 821 tests  +7   3 809 ✅ +7   11m 9s ⏱️ +5s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit a2e8e55. ± Comparison against base commit f0443aa.

♻️ This comment has been updated with latest results.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)

205-208: Type hint could be more precise and consistent, wdyt?

The type hint Any | Mapping[str, Any] is redundant since Any already encompasses Mapping[str, Any]. Additionally:

  • Your docstring indicates pk_value should be str for simple PKs (line 214), but the type hint says Any
  • SimpleRetriever.fetch_one expects Union[str, Mapping[str, Any]], which more accurately reflects the actual constraint
  • The rest of this file uses Union syntax (see line 50), not the | operator

Consider aligning the type hint with both the documentation and fetch_one's signature:

     def fetch_record(
         self,
-        pk_value: Any | Mapping[str, Any],
+        pk_value: Union[str, Mapping[str, Any]],
     ) -> Optional[Mapping[str, Any]]:

This would match the docstring, improve type safety, and maintain consistency with the rest of the file.

Note: The same pattern appears in AbstractSource.fetch_record. If you update here, consider updating there as well for consistency across the codebase.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 38d13d3 and 052eb81.

📒 Files selected for processing (2)
  • airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1 hunks)
  • airbyte_cdk/sources/abstract_source.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/abstract_source.py
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
airbyte_cdk/sources/abstract_source.py (1)
  • fetch_record (328-360)
airbyte_cdk/sources/streams/core.py (3)
  • fetch_record (466-484)
  • name (139-143)
  • get_json_schema (296-304)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
  • SimpleRetriever (59-744)
  • name (118-126)
  • name (129-131)
  • fetch_one (629-744)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)

223-229: Implementation looks solid!

The runtime type check appropriately guards the delegation, the error message is clear and consistent with the base class, and passing self.get_json_schema() to fetch_one ensures the retriever has the schema context it needs for parsing.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

677-684: Path construction should normalize slashes more robustly, wdyt?

The current implementation uses .replace("//", "/") to handle double slashes, but this approach has limitations as noted in past review comments:

  • Doesn't handle triple slashes or more: "///" becomes "//"
  • Fails if base_path has trailing slash AND pk_value starts with slash
  • Example: "posts/" + "/123" → "posts//123" → "posts/123" ✓ (works)
  • Example: "posts//" + "//123" → "posts////123" → "posts//123" ✗ (broken)

As suggested in previous reviews, normalizing with rstrip('/') is more reliable:

     if isinstance(pk_value, str):
-        fetch_path = f"{base_path}/{pk_value}".replace("//", "/")
+        fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"
     elif isinstance(pk_value, Mapping):
         sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())]
         pk_path_segment = "/".join(sorted_values)
-        fetch_path = f"{base_path}/{pk_path_segment}".replace("//", "/")
+        fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"

This ensures exactly one slash between segments regardless of how base_path and pk_value are formatted.
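
The two approaches can be compared side by side. This is an illustrative sketch: `join_with_replace` and `join_with_strip` are hypothetical helpers that reproduce the two expressions from the diff above as standalone functions.

```python
def join_with_replace(base_path: str, pk_value: str) -> str:
    """Original approach: concatenate, then collapse "//" in a single pass."""
    return f"{base_path}/{pk_value}".replace("//", "/")


def join_with_strip(base_path: str, pk_value: str) -> str:
    """Suggested approach: strip slashes at the seam before joining."""
    return f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"


cases = [("posts", "123"), ("posts/", "123"), ("posts/", "/123"), ("posts//", "//123")]
for base, pk in cases:
    print(f"{base!r} + {pk!r}: replace={join_with_replace(base, pk)!r} "
          f"strip={join_with_strip(base, pk)!r}")
```

Because the f-string adds its own separator, `join_with_replace` already leaves a double slash when both sides carry one, while `join_with_strip` produces `posts/123` for all four inputs.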

🧹 Nitpick comments (6)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

1683-1720: Consider verifying alphabetical key ordering explicitly, wdyt?

The test uses a composite PK with {"company_id": "123", "property": "status"} and expects path companies/123/status. While this works because "company_id" < "property" alphabetically, it would be clearer to test the sorting behavior explicitly.

Perhaps add an assertion or comment that makes the alphabetical ordering expectation explicit:

# Test verifies alphabetical key ordering: company_id comes before property
result = retriever.fetch_one({"company_id": "123", "property": "status"}, records_schema={})

Or test with keys intentionally out of order to prove sorting works:

# Keys intentionally reversed to verify sorting
result = retriever.fetch_one({"property": "status", "company_id": "123"}, records_schema={})
# Should still produce: companies/123/status (sorted order)
airbyte_cdk/sources/abstract_source.py (1)

328-360: Consider the performance implications of calling self.streams(config) on each fetch, wdyt?

The implementation looks correct and the error handling is good. However, calling self.streams(config) on every fetch_record invocation could be expensive if stream initialization involves API calls or heavy computation.

A couple of options to consider:

  1. If this method is called frequently in a loop, you might want to cache the stream instances or allow the caller to pass them:

    def fetch_record(
        self,
        stream_name: str,
        pk_value: Any | Mapping[str, Any],
        config: Mapping[str, Any],
        stream_instances: Optional[Dict[str, Stream]] = None,
    ) -> Optional[Mapping[str, Any]]:
        if stream_instances is None:
            stream_instances = {s.name: s for s in self.streams(config)}
        # ... rest of logic
  2. Add debug logging for observability when fetching individual records:

    logger.debug(f"Fetching record from stream '{stream_name}' with pk_value: {pk_value}")

That said, if fetch_record is used infrequently (e.g., for one-off lookups), the current implementation is fine as-is.

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)

683-684: Consider validating that pk_value is not empty, wdyt?

The code raises a clear ValueError for invalid types, but doesn't check for empty strings or empty dicts. This could lead to malformed paths like "posts/" or unexpected behavior.

You might want to add validation:

if isinstance(pk_value, str):
    if not pk_value.strip():
        raise ValueError("pk_value cannot be an empty string")
    fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"
elif isinstance(pk_value, Mapping):
    if not pk_value:
        raise ValueError("pk_value cannot be an empty dict")
    sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())]
    pk_path_segment = "/".join(sorted_values)
    fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"

688-715: Consider extracting request options into a helper to reduce repetition, wdyt?

Lines 694-713 repeat the same pattern four times: calling self._request_* with identical arguments. While not incorrect, this repetition could be reduced for maintainability.

You could extract this into a helper:

def _get_request_options_for_fetch(
    self, stream_slice: StreamSlice
) -> dict:
    """Get all request options for a fetch_one operation."""
    return {
        "request_headers": self._request_headers(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
        "request_params": self._request_params(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
        "request_body_data": self._request_body_data(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
        "request_body_json": self._request_body_json(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
    }

Then use it:

response = self.requester.send_request(
    path=fetch_path,
    stream_state={},
    stream_slice=stream_slice,
    next_page_token=None,
    log_formatter=self.log_formatter,
    **self._get_request_options_for_fetch(stream_slice),
)

716-726: 404 detection via string search could match false positives, wdyt?

The condition "404" in str(e) might match error messages that contain "404" but aren't actually 404 responses (e.g., "Failed after 404 retries" or "Connection timeout after 404ms").

Consider making the detection more specific:

except Exception as e:
    # Check if this is a 404 (record not found)
    is_404 = False
    
    # Check response status code if available
    if hasattr(e, "response") and hasattr(e.response, "status_code"):
        is_404 = e.response.status_code == 404
    # Fallback to checking error message more carefully
    elif "404" in str(e):
        # Only match if 404 appears with typical HTTP context
        error_str = str(e).lower()
        is_404 = any(pattern in error_str for pattern in [
            "404 not found",
            "404 client error",
            "http error 404",
            "status code 404",
        ])
    
    if is_404:
        raise RecordNotFoundException(
            f"Record with primary key {pk_value} not found"
        ) from e
    raise

Or, if you're confident the current implementation works for your error handling, at least add a comment explaining the string check is intentional.
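To illustrate the false-positive concern, here is a minimal sketch that compares substring matching with a status-code check. `FakeResponse` and `FakeHTTPError` are hypothetical stand-ins invented for this example; they are not the CDK's or any HTTP library's real exception types.

```python
from typing import Optional


class FakeResponse:
    """Hypothetical stand-in for an HTTP response object."""

    def __init__(self, status_code: int) -> None:
        self.status_code = status_code


class FakeHTTPError(Exception):
    """Hypothetical error that may carry a response (assumption for this sketch)."""

    def __init__(self, message: str, response: Optional[FakeResponse] = None) -> None:
        super().__init__(message)
        self.response = response


def is_404_by_substring(error: Exception) -> bool:
    # The approach under review: any "404" in the message counts as a match.
    return "404" in str(error)


def is_404_by_status(error: Exception) -> bool:
    # Prefer the structured status code when the error exposes one.
    response = getattr(error, "response", None)
    return getattr(response, "status_code", None) == 404


timeout = FakeHTTPError("Connection timeout after 404ms")
not_found = FakeHTTPError("404 Client Error: Not Found", FakeResponse(404))

print(is_404_by_substring(timeout), is_404_by_status(timeout))      # True False
print(is_404_by_substring(not_found), is_404_by_status(not_found))  # True True
```

The timeout error is misclassified by the substring check but not by the status-code check, which is the case the comment above is worried about.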


630-756: Consider adding logging for debugging and observability, wdyt?

The method performs a critical operation (fetching a single record by PK) but doesn't log anything. Adding debug-level logging would help troubleshoot issues in production.

Consider adding:

def fetch_one(
    self,
    pk_value: Union[str, Mapping[str, Any]],
    records_schema: Mapping[str, Any],
) -> Optional[Mapping[str, Any]]:
    # ... existing docstring ...
    
    LOGGER.debug(f"Fetching single record for stream '{self.name}' with pk_value: {pk_value}")
    
    # ... existing path construction ...
    
    LOGGER.debug(f"Constructed fetch path: {fetch_path}")
    
    try:
        response = self.requester.send_request(...)
    except Exception as e:
        LOGGER.debug(f"Error fetching record with pk_value {pk_value}: {e}")
        # ... existing exception handling ...
    
    # ... rest of method ...

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 052eb81 and bab2781.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/abstract_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/exceptions.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (4)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (3)
  • Record (21-72)
  • data (35-36)
  • associated_slice (39-40)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
  • name (119-127)
  • name (130-132)
  • fetch_one (630-756)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
airbyte_cdk/sources/abstract_source.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
  • fetch_record (205-229)
  • name (93-97)
  • name (100-102)
airbyte_cdk/sources/streams/core.py (2)
  • fetch_record (466-484)
  • name (139-143)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (12)
airbyte_cdk/sources/declarative/exceptions.py (1)

12-13: LGTM! Clean exception class design.

The new RecordNotFoundException follows Python exception conventions and inherits appropriately from ReadException. The docstring clearly indicates its purpose for 404-like scenarios.

unit_tests/sources/declarative/retrievers/test_simple_retriever.py (7)

27-27: LGTM! Import added for new exception.

The import is correctly placed and used throughout the test suite.


1650-1681: LGTM! Simple PK test covers path construction and response handling.

The test verifies both the returned record data and the constructed path (posts/123). Good coverage.


1722-1747: LGTM! 404 handling test verifies exception and message.

The test confirms that a 404 response raises RecordNotFoundException and includes the PK value in the error message.


1749-1774: LGTM! Non-404 error propagation test ensures errors aren't swallowed.

Good coverage of the differentiation between 404 (converted to RecordNotFoundException) and other errors (propagated as-is).


1776-1796: LGTM! Type validation test catches invalid PK types.

The test verifies that passing an integer (instead of string or dict) raises a ValueError with a clear message.


1798-1819: LGTM! None response handling test.

The test confirms that when send_request returns None, a RecordNotFoundException is raised appropriately.


1821-1847: LGTM! Empty records test handles the case where API returns 200 but no data.

This is a good edge case to cover—APIs that return success but empty results should be treated as "not found".
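As a rough sketch of the two "not found" cases these tests cover (no response, and a response that parses to no records), assuming hypothetical stand-in classes and a hypothetical helper name `extract_single_record` rather than the retriever's actual code:

```python
from typing import Any, Dict, Iterable, Mapping, Optional


class ReadException(Exception):
    """Hypothetical stand-in for the CDK's ReadException base class."""


class RecordNotFoundException(ReadException):
    """Raised when a requested record cannot be found."""


def extract_single_record(
    response: Optional[object],
    records: Iterable[Mapping[str, Any]],
    pk_value: str,
) -> Dict[str, Any]:
    # A missing response (for example, an ignored error) counts as not found.
    if response is None:
        raise RecordNotFoundException(f"Record with primary key {pk_value} not found")
    first_record = next(iter(records), None)
    # A successful response with no parsed records also counts as not found.
    if first_record is None:
        raise RecordNotFoundException(f"Record with primary key {pk_value} not found")
    # Return a plain dict regardless of the mapping type that was parsed.
    return dict(first_record)
```

Deriving the exception from the read-error base class means callers that already catch read errors keep working, while callers that care about missing records can catch the narrower type.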

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)

29-29: LGTM! Import added for new exception.

The import is correctly placed and used in the fetch_one method.


630-669: LGTM! Excellent documentation with examples and future considerations.

The docstring is comprehensive, includes examples for both simple and composite PKs, and documents the convention-based approach with references to alternative implementations. This will help future maintainers understand the design decisions.


670-675: LGTM! Correct use of requester to get base path.

The method properly delegates to requester.get_path() with empty state/slice/token, which is appropriate for fetching a specific record.


733-756: LGTM! Record extraction logic is thorough and handles multiple types.

The code properly handles both Record objects and plain Mapping types, and raises clear exceptions when records are empty or have unexpected types. The use of dict() ensures a clean dict is returned.

- Use rstrip('/') on base_path and lstrip('/') on pk segments
- More robust than replace('//', '/') for handling edge cases
- Addresses Copilot review feedback

Co-Authored-By: AJ Steers <[email protected]>
@aaronsteers aaronsteers changed the title feat: Add fetch_one/fetch_record functionality to SimpleRetriever feat: Add Stream.fetch_record() and SimpleRetriever.fetch_one() Nov 12, 2025
@aaronsteers aaronsteers changed the title feat: Add Stream.fetch_record() and SimpleRetriever.fetch_one() feat: Add Stream.fetch_record() backed by SimpleRetriever.fetch_one() Nov 12, 2025
@devin-ai-integration
Contributor

Thanks for the feedback! I've updated the path construction to use rstrip('/') and lstrip('/') instead of .replace("//", "/") for more robust handling of edge cases. This change was pushed in commit a2e8e55.

The new implementation:

if isinstance(pk_value, str):
    fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}"
elif isinstance(pk_value, Mapping):
    sorted_values = [str(pk_value[key]).lstrip('/') for key in sorted(pk_value.keys())]
    pk_path_segment = "/".join(sorted_values)
    fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"

All unit tests continue to pass with this change.
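For context, a small sketch of why trimming at the seam tends to be more robust than collapsing slashes afterwards. `join_with_replace` is only an approximation of the earlier approach (its exact form is not shown in this thread), and both function names are hypothetical.

```python
def join_with_replace(base_path: str, pk_value: str) -> str:
    # Approximation of the earlier approach: join, then collapse "//" in one pass.
    return f"{base_path}/{pk_value}".replace("//", "/")


def join_with_strip(base_path: str, pk_value: str) -> str:
    # Updated approach: trim slashes on both sides of the seam before joining.
    return f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"


# Three consecutive slashes survive a single replace() pass as two.
print(join_with_replace("posts/", "/123"))  # posts//123
print(join_with_strip("posts/", "/123"))    # posts/123
```

`str.replace` scans for non-overlapping matches in one pass, so a run of three slashes is reduced to two rather than one.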

@aaronsteers aaronsteers changed the title feat: Add Stream.fetch_record() backed by SimpleRetriever.fetch_one() feat: Add Source.fetch_record() backed by Stream.fetch_record() and SimpleRetriever.fetch_one() Nov 12, 2025
- Remove fetch_record from AbstractSource (legacy, not supported)
- Remove fetch_record from Stream base class (legacy, not supported)
- Remove fetch_record from DeclarativeStream (legacy, not supported)
- Add fetch_record to ConcurrentDeclarativeSource (primary implementation)
- Add fetch_record to AbstractStream as concrete method (raises NotImplementedError)
- Keep SimpleRetriever.fetch_one as underlying mechanism

This refactor ensures fetch_record is ONLY available for declarative
concurrent connectors (PyAirbyte use case), not legacy sources.

Co-Authored-By: AJ Steers <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

557-568: Consider validating primary key existence, wdyt?

The retriever type check is good, but should we also verify that the stream has a primary key defined before attempting the fetch? Without a PK, the fetch_one call might fail in unclear ways.

Something like:

if not stream_config.get("primary_key"):
    raise ValueError(
        f"Stream '{stream_name}' does not define a primary key, "
        "which is required for fetching individual records."
    )

This would provide clearer error messages upfront. What do you think?

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

677-684: Clarify composite key ordering in docstring; URL encoding can remain optional.

The docstring already documents the behavior at line 662 ("in key order"), but it could be more explicit: would you consider rewording it to say "in alphabetically sorted key order" to match the sorted(pk_value.keys()) implementation? That removes any ambiguity for users.

On URL encoding—there's no existing pattern for encoding path parameters in the retriever/requester codebase, and no tests demonstrate issues with special characters. If pk_values are typically sourced from database records or structured data, this might not be a concern. If that changes or edge cases emerge, URL encoding can be added then. What are the typical sources of pk_values in practice, wdyt?
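If encoding is added later, one possible shape is sketched below using the standard library's `urllib.parse.quote`. The helper name `build_fetch_path` is hypothetical and this is not code from the PR.

```python
from urllib.parse import quote


def build_fetch_path(base_path: str, pk_value: str) -> str:
    # safe="" also encodes "/", so the key always stays a single path segment.
    encoded_pk = quote(pk_value, safe="")
    return f"{base_path.rstrip('/')}/{encoded_pk}"


print(build_fetch_path("posts", "123"))          # posts/123
print(build_fetch_path("files/", "a/b c?x=1"))   # files/a%2Fb%20c%3Fx%3D1
```

Plain alphanumeric keys pass through unchanged, so behavior for typical IDs would stay the same under this sketch.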

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bab2781 and c0719e2.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
  • DeclarativeStream (2482-2549)
  • SimpleRetriever (2867-2925)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
  • SimpleRetriever (60-756)
  • fetch_one (630-756)
airbyte_cdk/sources/declarative/schema/default_schema_loader.py (1)
  • DefaultSchemaLoader (15-47)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • create_component (831-864)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • fetch_record (510-585)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Analyze (python)
🔇 Additional comments (6)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)

94-113: LGTM! Clean interface addition.

The new fetch_record method provides a clear extension point for streams that support single-record fetching. The default NotImplementedError with an informative message is appropriate for a base class.
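A minimal sketch of this opt-in pattern, using hypothetical classes (`BaseStream`, `InMemoryStream`) rather than the CDK's actual `AbstractStream`:

```python
from abc import ABC
from typing import Any, Mapping


class BaseStream(ABC):
    """Hypothetical stand-in for the abstract stream interface."""

    name = "base"

    def fetch_record(self, pk_value: str) -> Mapping[str, Any]:
        # Concrete default: streams opt in by overriding this method.
        raise NotImplementedError(
            f"Stream '{self.name}' does not support fetching individual records."
        )


class InMemoryStream(BaseStream):
    """Hypothetical stream that supports lookups by primary key."""

    name = "posts"

    def __init__(self, rows: Mapping[str, Mapping[str, Any]]) -> None:
        self._rows = rows

    def fetch_record(self, pk_value: str) -> Mapping[str, Any]:
        return self._rows[pk_value]
```

Because the default is a concrete method rather than an abstract one, existing stream subclasses keep working without changes.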

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

540-555: Stream resolution looks good!

The linear search for the stream config is appropriate given the typical number of streams, and the error message helpfully lists available streams when lookup fails.
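The lookup described here can be sketched roughly as follows; `find_stream_config` and the shape of the config dictionaries are assumptions for illustration, not the source's actual code.

```python
from typing import Any, Mapping, Sequence


def find_stream_config(
    stream_configs: Sequence[Mapping[str, Any]], stream_name: str
) -> Mapping[str, Any]:
    # Linear scan: manifests typically hold a small number of streams.
    for stream_config in stream_configs:
        if stream_config.get("name") == stream_name:
            return stream_config
    # List the known names so a typo is easy to spot.
    available = [cfg.get("name") for cfg in stream_configs]
    raise ValueError(
        f"Stream '{stream_name}' not found. Available streams: {available}"
    )
```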


583-585: Clean delegation to retriever!

The final call properly passes through both the pk_value and the resolved schema to the retriever's fetch_one method.


570-581: Verify the type(schema_loader_config) pattern safety.

The use of type(schema_loader_config) on line 573 assumes the config is compatible with create_component's expected model type parameter. Can you confirm this pattern is safe across all possible schema loader configurations in manifests?

Run this to check how schema_loader is typically structured in manifests:

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

686-715: Request construction looks solid!

The use of empty stream state and minimal StreamSlice is appropriate for fetching a single record. All necessary request options (headers, params, body) are properly included.


728-756: Response parsing and error handling are thorough!

The implementation properly handles various edge cases:

  • No response → RecordNotFoundException
  • Empty records list → RecordNotFoundException
  • Type checking with clear error messages
  • Consistent dict return type

The reuse of _parse_response is good for maintaining consistency with the streaming logic.

devin-ai-integration bot and others added 2 commits November 13, 2025 00:54
- Update SimpleRetriever.fetch_one docstring to reflect RecordNotFoundException behavior
- Move inline imports to top-level in ConcurrentDeclarativeSource
- Simplify schema building logic using declarative_stream.get_json_schema()
- Remove unused DefaultSchemaLoader import

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <[email protected]>
Remove manual 404 checking since send_request() already handles errors
internally. If send_request() returns without raising, we have valid data.
Errors are now propagated naturally from the requester's error handler.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <[email protected]>

stream_slice = StreamSlice(partition={}, cursor_slice={})

response = self.requester.send_request(
Contributor Author

@aaronsteers aaronsteers Nov 13, 2025

Devin, add a type annotation here so the reader knows what we're working with in the "response" variable.

Contributor

Good point! I'll add a type annotation to make it explicit that response is requests.Response | None.

else:
    raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}")

stream_slice = StreamSlice(partition={}, cursor_slice={})
Contributor Author

Devin, do we actually need to pass a stream slice? If yes, that's okay. Just want to check.

Contributor

Yes, we need to pass a StreamSlice because the API methods (get_path, send_request, _request_headers, _request_params, etc.) all require it as a parameter.

For single-record fetch, we don't have partitioning, so we pass an empty slice with partition={} and cursor_slice={}. This is appropriate since templates and backoff strategies may reference the slice parameter. I'll add a brief inline comment to clarify this.

Contributor Author

Thanks. In that case, let's call it dummy_stream_slice or empty_stream_slice for clarity.

Contributor

Good idea! I'll rename it to empty_stream_slice for clarity since it better describes that we're passing an empty slice (not a dummy/placeholder).

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

511-534: Consider tightening the pk_value type annotation?

The docstring clarifies that pk_value can be either str or Mapping[str, Any], but the signature uses Any. Would it be clearer to use Union[str, Mapping[str, Any]] to match the documented behavior, wdyt?

     def fetch_record(
         self,
         stream_name: str,
-        pk_value: Any,
+        pk_value: Union[str, Mapping[str, Any]],
         config: Optional[Mapping[str, Any]] = None,
     ) -> Mapping[str, Any]:
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

678-685: Path construction: Consider documenting composite key ordering assumptions?

The implementation sorts composite keys alphabetically (line 681), which is deterministic. However, some APIs might expect keys in a specific order (e.g., parent before child IDs).

Also curious about edge cases:

  • Empty string pk_value: "".lstrip('/') would still concatenate to base/
  • Composite keys with None/null values: str(None) becomes "None"
  • Nested values in composite keys: Only the top level is handled

These might be fine for the intended use cases, but worth documenting the assumptions? If APIs need custom ordering, they'd need a different approach (like the path template option mentioned in the docstring), wdyt?
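A short sketch of the ordering and `None` edge cases, mirroring the composite-key logic quoted earlier in this thread. The helper name `composite_path_segment` is hypothetical.

```python
from typing import Any, Mapping


def composite_path_segment(pk_value: Mapping[str, Any]) -> str:
    # Mirrors the reviewed logic: values joined in alphabetical key order.
    return "/".join(
        str(pk_value[key]).lstrip("/") for key in sorted(pk_value.keys())
    )


# "child_id" sorts before "parent_id", so the child value comes first.
print(composite_path_segment({"parent_id": "10", "child_id": "7"}))  # 7/10

# None is stringified rather than rejected.
print(composite_path_segment({"id": None}))  # None
```

An API that expects `/{parent_id}/{child_id}` would receive the segments in the opposite order here, which is why the ordering assumption is worth documenting.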


722-745: Record extraction logic is solid, consider adding observability?

The implementation correctly handles both Record and Mapping types and extracts the data properly. Three suggestions:

  1. Multiple records: If the API unexpectedly returns multiple records, taking the first one silently (line 734) could mask API issues. Consider logging a warning if len(records) > 1? This would help catch configuration issues where the PK path returns a collection instead of a single item, wdyt?

  2. Success logging: Adding debug-level logging when a record is successfully fetched would help with troubleshooting. Something like LOGGER.debug(f"Successfully fetched record with PK {pk_value} from {fetch_path}")?

  3. Error message clarity: Line 740's "invalid record type" might be confusing, since it's really about unexpected API behavior rather than "not found". Perhaps "Record with primary key {pk_value} returned unexpected type {type(first_record).__name__}" would be clearer?
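One way the multiple-records warning could look, as a sketch only. `first_record_with_warning` is a hypothetical helper, and the module-level `logger` is an assumption rather than the retriever's actual logger.

```python
import logging
from typing import Any, Iterable, Mapping, Optional

logger = logging.getLogger(__name__)


def first_record_with_warning(
    records: Iterable[Mapping[str, Any]], pk_value: str
) -> Optional[Mapping[str, Any]]:
    records_iter = iter(records)
    first = next(records_iter, None)
    # Peek at one more item to detect a collection response without
    # consuming the whole iterable.
    if first is not None and next(records_iter, None) is not None:
        logger.warning(
            "Fetch for primary key %s returned more than one record; using the first.",
            pk_value,
        )
    return first
```

Peeking at a single extra item keeps the cost constant even if the endpoint returns a large collection.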

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0719e2 and 0a71543.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
  • SimpleRetriever (60-745)
  • fetch_one (630-745)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
  • fetch_record (94-113)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • create_component (831-864)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
🔇 Additional comments (8)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

80-80: LGTM: Import properly placed at module level.

Good to see this as a top-level import rather than inline.


535-550: Solid stream lookup with helpful error message.

The fallback to instance config and the clear ValueError with available stream names will make debugging much easier. Nice work!


552-567: Code already implements proper error handling and type safety; review concerns are addressed

After verification, your concerns appear to be either already addressed or handled implicitly by design:

  1. Stream construction (lines 545-550): Validation exists checking stream_config is found before create_component() call
  2. Schema retrieval: All get_json_schema() implementations return Mapping[str, Any] with no None return path—the type contract guarantees this
  3. PK validation: fetch_one() validates pk_value type at lines 678-685, raising ValueError for invalid types; tests cover this scenario

The existing pre-validation and type contracts mean these edge cases are already handled. The code demonstrates good defensive design with clear error messages where needed (e.g., SimpleRetriever type check at lines 559-563).

Wdyt—does this align with your analysis, or do you see additional edge cases the verification might have missed?

Likely an incorrect or invalid review comment.

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5)

29-29: LGTM: Exception import properly placed.


630-670: Excellent docstring with clear design rationale.

The comprehensive documentation including examples, the convention-based approach explanation, and future alternatives is very helpful. The type hints using Union[str, Mapping[str, Any]] instead of Any make the interface clearer too.


672-676: Base path retrieval looks good.

Using empty state/slice for the single-record fetch context makes sense. The requester's get_path() should handle any edge cases like None or empty paths.


687-715: Good reuse of existing request machinery.

Properly delegates to the requester's error handling and reuses the existing helper methods for headers/params/body. This maintains consistency with the rest of the retriever's behavior.


717-720: Response validation is clear and simple.

Good that this was simplified from the previous version. The requester's error handler takes care of HTTP status codes, so checking for a falsy response here is appropriate.

- Add type annotation to response variable: requests.Response | None
- Add comment explaining response can be None when error handler returns IGNORE
- Add comment explaining empty StreamSlice for single-record fetch
- Update docstring to clarify 404s propagate as exceptions (not RecordNotFoundException)
- RecordNotFoundException is only raised for empty/ignored responses or no parsed records

Addresses comments from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <[email protected]>
- Add type hints: records_iter: Iterable[Record] and first_record: Record | None
- Replace list(...) with next(iter(...), None) to avoid materializing entire iterable
- Remove unnecessary Mapping branch since _parse_response returns Iterable[Record]
- Simplify logic from 27 lines to 18 lines

This is more efficient (only fetches first record) and has clearer type hints.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <[email protected]>
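The efficiency point in this commit can be illustrated with a small sketch: `next(iter(...), None)` pulls only the first item from a lazy parser, whereas `list(...)` would consume all of them. `parse_records` is a hypothetical stand-in for a lazy response parser, not the retriever's `_parse_response`.

```python
from typing import Dict, Iterator, List

consumed: List[int] = []


def parse_records() -> Iterator[Dict[str, int]]:
    # Hypothetical lazy parser that records which items were actually produced.
    for record_id in range(1, 4):
        consumed.append(record_id)
        yield {"id": record_id}


first_record = next(iter(parse_records()), None)
print(first_record, consumed)  # {'id': 1} [1]

# An empty iterable yields the default instead of raising StopIteration.
empty_result = next(iter([]), None)
print(empty_result)  # None
```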
Comment on lines 678 to 685
if isinstance(pk_value, str):
    fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}"
elif isinstance(pk_value, Mapping):
    sorted_values = [str(pk_value[key]).lstrip("/") for key in sorted(pk_value.keys())]
    pk_path_segment = "/".join(sorted_values)
    fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"
else:
    raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}")
Contributor Author

Devin, let's simplify for now. I don't think we are prepared to actually handle a dict here yet, so let's update the signature and implementation to only accept a string. This will simplify things a bit. We can re-add complexity later.

Contributor

Good call! I'll simplify to only accept str for pk_value for now. This will remove the composite key handling logic and make the implementation cleaner. We can add dict support later when there's a concrete use case.

devin-ai-integration bot and others added 2 commits November 13, 2025 01:46
…alue

- Update SimpleRetriever.fetch_one signature: pk_value now str only (not Union[str, Mapping])
- Rename stream_slice to empty_stream_slice for clarity
- Remove composite key handling logic (isinstance checks, dict path construction)
- Update ConcurrentDeclarativeSource.fetch_record signature to accept str only
- Update AbstractStream.fetch_record signature to accept str only
- Remove test_fetch_one_composite_pk test (no longer supported)
- Update test_fetch_one_invalid_pk_type to reflect new behavior
- Simplify docstrings to remove composite key documentation

This simplification makes the implementation cleaner and easier to maintain.
Composite key support can be added later when there's a concrete use case.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <[email protected]>
The _fetch_one method on SimpleRetriever is now prefixed with underscore
to indicate it's not part of the public API. External callers should use
Stream.fetch_record() or Source.fetch_record() instead.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

563-565: Consider making _fetch_one part of the public API?

The method calls declarative_stream.retriever._fetch_one(), which is a private method (indicated by the leading underscore). While this works, calling private methods across module boundaries can be fragile if the implementation changes.

Since _fetch_one is intended to be called from ConcurrentDeclarativeSource.fetch_record, would it make sense to either:

  1. Make it a public method (rename to fetch_one), or
  2. Document it as part of the internal API contract between these classes?

This is more of a design preference, so wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 691927b and 8a52390.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
  • SimpleRetriever (60-724)
  • _fetch_one (630-724)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
  • fetch_record (94-111)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • create_component (831-864)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

80-80: LGTM on the import placement!

The SimpleRetriever import is properly placed at the module level and is used for the type check in fetch_record. This addresses the previous feedback about avoiding inline imports.


511-565: Solid implementation! Please verify end-to-end with a real connector.

The method correctly:

  • Resolves the stream from the manifest
  • Validates the stream uses SimpleRetriever
  • Delegates to the retriever's fetch logic
  • Provides clear error messages for missing streams and unsupported operations

However, as noted in the PR objectives, this hasn't been tested end-to-end with a real declarative connector. Given that the method reconstructs components from the manifest (lines 550-555), it's important to verify the path construction and component reconstruction work correctly in practice.

Could you test this with a real declarative connector to ensure:

  1. The path construction works correctly (e.g., /posts/123 for a posts stream)
  2. Component reconstruction from the manifest succeeds
  3. Error handling works for 404s, empty responses, and server errors
  4. The schema retrieval via get_json_schema() doesn't fail unexpectedly

wdyt?
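
As a starting point for that verification, the behaviors listed above can be sketched against a fake transport. This is an illustration under stated assumptions, not the CDK implementation: `build_record_path`, `fetch_one`, and `send_request` are hypothetical names, the local `RecordNotFoundException` only mirrors the name of the exception the PR adds, and the path rule follows the PR summary's convention of appending `/{pk_value}` to the base path.

```python
from typing import Any, Callable, Mapping, Optional


class RecordNotFoundException(Exception):
    """Local stand-in that mirrors the name of the exception added in the PR."""


def build_record_path(base_path: str, pk_value: str) -> str:
    # Convention described in the PR summary: append /{pk_value} to the base path.
    return f"{base_path.rstrip('/')}/{pk_value}"


def fetch_one(
    base_path: str,
    pk_value: str,
    send_request: Callable[[str], Optional[Mapping[str, Any]]],
) -> Mapping[str, Any]:
    # The PR accepts string primary keys only; the ValueError here is an assumption.
    if not isinstance(pk_value, str) or not pk_value:
        raise ValueError("pk_value must be a non-empty string")
    path = build_record_path(base_path, pk_value)
    response = send_request(path)
    if not response:
        # Empty or ignored responses are reported as "not found".
        raise RecordNotFoundException(f"No record returned for {path}")
    return response


# Fake transport: a dict lookup standing in for an HTTP GET.
FAKE_API = {"/posts/123": {"id": 123, "title": "hello"}}


def fake_send(path: str) -> Optional[Mapping[str, Any]]:
    return FAKE_API.get(path)


post = fetch_one("/posts", "123", fake_send)
```

A real end-to-end check would replace `fake_send` with the connector's requester, so that HTTP errors such as 404s surface through the requester's own error handling.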

@aaronsteers changed the title from "feat: Add Source.fetch_record() backed by Stream.fetch_record() and SimpleRetriever.fetch_one()" to "feat: Add Source.fetch_record() and Stream.fetch_record() backed by SimpleRetriever._fetch_one()" on Nov 13, 2025
devin-ai-integration bot added a commit to airbytehq/PyAirbyte that referenced this pull request Nov 13, 2025
- Add LookupValue type alias for primary key values
- Add get_record() method to Source class (keyword-only pk_value arg)
- Add helper methods _get_stream_primary_key() and _normalize_and_validate_pk()
- Add fetch_record() method to DeclarativeExecutor
- Support both direct PK values and dict input with validation
- Dict validation ensures single entry and key matches stream's primary key
- Only supported for declarative (YAML-based) sources
- Add comprehensive unit tests (16 tests, all passing)

This builds on airbytehq/airbyte-python-cdk#846

Co-Authored-By: AJ Steers <[email protected]>
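
The dict validation described in the commit message above (a single entry whose key matches the stream's primary key) could look roughly like the following. This is a sketch, not PyAirbyte's code: the function name drops the leading underscore of the helper mentioned in the commit, its signature is guessed, the definition of `LookupValue` is an assumption, and returning a string is assumed only because the CDK side of this PR accepts string primary keys.

```python
from typing import Any, Mapping, Union

# Assumption: the real LookupValue alias in PyAirbyte may be defined differently.
LookupValue = Union[str, int]


def normalize_and_validate_pk(
    pk_value: Union[LookupValue, Mapping[str, Any]],
    primary_key: str,
) -> str:
    """Return the primary key value as a string, validating dict input."""
    if isinstance(pk_value, Mapping):
        if len(pk_value) != 1:
            raise ValueError(
                f"Expected exactly one entry in pk_value, got {len(pk_value)}"
            )
        key, value = next(iter(pk_value.items()))
        if key != primary_key:
            raise ValueError(
                f"Key {key!r} does not match the stream's primary key {primary_key!r}"
            )
        return str(value)
    # Direct value: pass through, normalized to a string.
    return str(pk_value)


direct = normalize_and_validate_pk(123, "id")
from_dict = normalize_and_validate_pk({"id": "123"}, "id")
```

Both calls normalize to the same string, so the caller can pass either form without the downstream fetch needing to know which one was used.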

Labels

enhancement New feature or request
