-
Notifications
You must be signed in to change notification settings - Fork 168
fix(experimental): implement requests_done method to signal end of requests in async streams. Gracefully close streams. #1700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -152,9 +152,16 @@ async def close(self) -> None: | |
| """Closes the bidi-gRPC connection.""" | ||
| if not self._is_stream_open: | ||
| raise ValueError("Stream is not open") | ||
| await self.requests_done() | ||
| await self.socket_like_rpc.close() | ||
| self._is_stream_open = False | ||
|
|
||
| async def requests_done(self): | ||
| """Signals that all requests have been sent.""" | ||
|
|
||
| await self.socket_like_rpc.send(None) | ||
| await self.socket_like_rpc.recv() | ||
|
Comment on lines
+159
to
+163
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to my comment on Additionally, this logic is duplicated across both stream classes. Refactoring this into the async def requests_done(self):
"""Signals that all requests have been sent."""
if not self._is_stream_open:
raise ValueError("Stream is not open")
await self.socket_like_rpc.send(None)
await self.socket_like_rpc.recv() |
||
|
|
||
| async def send( | ||
| self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest | ||
| ) -> None: | ||
|
|
@@ -186,3 +193,4 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: | |
| @property | ||
| def is_stream_open(self) -> bool: | ||
| return self._is_stream_open | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -197,15 +197,41 @@ async def test_close(mock_client, mock_cls_async_bidi_rpc): | |
| read_obj_stream = await instantiate_read_obj_stream( | ||
| mock_client, mock_cls_async_bidi_rpc, open=True | ||
| ) | ||
| read_obj_stream.requests_done = AsyncMock() | ||
|
|
||
| # act | ||
| await read_obj_stream.close() | ||
|
|
||
| # assert | ||
| read_obj_stream.requests_done.assert_called_once() | ||
| read_obj_stream.socket_like_rpc.close.assert_called_once() | ||
| assert not read_obj_stream.is_stream_open | ||
|
|
||
|
|
||
| @mock.patch( | ||
| "google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc" | ||
| ) | ||
| @mock.patch( | ||
| "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" | ||
| ) | ||
| @pytest.mark.asyncio | ||
| async def test_requests_done(mock_client, mock_cls_async_bidi_rpc): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good test for the happy path. With the recommended addition of the |
||
| """Test that requests_done signals the end of requests.""" | ||
| # Arrange | ||
| read_obj_stream = await instantiate_read_obj_stream( | ||
| mock_client, mock_cls_async_bidi_rpc, open=True | ||
| ) | ||
| read_obj_stream.socket_like_rpc.send = AsyncMock() | ||
| read_obj_stream.socket_like_rpc.recv = AsyncMock() | ||
|
|
||
| # Act | ||
| await read_obj_stream.requests_done() | ||
|
|
||
| # Assert | ||
| read_obj_stream.socket_like_rpc.send.assert_called_once_with(None) | ||
| read_obj_stream.socket_like_rpc.recv.assert_called_once() | ||
|
|
||
|
|
||
| @mock.patch( | ||
| "google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc" | ||
| ) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -289,11 +289,13 @@ async def test_close(mock_cls_async_bidi_rpc, mock_client): | |
| write_obj_stream = await instantiate_write_obj_stream( | ||
| mock_client, mock_cls_async_bidi_rpc, open=True | ||
| ) | ||
| write_obj_stream.requests_done = AsyncMock() | ||
|
|
||
| # Act | ||
| await write_obj_stream.close() | ||
|
|
||
| # Assert | ||
| write_obj_stream.requests_done.assert_called_once() | ||
| write_obj_stream.socket_like_rpc.close.assert_called_once() | ||
| assert not write_obj_stream.is_stream_open | ||
|
|
||
|
|
@@ -394,3 +396,24 @@ async def test_recv_without_open_should_raise_error( | |
| # Act & Assert | ||
| with pytest.raises(ValueError, match="Stream is not open"): | ||
| await write_obj_stream.recv() | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| @mock.patch( | ||
| "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" | ||
| ) | ||
| async def test_requests_done(mock_cls_async_bidi_rpc, mock_client): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| """Test that requests_done signals the end of requests.""" | ||
| # Arrange | ||
| write_obj_stream = await instantiate_write_obj_stream( | ||
| mock_client, mock_cls_async_bidi_rpc, open=True | ||
| ) | ||
| write_obj_stream.socket_like_rpc.send = AsyncMock() | ||
| write_obj_stream.socket_like_rpc.recv = AsyncMock() | ||
|
|
||
| # Act | ||
| await write_obj_stream.requests_done() | ||
|
|
||
| # Assert | ||
| write_obj_stream.socket_like_rpc.send.assert_called_once_with(None) | ||
| write_obj_stream.socket_like_rpc.recv.assert_called_once() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new method has a couple of areas for improvement:
sendandrecv, a check should be added to ensure the stream is open before proceeding. This prevents errors if the method is called on a closed stream._AsyncWriteObjectStream. To improve maintainability, consider moving this shared logic to the_AsyncAbstractObjectStreambase class. This would likely involve moving_is_stream_openandsocket_like_rpcto the base class as well.Here is a suggested implementation that includes the guard clause: