Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,16 @@ async def close(self) -> None:
"""Closes the bidi-gRPC connection."""
if not self._is_stream_open:
raise ValueError("Stream is not open")
await self.requests_done()
await self.socket_like_rpc.close()
self._is_stream_open = False

async def requests_done(self):
"""Signals that all requests have been sent."""

await self.socket_like_rpc.send(None)
await self.socket_like_rpc.recv()
Comment on lines +158 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This new method has a couple of areas for improvement:

  1. Missing guard clause: For consistency with other methods like send and recv, a check should be added to ensure the stream is open before proceeding. This prevents errors if the method is called on a closed stream.
  2. Code duplication: This method is identical to the one in _AsyncWriteObjectStream. To improve maintainability, consider moving this shared logic to the _AsyncAbstractObjectStream base class. This would likely involve moving _is_stream_open and socket_like_rpc to the base class as well.

Here is a suggested implementation that includes the guard clause:

    async def requests_done(self):
        """Signals that all requests have been sent."""
        if not self._is_stream_open:
            raise ValueError("Stream is not open")

        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()


async def send(
self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,16 @@ async def close(self) -> None:
"""Closes the bidi-gRPC connection."""
if not self._is_stream_open:
raise ValueError("Stream is not open")
await self.requests_done()
await self.socket_like_rpc.close()
self._is_stream_open = False

async def requests_done(self):
"""Signals that all requests have been sent."""

await self.socket_like_rpc.send(None)
await self.socket_like_rpc.recv()
Comment on lines +159 to +163
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to my comment on async_read_object_stream.py, this method should include a check to ensure the stream is open.

Additionally, this logic is duplicated across both stream classes. Refactoring this into the _AsyncAbstractObjectStream base class would be a good improvement for maintainability.

    async def requests_done(self):
        """Signals that all requests have been sent."""
        if not self._is_stream_open:
            raise ValueError("Stream is not open")

        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()


async def send(
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest
) -> None:
Expand Down Expand Up @@ -186,3 +193,4 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
@property
def is_stream_open(self) -> bool:
return self._is_stream_open

26 changes: 26 additions & 0 deletions tests/unit/asyncio/test_async_read_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,41 @@ async def test_close(mock_client, mock_cls_async_bidi_rpc):
read_obj_stream = await instantiate_read_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=True
)
read_obj_stream.requests_done = AsyncMock()

# act
await read_obj_stream.close()

# assert
read_obj_stream.requests_done.assert_called_once()
read_obj_stream.socket_like_rpc.close.assert_called_once()
assert not read_obj_stream.is_stream_open


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_requests_done(mock_client, mock_cls_async_bidi_rpc):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This is a good test for the happy path. With the recommended addition of the _is_stream_open check in requests_done, it would be beneficial to also add a test case to verify that a ValueError is raised when requests_done is called on a closed stream, similar to test_close_without_open_should_raise_error.

"""Test that requests_done signals the end of requests."""
# Arrange
read_obj_stream = await instantiate_read_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=True
)
read_obj_stream.socket_like_rpc.send = AsyncMock()
read_obj_stream.socket_like_rpc.recv = AsyncMock()

# Act
await read_obj_stream.requests_done()

# Assert
read_obj_stream.socket_like_rpc.send.assert_called_once_with(None)
read_obj_stream.socket_like_rpc.recv.assert_called_once()


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
)
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/asyncio/test_async_write_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,13 @@ async def test_close(mock_cls_async_bidi_rpc, mock_client):
write_obj_stream = await instantiate_write_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=True
)
write_obj_stream.requests_done = AsyncMock()

# Act
await write_obj_stream.close()

# Assert
write_obj_stream.requests_done.assert_called_once()
write_obj_stream.socket_like_rpc.close.assert_called_once()
assert not write_obj_stream.is_stream_open

Expand Down Expand Up @@ -394,3 +396,24 @@ async def test_recv_without_open_should_raise_error(
# Act & Assert
with pytest.raises(ValueError, match="Stream is not open"):
await write_obj_stream.recv()


@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
)
async def test_requests_done(mock_cls_async_bidi_rpc, mock_client):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the read stream test, this is a good happy path test. It would be valuable to add another test to ensure requests_done raises a ValueError when the stream is not open, to cover the failure case. This would be analogous to test_close_without_open_should_raise_error.

"""Test that requests_done signals the end of requests."""
# Arrange
write_obj_stream = await instantiate_write_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=True
)
write_obj_stream.socket_like_rpc.send = AsyncMock()
write_obj_stream.socket_like_rpc.recv = AsyncMock()

# Act
await write_obj_stream.requests_done()

# Assert
write_obj_stream.socket_like_rpc.send.assert_called_once_with(None)
write_obj_stream.socket_like_rpc.recv.assert_called_once()