-
Notifications
You must be signed in to change notification settings - Fork 30
feat: Add Source.fetch_record() andStream.fetch_record() backed by SimpleRetriever._fetch_one()
#846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Add Source.fetch_record() andStream.fetch_record() backed by SimpleRetriever._fetch_one()
#846
Conversation
Implements Phase 1 of GitHub issue #833 Changes: - Add fetch_one() method to SimpleRetriever for fetching single records by PK - Add fetch_record() base method to Stream class - Implement fetch_record() in DeclarativeStream to delegate to retriever - Add fetch_record() helper method to AbstractSource - Add comprehensive unit tests for fetch_one functionality The implementation uses convention-based path construction (appending PK value to base path) and supports both simple string PKs and composite dict PKs. Handles 404 responses gracefully by returning None. Co-Authored-By: AJ Steers <[email protected]>
Original prompt from AJ Steers |
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1762976446-add-fetch-one-to-simple-retriever#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1762976446-add-fetch-one-to-simple-retrieverHelpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements Phase 1 of the "get_one" functionality to fetch individual records by primary key from declarative streams using SimpleRetriever. The implementation adds a convention-based approach where the primary key value is appended to the base path (e.g., /posts/123 for simple PKs, /companies/123/status for composite PKs sorted alphabetically).
Key changes:
- Added
fetch_one()method toSimpleRetrieverwith convention-based path construction - Added
fetch_record()methods toStreamandDeclarativeStreamclasses to expose the functionality - Added helper method in
AbstractSourcefor convenient access across streams
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| unit_tests/sources/declarative/retrievers/test_simple_retriever.py | Comprehensive test coverage for fetch_one() including simple PKs, composite PKs, 404 handling, error propagation, and edge cases |
| airbyte_cdk/sources/streams/core.py | Base fetch_record() method added to Stream class that raises NotImplementedError by default |
| airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | Core fetch_one() implementation with convention-based path construction and error handling |
| airbyte_cdk/sources/abstract_source.py | Convenience method for fetching records by stream name |
| airbyte_cdk/legacy/sources/declarative/declarative_stream.py | DeclarativeStream implementation that delegates to SimpleRetriever's fetch_one() |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 WalkthroughWalkthroughAdds single-record fetch support: a Changes
Sequence DiagramsequenceDiagram
participant User
participant CDS as ConcurrentDeclarativeSource
participant SR as SimpleRetriever
participant HTTP as HTTP Client
User->>CDS: fetch_record(stream_name, pk_value)
activate CDS
CDS->>CDS: resolve stream, retriever & schema
CDS->>SR: _fetch_one(pk_value, schema)
deactivate CDS
activate SR
SR->>SR: build path = base_path + "/" + pk_value
SR->>HTTP: send request -> GET /{path}
activate HTTP
HTTP-->>SR: 200 / 404 / error
deactivate HTTP
alt 404 / no response / empty list
SR->>SR: raise RecordNotFoundException
else 200 Success
SR->>SR: parse response -> records list
SR->>SR: return first record (mapping)
else other error
SR->>SR: propagate error
end
deactivate SR
SR-->>CDS: record or exception
CDS-->>User: record or exception
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
205-227: Consider validating pk_value format against the stream's primary_key definition?The method accepts both string and dict pk_value but doesn't validate whether the format matches this stream's actual
self.primary_key. For instance, if a stream hasprimary_key = ["company_id", "property"]but a caller passes a simple string "123", this will proceed to callretriever.fetch_one()which might construct an incorrect path.Should we add a check here to ensure pk_value format aligns with the primary_key definition? For example:
- If
self.primary_keyis a string or single-element list, requirepk_valueto be a string- If
self.primary_keyis a multi-element list, requirepk_valueto be a dict with matching keysThis would catch mismatches early with a clearer error message. Wdyt?
Additionally, consider adding debug logging when fetching records to aid troubleshooting:
def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]: """...""" if not isinstance(self.retriever, SimpleRetriever): raise NotImplementedError(...) + self.logger.debug(f"Fetching record from stream {self.name} with pk_value: {pk_value}") return self.retriever.fetch_one(pk_value=pk_value, records_schema=self.get_json_schema())airbyte_cdk/sources/abstract_source.py (1)
328-357: Consider caching stream instances to avoid repeated initialization?The method calls
self.streams(config)on every invocation (line 348), which could be expensive if stream initialization involves API calls or heavy computation. Since this is a new "fetch by PK" use case that might be called frequently (e.g., in a loop fetching related records), this could become a performance bottleneck.Would it make sense to cache the stream instances, perhaps using an LRU cache keyed by a hash of the config? Something like:
@lru_cache(maxsize=8) def _get_stream_instances(self, config_hash: str) -> Dict[str, Stream]: # config_hash would be a frozen/hashable representation of config return {s.name: s for s in self.streams(config)}This would maintain the existing behavior for the first call but speed up subsequent calls. Wdyt?
Alternatively, if the current behavior is intentional (always get fresh stream instances), that's fine too - just wanted to flag the potential performance consideration for Phase 2 optimizations.
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
676-683: Path construction for composite PKs: Document the key ordering assumption?For composite primary keys (lines 679-681), the implementation sorts keys alphabetically and joins their values with "/". For example,
{"company_id": "123", "property": "status"}becomes the path segment"123/status"(because "company_id" < "property" alphabetically).This convention-based approach is pragmatic for Phase 1, and the inline comments (lines 659-667) appropriately acknowledge that future phases might need configurable path templates. However, I'm wondering if we should make the key ordering even more explicit in the docstring examples?
Currently the docstring shows:
record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema)Could we add a note like: "Note: For composite PKs, values are ordered alphabetically by key name" to make this behavior crystal clear for API developers? Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py(1 hunks)airbyte_cdk/sources/abstract_source.py(1 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py(1 hunks)airbyte_cdk/sources/streams/core.py(1 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
get_path(62-71)send_request(138-156)airbyte_cdk/sources/types.py (3)
Record(21-72)data(35-36)associated_slice(39-40)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
name(118-126)name(129-131)fetch_one(629-744)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
airbyte_cdk/sources/abstract_source.py (1)
fetch_record(328-357)airbyte_cdk/sources/streams/core.py (2)
fetch_record(466-484)name(139-143)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
SimpleRetriever(59-744)name(118-126)name(129-131)fetch_one(629-744)
airbyte_cdk/sources/abstract_source.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
fetch_record(205-226)name(93-97)name(100-102)airbyte_cdk/sources/streams/core.py (2)
fetch_record(466-484)name(139-143)
airbyte_cdk/sources/streams/core.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
fetch_record(205-226)name(93-97)name(100-102)airbyte_cdk/sources/abstract_source.py (1)
fetch_record(328-357)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
get_path(195-208)send_request(446-484)_request_headers(304-325)_request_params(327-356)_request_body_data(358-382)_request_body_json(384-407)airbyte_cdk/sources/declarative/requesters/requester.py (2)
get_path(62-71)send_request(138-156)airbyte_cdk/sources/types.py (6)
StreamSlice(75-169)partition(99-104)cursor_slice(107-112)keys(137-138)Record(21-72)data(35-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (python)
🔇 Additional comments (4)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)
1649-1843: Excellent test coverage!The seven test cases provide comprehensive coverage of the
fetch_one()functionality:
- Simple and composite primary keys with proper path construction validation
- 404 handling (returns None as expected)
- Non-404 error propagation
- Type validation for pk_value
- Edge cases (None response, empty records)
The tests clearly document the expected behavior and will catch regressions. Nice work!
airbyte_cdk/sources/streams/core.py (1)
466-484: LGTM - Clean base implementation!The default
fetch_record()implementation in the Stream base class is well-designed:
- Clear NotImplementedError with an informative message indicating which stream types support this operation
- Comprehensive docstring with examples for both simple and composite primary keys
- Appropriate signature accepting Any for pk_value to accommodate different PK formats
This establishes a good foundation for the new functionality.
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
715-720: Verify 404 detection robustness across different exception types?The 404 detection logic (lines 717-719) checks
hasattr(e, "response")andhasattr(e.response, "status_code")before accessinge.response.status_code. This works well for requests library exceptions, but could potentially miss 404s from other HTTP client implementations or custom error handlers.Have you verified this works correctly with the requester's error handling? Specifically:
- Does
self.requester.send_request()always wrap HTTP errors in exceptions with a.responseattribute?- Are there any custom error handlers in the declarative framework that might transform exceptions?
If we're confident the requester always uses requests library exceptions (or compatible ones), this is fine. Otherwise, we might want to be more defensive:
except Exception as e: # Check if this is a 404 (record not found) - return None status_code = None if hasattr(e, "response"): status_code = getattr(e.response, "status_code", None) elif hasattr(e, "status_code"): status_code = e.status_code if status_code == 404: return None raiseWdyt?
629-744: Overall: Solid Phase 1 implementation with clear documentation!The
fetch_one()implementation is well-structured and includes:
- ✅ Clear path construction logic for simple and composite PKs
- ✅ Appropriate 404 handling (returns None) vs. error propagation
- ✅ Reuse of existing request options and response parsing machinery
- ✅ Good type checking with ValueError for invalid pk_value types
- ✅ Comprehensive docstring with examples and future considerations
- ✅ Inline comments documenting alternative approaches for future phases
The convention-based approach (appending PK values to base path) is reasonable for Phase 1, and you've appropriately documented the limitations and future enhancement possibilities.
One additional thought: Since this is Phase 1 and you mention testing against real declarative connectors in the PR description, it would be valuable to document any connectors you've tested this with (either in the PR description or as a comment). This helps future developers understand which path construction patterns have been validated in practice.
PyTest Results (Fast)3 818 tests +7 3 806 ✅ +6 6m 28s ⏱️ -44s Results for commit a2e8e55. ± Comparison against base commit f0443aa. This pull request skips 1 test.♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
205-208: Type hint could be more precise and consistent, wdyt?The type hint
Any | Mapping[str, Any]is redundant sinceAnyalready encompassesMapping[str, Any]. Additionally:
- Your docstring indicates pk_value should be
strfor simple PKs (line 214), but the type hint saysAnySimpleRetriever.fetch_oneexpectsUnion[str, Mapping[str, Any]], which more accurately reflects the actual constraint- The rest of this file uses
Unionsyntax (see line 50), not the|operatorConsider aligning the type hint with both the documentation and
fetch_one's signature:def fetch_record( self, - pk_value: Any | Mapping[str, Any], + pk_value: Union[str, Mapping[str, Any]], ) -> Optional[Mapping[str, Any]]:This would match the docstring, improve type safety, and maintain consistency with the rest of the file.
Note: The same pattern appears in
AbstractSource.fetch_record. If you update here, consider updating there as well for consistency across the codebase.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py(1 hunks)airbyte_cdk/sources/abstract_source.py(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/abstract_source.py
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
airbyte_cdk/sources/abstract_source.py (1)
fetch_record(328-360)airbyte_cdk/sources/streams/core.py (3)
fetch_record(466-484)name(139-143)get_json_schema(296-304)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
SimpleRetriever(59-744)name(118-126)name(129-131)fetch_one(629-744)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
223-229: Implementation looks solid!The runtime type check appropriately guards the delegation, the error message is clear and consistent with the base class, and passing
self.get_json_schema()tofetch_oneensures the retriever has the schema context it needs for parsing.
…g None for 404s Co-Authored-By: AJ Steers <[email protected]>
Co-Authored-By: AJ Steers <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
677-684: Path construction should normalize slashes more robustly, wdyt?The current implementation uses
.replace("//", "/")to handle double slashes, but this approach has limitations as noted in past review comments:
- Doesn't handle triple slashes or more:
"///"becomes"//"- Fails if
base_pathhas trailing slash ANDpk_valuestarts with slash- Example:
"posts/"+"/123"→"posts//123"→"posts/123"✓ (works)- Example:
"posts//"+"//123"→"posts////123"→"posts///123"✗ (broken)As suggested in previous reviews, normalizing with
rstrip('/')is more reliable:if isinstance(pk_value, str): - fetch_path = f"{base_path}/{pk_value}".replace("//", "/") + fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}" elif isinstance(pk_value, Mapping): sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())] pk_path_segment = "/".join(sorted_values) - fetch_path = f"{base_path}/{pk_path_segment}".replace("//", "/") + fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"This ensures exactly one slash between segments regardless of how base_path and pk_value are formatted.
🧹 Nitpick comments (6)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)
1683-1720: Consider verifying alphabetical key ordering explicitly, wdyt?The test uses a composite PK with
{"company_id": "123", "property": "status"}and expects pathcompanies/123/status. While this works because "company_id" < "property" alphabetically, it would be clearer to test the sorting behavior explicitly.Perhaps add an assertion or comment that makes the alphabetical ordering expectation explicit:
# Test verifies alphabetical key ordering: company_id comes before property result = retriever.fetch_one({"company_id": "123", "property": "status"}, records_schema={})Or test with keys intentionally out of order to prove sorting works:
# Keys intentionally reversed to verify sorting result = retriever.fetch_one({"property": "status", "company_id": "123"}, records_schema={}) # Should still produce: companies/123/status (sorted order)airbyte_cdk/sources/abstract_source.py (1)
328-360: Consider the performance implications of callingself.streams(config)on each fetch, wdyt?The implementation looks correct and the error handling is good. However, calling
self.streams(config)on everyfetch_recordinvocation could be expensive if stream initialization involves API calls or heavy computation.A couple of options to consider:
If this method is called frequently in a loop, you might want to cache the stream instances or allow the caller to pass them:
def fetch_record( self, stream_name: str, pk_value: Any | Mapping[str, Any], config: Mapping[str, Any], stream_instances: Optional[Dict[str, Stream]] = None, ) -> Optional[Mapping[str, Any]]: if stream_instances is None: stream_instances = {s.name: s for s in self.streams(config)} # ... rest of logicAdd debug logging for observability when fetching individual records:
logger.debug(f"Fetching record from stream '{stream_name}' with pk_value: {pk_value}")That said, if
fetch_recordis used infrequently (e.g., for one-off lookups), the current implementation is fine as-is.airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
683-684: Consider validating that pk_value is not empty, wdyt?The code raises a clear
ValueErrorfor invalid types, but doesn't check for empty strings or empty dicts. This could lead to malformed paths like"posts/"or unexpected behavior.You might want to add validation:
if isinstance(pk_value, str): if not pk_value.strip(): raise ValueError("pk_value cannot be an empty string") fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}" elif isinstance(pk_value, Mapping): if not pk_value: raise ValueError("pk_value cannot be an empty dict") sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())] pk_path_segment = "/".join(sorted_values) fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"
688-715: Consider extracting request options into a helper to reduce repetition, wdyt?Lines 694-713 repeat the same pattern four times: calling
self._request_*with identical arguments. While not incorrect, this repetition could be reduced for maintainability.You could extract this into a helper:
def _get_request_options_for_fetch( self, stream_slice: StreamSlice ) -> dict: """Get all request options for a fetch_one operation.""" return { "request_headers": self._request_headers( stream_state={}, stream_slice=stream_slice, next_page_token=None, ), "request_params": self._request_params( stream_state={}, stream_slice=stream_slice, next_page_token=None, ), "request_body_data": self._request_body_data( stream_state={}, stream_slice=stream_slice, next_page_token=None, ), "request_body_json": self._request_body_json( stream_state={}, stream_slice=stream_slice, next_page_token=None, ), }Then use it:
response = self.requester.send_request( path=fetch_path, stream_state={}, stream_slice=stream_slice, next_page_token=None, log_formatter=self.log_formatter, **self._get_request_options_for_fetch(stream_slice), )
716-726: 404 detection via string search could match false positives, wdyt?The condition
"404" in str(e)might match error messages that contain "404" but aren't actually 404 responses (e.g., "Failed after 404 retries" or "Connection timeout after 404ms").Consider making the detection more specific:
except Exception as e: # Check if this is a 404 (record not found) is_404 = False # Check response status code if available if hasattr(e, "response") and hasattr(e.response, "status_code"): is_404 = e.response.status_code == 404 # Fallback to checking error message more carefully elif "404" in str(e): # Only match if 404 appears with typical HTTP context error_str = str(e).lower() is_404 = any(pattern in error_str for pattern in [ "404 not found", "404 client error", "http error 404", "status code 404", ]) if is_404: raise RecordNotFoundException( f"Record with primary key {pk_value} not found" ) from e raiseOr, if you're confident the current implementation works for your error handling, at least add a comment explaining the string check is intentional.
630-756: Consider adding logging for debugging and observability, wdyt?The method performs a critical operation (fetching a single record by PK) but doesn't log anything. Adding debug-level logging would help troubleshoot issues in production.
Consider adding:
def fetch_one( self, pk_value: Union[str, Mapping[str, Any]], records_schema: Mapping[str, Any], ) -> Optional[Mapping[str, Any]]: # ... existing docstring ... LOGGER.debug(f"Fetching single record for stream '{self.name}' with pk_value: {pk_value}") # ... existing path construction ... LOGGER.debug(f"Constructed fetch path: {fetch_path}") try: response = self.requester.send_request(...) except Exception as e: LOGGER.debug(f"Error fetching record with pk_value {pk_value}: {e}") # ... existing exception handling ... # ... rest of method ...
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/abstract_source.py(1 hunks)airbyte_cdk/sources/declarative/exceptions.py(1 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py(2 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (4)
airbyte_cdk/sources/declarative/exceptions.py (1)
RecordNotFoundException(12-13)airbyte_cdk/sources/declarative/requesters/requester.py (2)
get_path(62-71)send_request(138-156)airbyte_cdk/sources/types.py (3)
Record(21-72)data(35-36)associated_slice(39-40)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
name(119-127)name(130-132)fetch_one(630-756)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
airbyte_cdk/sources/declarative/exceptions.py (1)
RecordNotFoundException(12-13)airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
get_path(195-208)send_request(446-484)_request_headers(304-325)_request_params(327-356)_request_body_data(358-382)_request_body_json(384-407)airbyte_cdk/sources/declarative/requesters/requester.py (2)
get_path(62-71)send_request(138-156)airbyte_cdk/sources/types.py (6)
StreamSlice(75-169)partition(99-104)cursor_slice(107-112)keys(137-138)Record(21-72)data(35-36)
airbyte_cdk/sources/abstract_source.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
fetch_record(205-229)name(93-97)name(100-102)airbyte_cdk/sources/streams/core.py (2)
fetch_record(466-484)name(139-143)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
🔇 Additional comments (12)
airbyte_cdk/sources/declarative/exceptions.py (1)
12-13: LGTM! Clean exception class design.The new
RecordNotFoundExceptionfollows Python exception conventions and inherits appropriately fromReadException. The docstring clearly indicates its purpose for 404-like scenarios.unit_tests/sources/declarative/retrievers/test_simple_retriever.py (7)
27-27: LGTM! Import added for new exception.The import is correctly placed and used throughout the test suite.
1650-1681: LGTM! Simple PK test covers path construction and response handling.The test verifies both the returned record data and the constructed path (
posts/123). Good coverage.
1722-1747: LGTM! 404 handling test verifies exception and message.The test confirms that a 404 response raises
RecordNotFoundExceptionand includes the PK value in the error message.
1749-1774: LGTM! Non-404 error propagation test ensures errors aren't swallowed.Good coverage of the differentiation between 404 (converted to
RecordNotFoundException) and other errors (propagated as-is).
1776-1796: LGTM! Type validation test catches invalid PK types.The test verifies that passing an integer (instead of string or dict) raises a
ValueErrorwith a clear message.
1798-1819: LGTM! None response handling test.The test confirms that when
send_requestreturnsNone, aRecordNotFoundExceptionis raised appropriately.
1821-1847: LGTM! Empty records test handles the case where API returns 200 but no data.This is a good edge case to cover—APIs that return success but empty results should be treated as "not found".
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
29-29: LGTM! Import added for new exception.The import is correctly placed and used in the
fetch_onemethod.
630-669: LGTM! Excellent documentation with examples and future considerations.The docstring is comprehensive, includes examples for both simple and composite PKs, and documents the convention-based approach with references to alternative implementations. This will help future maintainers understand the design decisions.
670-675: LGTM! Correct use of requester to get base path.The method properly delegates to
requester.get_path()with empty state/slice/token, which is appropriate for fetching a specific record.
733-756: LGTM! Record extraction logic is thorough and handles multiple types.The code properly handles both
Recordobjects and plainMappingtypes, and raises clear exceptions when records are empty or have unexpected types. The use ofdict()ensures a clean dict is returned.
- Use rstrip('/') on base_path and lstrip('/') on pk segments
- More robust than replace('//', '/') for handling edge cases
- Addresses Copilot review feedback
Co-Authored-By: AJ Steers <[email protected]>
Stream.fetch_record() and SimpleRetriever.fetch_one()
Stream.fetch_record() and SimpleRetriever.fetch_one()Stream.fetch_record() backed by SimpleRetriever.fetch_one()
|
Thanks for the feedback! I've updated the path construction to use The new implementation: if isinstance(pk_value, str):
fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}"
elif isinstance(pk_value, Mapping):
sorted_values = [str(pk_value[key]).lstrip('/') for key in sorted(pk_value.keys())]
pk_path_segment = "/".join(sorted_values)
fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"All unit tests continue to pass with this change. |
Stream.fetch_record() backed by SimpleRetriever.fetch_one()Source.fetch_record() backed byStream.fetch_record() and SimpleRetriever.fetch_one()
- Remove fetch_record from AbstractSource (legacy, not supported) - Remove fetch_record from Stream base class (legacy, not supported) - Remove fetch_record from DeclarativeStream (legacy, not supported) - Add fetch_record to ConcurrentDeclarativeSource (primary implementation) - Add fetch_record to AbstractStream as concrete method (raises NotImplementedError) - Keep SimpleRetriever.fetch_one as underlying mechanism This refactor ensures fetch_record is ONLY available for declarative concurrent connectors (PyAirbyte use case), not legacy sources. Co-Authored-By: AJ Steers <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
557-568: Consider validating primary key existence, wdyt?The retriever type check is good, but should we also verify that the stream has a primary key defined before attempting the fetch? Without a PK, the
fetch_onecall might fail in unclear ways.Something like:
if not stream_config.get("primary_key"): raise ValueError( f"Stream '{stream_name}' does not define a primary key, " "which is required for fetching individual records." )This would provide clearer error messages upfront. What do you think?
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
677-684: Clarify composite key ordering in docstring; URL encoding can remain optional.The docstring already documents the behavior at line 662 ("in key order"), but it could be more explicit: would you consider rewording it to say "in alphabetically sorted key order" to match the
sorted(pk_value.keys())implementation? That removes any ambiguity for users.On URL encoding—there's no existing pattern for encoding path parameters in the retriever/requester codebase, and no tests demonstrate issues with special characters. If pk_values are typically sourced from database records or structured data, this might not be a concern. If that changes or edge cases emerge, URL encoding can be added then. What are the typical sources of pk_values in practice, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py(1 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py(2 hunks)airbyte_cdk/sources/streams/concurrent/abstract_stream.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
DeclarativeStream(2482-2549)SimpleRetriever(2867-2925)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
SimpleRetriever(60-756)fetch_one(630-756)airbyte_cdk/sources/declarative/schema/default_schema_loader.py (1)
DefaultSchemaLoader(15-47)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
create_component(831-864)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/exceptions.py (1)
RecordNotFoundException(12-13)airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
get_path(195-208)send_request(446-484)_request_headers(304-325)_request_params(327-356)_request_body_data(358-382)_request_body_json(384-407)airbyte_cdk/sources/types.py (6)
StreamSlice(75-169)partition(99-104)cursor_slice(107-112)keys(137-138)Record(21-72)data(35-36)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
fetch_record(510-585)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (python)
🔇 Additional comments (6)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
94-113: LGTM! Clean interface addition.The new
fetch_recordmethod provides a clear extension point for streams that support single-record fetching. The default NotImplementedError with an informative message is appropriate for a base class.airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
540-555: Stream resolution looks good!The linear search for the stream config is appropriate given the typical number of streams, and the error message helpfully lists available streams when lookup fails.
583-585: Clean delegation to retriever!The final call properly passes through both the pk_value and the resolved schema to the retriever's fetch_one method.
570-581: Verify thetype(schema_loader_config)pattern safety.The use of
type(schema_loader_config)on line 573 assumes the config is compatible withcreate_component's expected model type parameter. Can you confirm this pattern is safe across all possible schema loader configurations in manifests?Run this to check how schema_loader is typically structured in manifests:
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
686-715: Request construction looks solid!The use of empty stream state and minimal StreamSlice is appropriate for fetching a single record. All necessary request options (headers, params, body) are properly included.
728-756: Response parsing and error handling are thorough!The implementation properly handles various edge cases:
- No response → RecordNotFoundException
- Empty records list → RecordNotFoundException
- Type checking with clear error messages
- Consistent dict return type
The reuse of
_parse_responseis good for maintaining consistency with the streaming logic.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Outdated
Show resolved
Hide resolved
- Update SimpleRetriever.fetch_one docstring to reflect RecordNotFoundException behavior - Move inline imports to top-level in ConcurrentDeclarativeSource - Simplify schema building logic using declarative_stream.get_json_schema() - Remove unused DefaultSchemaLoader import Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers <[email protected]>
Remove manual 404 checking since send_request() already handles errors internally. If send_request() returns without raising, we have valid data. Errors are now propagated naturally from the requester's error handler. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers <[email protected]>
|
|
||
| stream_slice = StreamSlice(partition={}, cursor_slice={}) | ||
|
|
||
| response = self.requester.send_request( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Devin, add a type annotation here so the reader knows what we're working with in the "response" variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I'll add a type annotation to make it explicit that response is requests.Response | None.
| else: | ||
| raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}") | ||
|
|
||
| stream_slice = StreamSlice(partition={}, cursor_slice={}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Devin, do we actually need to pass a stream slice? If yes, that's okay. Just want to check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need to pass a StreamSlice because the API methods (get_path, send_request, _request_headers, _request_params, etc.) all require it as a parameter.
For single-record fetch, we don't have partitioning, so we pass an empty slice with partition={} and cursor_slice={}. This is appropriate since templates and backoff strategies may reference the slice parameter. I'll add a brief inline comment to clarify this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. In that case, let's call it dummy_stream_slice or empty_stream_slice for clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! I'll rename it to empty_stream_slice for clarity since it better describes that we're passing an empty slice (not a dummy/placeholder).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
511-534: Consider tightening the pk_value type annotation?The docstring clarifies that
pk_valuecan be eitherstrorMapping[str, Any], but the signature usesAny. Would it be clearer to useUnion[str, Mapping[str, Any]]to match the documented behavior, wdyt?def fetch_record( self, stream_name: str, - pk_value: Any, + pk_value: Union[str, Mapping[str, Any]], config: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]:airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
678-685: Path construction: Consider documenting composite key ordering assumptions?The implementation sorts composite keys alphabetically (line 681), which is deterministic. However, some APIs might expect keys in a specific order (e.g., parent before child IDs).
Also curious about edge cases:
- Empty string pk_value:
"".lstrip('/')would still concatenate tobase/- Composite keys with None/null values:
str(None)becomes"None"- Nested values in composite keys: Only the top level is handled
These might be fine for the intended use cases, but worth documenting the assumptions? If APIs need custom ordering, they'd need a different approach (like the path template option mentioned in the docstring), wdyt?
722-745: Record extraction logic is solid, consider adding observability?The implementation correctly handles both
RecordandMappingtypes and extracts the data properly. Two suggestions:
Multiple records: If the API unexpectedly returns multiple records, taking the first one silently (line 734) could mask API issues. Consider logging a warning if
len(records) > 1? This would help catch configuration issues where the PK path returns a collection instead of a single item, wdyt?Success logging: Adding debug-level logging when a record is successfully fetched would help with troubleshooting. Something like
LOGGER.debug(f"Successfully fetched record with PK {pk_value} from {fetch_path}")?Error message clarity: Line 740's "invalid record type" might be confusing—it's really about unexpected API behavior rather than "not found". Perhaps "Record with primary key {pk_value} returned unexpected type {type(first_record).name}" would be clearer?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py(2 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py(2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
SimpleRetriever(60-745)fetch_one(630-745)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
fetch_record(94-113)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
create_component(831-864)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/exceptions.py (1)
RecordNotFoundException(12-13)airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
get_path(195-208)send_request(446-484)_request_headers(304-325)_request_params(327-356)_request_body_data(358-382)_request_body_json(384-407)airbyte_cdk/sources/types.py (6)
StreamSlice(75-169)partition(99-104)cursor_slice(107-112)keys(137-138)Record(21-72)data(35-36)
🔇 Additional comments (8)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
80-80: LGTM: Import properly placed at module level.Good to see this as a top-level import rather than inline.
535-550: Solid stream lookup with helpful error message.The fallback to instance config and the clear ValueError with available stream names will make debugging much easier. Nice work!
552-567: Code already implements proper error handling and type safety; review concerns are addressedAfter verification, your concerns appear to be either already addressed or handled implicitly by design:
- Stream construction (lines 545-550): Validation exists checking
stream_configis found beforecreate_component()call- Schema retrieval: All
get_json_schema()implementations returnMapping[str, Any]with noNonereturn path—the type contract guarantees this- PK validation:
fetch_one()validatespk_valuetype at lines 678-685, raisingValueErrorfor invalid types; tests cover this scenarioThe existing pre-validation and type contracts mean these edge cases are already handled. The code demonstrates good defensive design with clear error messages where needed (e.g., SimpleRetriever type check at lines 559-563).
Wdyt—does this align with your analysis, or do you see additional edge cases the verification might have missed?
Likely an incorrect or invalid review comment.
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5)
29-29: LGTM: Exception import properly placed.
630-670: Excellent docstring with clear design rationale.The comprehensive documentation including examples, the convention-based approach explanation, and future alternatives is very helpful. The type hints using
Union[str, Mapping[str, Any]]instead ofAnymake the interface clearer too.
672-676: Base path retrieval looks good.Using empty state/slice for the single-record fetch context makes sense. The requester's
get_path()should handle any edge cases like None or empty paths.
687-715: Good reuse of existing request machinery.Properly delegates to the requester's error handling and reuses the existing helper methods for headers/params/body. This maintains consistency with the rest of the retriever's behavior.
717-720: Response validation is clear and simple.Good that this was simplified from the previous version. The requester's error handler takes care of HTTP status codes, so checking for a falsy response here is appropriate.
- Add type annotation to response variable: requests.Response | None - Add comment explaining response can be None when error handler returns IGNORE - Add comment explaining empty StreamSlice for single-record fetch - Update docstring to clarify 404s propagate as exceptions (not RecordNotFoundException) - RecordNotFoundException is only raised for empty/ignored responses or no parsed records Addresses comments from @aaronsteers on PR #846 Co-Authored-By: AJ Steers <[email protected]>
- Add type hints: records_iter: Iterable[Record] and first_record: Record | None - Replace list(...) with next(iter(...), None) to avoid materializing entire iterable - Remove unnecessary Mapping branch since _parse_response returns Iterable[Record] - Simplify logic from 27 lines to 18 lines This is more efficient (only fetches first record) and has clearer type hints. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers <[email protected]>
| if isinstance(pk_value, str): | ||
| fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}" | ||
| elif isinstance(pk_value, Mapping): | ||
| sorted_values = [str(pk_value[key]).lstrip("/") for key in sorted(pk_value.keys())] | ||
| pk_path_segment = "/".join(sorted_values) | ||
| fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}" | ||
| else: | ||
| raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Devin, let's simplify for now. I don't think we are prepared to actually handle a dict here yet, so let's update the signature and implementation to only accept a string. This will simplify things a bit. We can re-add complexity later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call! I'll simplify to only accept str for pk_value for now. This will remove the composite key handling logic and make the implementation cleaner. We can add dict support later when there's a concrete use case.
…alue - Update SimpleRetriever.fetch_one signature: pk_value now str only (not Union[str, Mapping]) - Rename stream_slice to empty_stream_slice for clarity - Remove composite key handling logic (isinstance checks, dict path construction) - Update ConcurrentDeclarativeSource.fetch_record signature to accept str only - Update AbstractStream.fetch_record signature to accept str only - Remove test_fetch_one_composite_pk test (no longer supported) - Update test_fetch_one_invalid_pk_type to reflect new behavior - Simplify docstrings to remove composite key documentation This simplification makes the implementation cleaner and easier to maintain. Composite key support can be added later when there's a concrete use case. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers <[email protected]>
The _fetch_one method on SimpleRetriever is now prefixed with underscore to indicate it's not part of the public API. External callers should use Stream.fetch_record() or Source.fetch_record() instead. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
563-565: Consider making_fetch_onepart of the public API?The method calls
declarative_stream.retriever._fetch_one(), which is a private method (indicated by the leading underscore). While this works, calling private methods across module boundaries can be fragile if the implementation changes.Since
_fetch_oneis intended to be called fromConcurrentDeclarativeSource.fetch_record, would it make sense to either:
- Make it a public method (rename to
fetch_one), or- Document it as part of the internal API contract between these classes?
This is more of a design preference, so wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py(2 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py(2 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
SimpleRetriever(60-724)_fetch_one(630-724)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
fetch_record(94-111)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
create_component(831-864)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
80-80: LGTM on the import placement!The
SimpleRetrieverimport is properly placed at the module level and is used for the type check infetch_record. This addresses the previous feedback about avoiding inline imports.
511-565: Solid implementation! Please verify end-to-end with a real connector.The method correctly:
- Resolves the stream from the manifest
- Validates the stream uses
SimpleRetriever- Delegates to the retriever's fetch logic
- Provides clear error messages for missing streams and unsupported operations
However, as noted in the PR objectives, this hasn't been tested end-to-end with a real declarative connector. Given that the method reconstructs components from the manifest (lines 550-555), it's important to verify the path construction and component reconstruction work correctly in practice.
Could you test this with a real declarative connector to ensure:
- The path construction works correctly (e.g.,
/posts/123for a posts stream)- Component reconstruction from the manifest succeeds
- Error handling works for 404s, empty responses, and server errors
- The schema retrieval via
get_json_schema()doesn't fail unexpectedlywdyt?
Source.fetch_record() backed byStream.fetch_record() and SimpleRetriever.fetch_one()Source.fetch_record() andStream.fetch_record() backed by SimpleRetriever._fetch_one()
- Add LookupValue type alias for primary key values - Add get_record() method to Source class (keyword-only pk_value arg) - Add helper methods _get_stream_primary_key() and _normalize_and_validate_pk() - Add fetch_record() method to DeclarativeExecutor - Support both direct PK values and dict input with validation - Dict validation ensures single entry and key matches stream's primary key - Only supported for declarative (YAML-based) sources - Add comprehensive unit tests (16 tests, all passing) This builds on airbytehq/airbyte-python-cdk#846 Co-Authored-By: AJ Steers <[email protected]>
Summary
Adds the ability to fetch individual records by primary key from declarative concurrent connectors. This implements Phase 1a of #833 for PyAirbyte's use case with
ConcurrentDeclarativeSource.What's being added to main:
ConcurrentDeclarativeSource.fetch_record(stream_name, pk_value, config)- Primary API for fetching single recordsSimpleRetriever.fetch_one(pk_value, records_schema)- Underlying HTTP GET mechanismAbstractStream.fetch_record(pk_value)- Interface method (raises NotImplementedError by default)RecordNotFoundException- Exception raised when record is not foundKey implementation details:
"123")/{pk_value}to base pathRecordNotFoundExceptionfor empty/ignored responsesSimpleRetriever(raises NotImplementedError for others)Review & Testing Checklist for Human
Test with a real declarative connector - The
ConcurrentDeclarativeSource.fetch_record()method reconstructs components from the manifest but has never been tested with an actual connector. Test with a simple declarative connector (e.g., JSONPlaceholder, PokeAPI) to verify it works.Verify component reconstruction - The logic that creates
declarative_streamfromstream_config(lines 552-557 in concurrent_declarative_source.py) has never been exercised. Verify it correctly handles:Test error handling - Verify that:
Verify path construction - Test that the simple
base_path + "/" + pk_valuepattern works for your target use cases. This may not work for:?id=123)/companies/{id}/properties/{prop})Recommended Test Plan
Notes
Requested by: AJ Steers ([email protected], @aaronsteers)
Devin Session: https://app.devin.ai/sessions/e0d627ac7e89447a9ebddd515104e2c3
Summary by CodeRabbit
New Features
Tests