diff --git a/.secrets.baseline b/.secrets.baseline index 3fd4156bb0..59625efc63 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -128,7 +128,7 @@ "filename": "docs/docs/en/release.md", "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450", "is_verified": false, - "line_number": 1325, + "line_number": 1376, "is_secret": false } ], @@ -163,5 +163,5 @@ } ] }, - "generated_at": "2024-06-10T09:56:52Z" + "generated_at": "2024-07-18T16:41:25Z" } diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 35445a790b..6b6ac2f90b 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -6,7 +6,7 @@ We as members, contributors, and leaders pledge to make participation in our community a harassment-free experience for everyone, regardless of age, body size, visible or invisible disability, ethnicity, sex characteristics, gender -identity and expression, level of experience, education, socio-economic status, +identity and expression, level of experience, education, socioeconomic status, nationality, personal appearance, race, caste, color, religion, or sexual identity and orientation. diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md index feadf44839..5a48192f49 100644 --- a/docs/docs/en/release.md +++ b/docs/docs/en/release.md @@ -12,6 +12,35 @@ hide: --- # Release Notes +## 0.5.14 + +### What's Changed +* Update Release Notes for 0.5.13 by @faststream-release-notes-updater in [#1548](https://github.com/airtai/faststream/pull/1548){.external-link target="_blank"} +* Add allow_auto_create_topics to make automatic topic creation configurable by [@kumaranvpl](https://github.com/kumaranvpl){.external-link target="_blank"} in [#1556](https://github.com/airtai/faststream/pull/1556){.external-link target="_blank"} + + +**Full Changelog**: [#0.5.13...0.5.14](https://github.com/airtai/faststream/compare/0.5.13...0.5.14){.external-link target="_blank"} + +## 0.5.13 + +### What's Changed + +* feat: nats filter JS subscription support by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1519](https://github.com/airtai/faststream/pull/1519){.external-link target="_blank"} +* fix: correct RabbitExchange processing by OTEL in broker.publish case by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1521](https://github.com/airtai/faststream/pull/1521){.external-link target="_blank"} +* fix: correct Nats ObjectStorage get file behavior inside watch subscriber by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1523](https://github.com/airtai/faststream/pull/1523){.external-link target="_blank"} +* Resolve Issue 1386, Add rpc_prefix by [@aKardasz](https://github.com/aKardasz){.external-link target="_blank"} in [#1484](https://github.com/airtai/faststream/pull/1484){.external-link target="_blank"} +* fix: correct spans linking in batches case by [@draincoder](https://github.com/draincoder){.external-link target="_blank"} in [#1532](https://github.com/airtai/faststream/pull/1532){.external-link target="_blank"} +* fix (#1539): correct anyio.create_memory_object_stream annotation by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1541](https://github.com/airtai/faststream/pull/1541){.external-link target="_blank"} +* fix: correct publish_coverage CI by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1536](https://github.com/airtai/faststream/pull/1536){.external-link target="_blank"} +* Add NatsBroker.new_inbox() by [@maxalbert](https://github.com/maxalbert){.external-link target="_blank"} in [#1543](https://github.com/airtai/faststream/pull/1543){.external-link target="_blank"} +* fix (#1544): correct Redis message nack & reject signature by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1546](https://github.com/airtai/faststream/pull/1546){.external-link target="_blank"} + +### New Contributors +* [@aKardasz](https://github.com/aKardasz){.external-link target="_blank"} made their first contribution in [#1484](https://github.com/airtai/faststream/pull/1484){.external-link target="_blank"} +* [@maxalbert](https://github.com/maxalbert){.external-link target="_blank"} made their first contribution in [#1543](https://github.com/airtai/faststream/pull/1543){.external-link target="_blank"} + +**Full Changelog**: [#0.5.12...0.5.13](https://github.com/airtai/faststream/compare/0.5.12...0.5.13){.external-link target="_blank"} + ## 0.5.12 ### What's Changed @@ -324,7 +353,7 @@ You can find more information about it in the official [**aiokafka** doc](https: `pattern` option was added too, but it is still experimental and does not support `Path` -3. [`Path`](https://faststream.airt.ai/latest/nats/message/#subject-pattern-access) feature performance was increased. Also, `Path` is suitable for NATS `PullSub` batch subscribtion as well now. +3. [`Path`](https://faststream.airt.ai/latest/nats/message/#subject-pattern-access) feature performance was increased. Also, `Path` is suitable for NATS `PullSub` batch subscription as well now. ```python from faststream import NatsBroker, PullSub diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 013914b9d5..2f54a0f5bd 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -201,7 +201,7 @@ def __init__( max_channel_pool_size: Annotated[ int, Doc("Max channel pool size"), - ] = 1, + ] = 2, # NOTE: because we're sharing channels between consumers and producers ) -> None: security_args = parse_security(security) diff --git a/faststream/rabbit/broker/connection.py b/faststream/rabbit/broker/connection.py index b332eb8a42..636b2eb8a7 100644 --- a/faststream/rabbit/broker/connection.py +++ b/faststream/rabbit/broker/connection.py @@ -27,7 +27,7 @@ def __init__( publisher_confirms: bool, on_return_raises: bool, ) -> None: - self._connection_pool: "Pool[RobustConnection]" = Pool( + self._connection_pool: Pool[RobustConnection] = Pool( lambda: connect_robust( url=url, timeout=timeout, @@ -36,7 +36,7 @@ def __init__( max_size=connection_pool_size, ) - self._channel_pool: "Pool[RobustChannel]" = Pool( + self._channel_pool: Pool[RobustChannel] = Pool( lambda: self._get_channel( channel_number=channel_number, publisher_confirms=publisher_confirms, diff --git a/faststream/rabbit/helpers/declarer.py b/faststream/rabbit/helpers/declarer.py index a5f38f8deb..1302a02035 100644 --- a/faststream/rabbit/helpers/declarer.py +++ b/faststream/rabbit/helpers/declarer.py @@ -1,5 +1,5 @@ -from contextlib import AsyncExitStack -from typing import TYPE_CHECKING, Dict, Optional, cast +from contextlib import AsyncExitStack, asynccontextmanager +from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Tuple, cast if TYPE_CHECKING: import aio_pika @@ -12,8 +12,13 @@ class RabbitDeclarer: """An utility class to declare RabbitMQ queues and exchanges.""" __connection_manager: "ConnectionManager" - __queues: Dict["RabbitQueue", "aio_pika.RobustQueue"] - __exchanges: Dict["RabbitExchange", "aio_pika.RobustExchange"] + __queues: Dict[ + Tuple[Optional["aio_pika.RobustChannel"], "RabbitQueue"], "aio_pika.RobustQueue" + ] + __exchanges: Dict[ + Tuple[Optional["aio_pika.RobustChannel"], "RabbitExchange"], + "aio_pika.RobustExchange", + ] def __init__(self, connection_manager: "ConnectionManager") -> None: self.__connection_manager = connection_manager @@ -28,14 +33,18 @@ async def declare_queue( channel: Optional["aio_pika.RobustChannel"] = None, ) -> "aio_pika.RobustQueue": """Declare a queue.""" - if (queue_obj := self.__queues.get(queue)) is None: + # NOTE: It would return the queue linked to another channel if it was already declared + # unless the channel is part of the key + if (queue_obj := self.__queues.get((channel, queue))) is None: async with AsyncExitStack() as stack: if channel is None: channel = await stack.enter_async_context( self.__connection_manager.acquire_channel() ) + if (channel, queue) in self.__queues: + return self.__queues[(channel, queue)] - self.__queues[queue] = queue_obj = cast( + self.__queues[(channel, queue)] = queue_obj = cast( "aio_pika.RobustQueue", await channel.declare_queue( name=queue.name, @@ -59,7 +68,9 @@ async def declare_exchange( channel: Optional["aio_pika.RobustChannel"] = None, ) -> "aio_pika.RobustExchange": """Declare an exchange, parent exchanges and bind them each other.""" - if exch := self.__exchanges.get(exchange): + # NOTE: It would return the queue linked to another channel if it was already declared + # unless the channel is part of the key + if exch := self.__exchanges.get((channel, exchange)): return exch async with AsyncExitStack() as stack: @@ -67,12 +78,14 @@ async def declare_exchange( channel = await stack.enter_async_context( self.__connection_manager.acquire_channel() ) + if (channel, exchange) in self.__exchanges: + return self.__exchanges[(channel, exchange)] if not exchange.name: return channel.default_exchange else: - self.__exchanges[exchange] = exch = cast( + self.__exchanges[(channel, exchange)] = exch = cast( "aio_pika.RobustExchange", await channel.declare_exchange( name=exchange.name, @@ -102,3 +115,22 @@ async def declare_exchange( ) return exch # type: ignore[return-value] + + @asynccontextmanager + async def declare_queue_scope( + self, + queue: "RabbitQueue", + passive: bool = False, + *, + channel: Optional["aio_pika.RobustChannel"] = None, + ) -> AsyncGenerator["aio_pika.RobustQueue", None]: + """Declare a queue and return it with a context manager.""" + async with AsyncExitStack() as stack: + if channel is None: + channel = await stack.enter_async_context( + self.__connection_manager.acquire_channel() + ) + + yield await self.declare_queue( + queue=queue, passive=passive, channel=channel + ) diff --git a/faststream/rabbit/publisher/producer.py b/faststream/rabbit/publisher/producer.py index 4d1a6acd23..5f3976e98a 100644 --- a/faststream/rabbit/publisher/producer.py +++ b/faststream/rabbit/publisher/producer.py @@ -1,9 +1,11 @@ +from contextlib import asynccontextmanager from typing import ( TYPE_CHECKING, Any, AsyncContextManager, + AsyncGenerator, Optional, - Type, + Tuple, Union, cast, ) @@ -17,13 +19,15 @@ from faststream.exceptions import WRONG_PUBLISH_ARGS from faststream.rabbit.parser import AioPikaParser from faststream.rabbit.schemas import RABBIT_REPLY, RabbitExchange -from faststream.utils.functions import fake_context, timeout_scope +from faststream.utils.classes import Singleton +from faststream.utils.functions import ( + fake_context_yielding, + timeout_scope, +) if TYPE_CHECKING: - from types import TracebackType - import aiormq - from aio_pika import IncomingMessage, RobustChannel, RobustQueue + from aio_pika import IncomingMessage, RobustChannel from aio_pika.abc import DateType, HeadersType, TimeoutType from anyio.streams.memory import MemoryObjectReceiveStream @@ -50,8 +54,7 @@ def __init__( decoder: Optional["CustomCallable"], ) -> None: self.declarer = declarer - - self._rpc_lock = anyio.Lock() + self.rpc_manager = _RPCManager(declarer=declarer) default_parser = AioPikaParser() self._parser = resolve_custom_func(parser, default_parser.parse_message) @@ -86,24 +89,25 @@ async def publish( # type: ignore[override] ) -> Optional[Any]: """Publish a message to a RabbitMQ queue.""" context: AsyncContextManager[ - Optional[MemoryObjectReceiveStream[IncomingMessage]] + Union[ + Tuple[MemoryObjectReceiveStream[IncomingMessage], RobustChannel], + Tuple[None, None], + ] ] - channel: Optional["RobustChannel"] + channel: Optional[RobustChannel] + response_queue: Optional[MemoryObjectReceiveStream[IncomingMessage]] if rpc: if reply_to is not None: raise WRONG_PUBLISH_ARGS - rmq_queue = await self.declarer.declare_queue(RABBIT_REPLY) - channel = cast("RobustChannel", rmq_queue.channel) - context = _RPCCallback(self._rpc_lock, rmq_queue) + context = self.rpc_manager() reply_to = RABBIT_REPLY.name else: - channel = None - context = fake_context() + context = fake_context_yielding(with_yield=(None, None)) - async with context as response_queue: + async with context as (response_queue, channel): r = await self._publish( message=message, exchange=exchange, @@ -197,37 +201,45 @@ async def _publish( ) -class _RPCCallback: - """A class provides an RPC lock.""" - - def __init__(self, lock: "anyio.Lock", callback_queue: "RobustQueue") -> None: - self.lock = lock - self.queue = callback_queue +class _RPCManager(Singleton): + """A class that provides an RPC lock.""" - async def __aenter__(self) -> "MemoryObjectReceiveStream[IncomingMessage]": - ( - send_response_stream, - receive_response_stream, - ) = anyio.create_memory_object_stream[AbstractIncomingMessage]( - max_buffer_size=1 - ) - await self.lock.acquire() - - self.consumer_tag = await self.queue.consume( - callback=send_response_stream.send, - no_ack=True, - ) - - return cast( - "MemoryObjectReceiveStream[IncomingMessage]", - receive_response_stream, - ) + def __init__(self, declarer: "RabbitDeclarer") -> None: + self.declarer = declarer - async def __aexit__( + @asynccontextmanager + async def __call__( self, - exc_type: Optional[Type[BaseException]] = None, - exc_val: Optional[BaseException] = None, - exc_tb: Optional["TracebackType"] = None, - ) -> None: - self.lock.release() - await self.queue.cancel(self.consumer_tag) + ) -> AsyncGenerator[ + Tuple[ + "MemoryObjectReceiveStream[IncomingMessage]", + "RobustChannel", + ], + None, + ]: + # NOTE: this allows us to make sure the channel is only used by a single + # RPC call at a time, however, if the channel pool is used for both consuming + # and producing, they will be blocked by each other + async with self.declarer.declare_queue_scope(RABBIT_REPLY) as queue: + consumer_tag = None + try: + ( + send_response_stream, + receive_response_stream, + ) = anyio.create_memory_object_stream[AbstractIncomingMessage]( + max_buffer_size=1 + ) + consumer_tag = await queue.consume( + callback=send_response_stream.send, # type: ignore[arg-type] + no_ack=True, + ) + yield ( + cast( + "MemoryObjectReceiveStream[IncomingMessage]", + receive_response_stream, + ), + cast("RobustChannel", queue.channel), + ) + finally: + if consumer_tag is not None: + await queue.cancel(consumer_tag) # type: ignore[index] diff --git a/faststream/utils/functions.py b/faststream/utils/functions.py index 81b1b06db9..81ff8ce02e 100644 --- a/faststream/utils/functions.py +++ b/faststream/utils/functions.py @@ -70,6 +70,15 @@ async def fake_context(*args: Any, **kwargs: Any) -> AsyncIterator[None]: yield None +@asynccontextmanager +async def fake_context_yielding( + *args: Any, + with_yield: F_Return = None, # type: ignore[assignment] + **kwargs: Any, +) -> AsyncIterator[F_Return]: + yield with_yield + + @contextmanager def sync_fake_context(*args: Any, **kwargs: Any) -> Iterator[None]: yield None diff --git a/tests/brokers/rabbit/test_rpc.py b/tests/brokers/rabbit/test_rpc.py index d0bd80cab7..a0e7cebd09 100644 --- a/tests/brokers/rabbit/test_rpc.py +++ b/tests/brokers/rabbit/test_rpc.py @@ -1,10 +1,120 @@ +import asyncio + +import anyio import pytest from faststream.rabbit import RabbitBroker +from faststream.rabbit.publisher.producer import _RPCManager from tests.brokers.base.rpc import BrokerRPCTestcase, ReplyAndConsumeForbidden @pytest.mark.rabbit() class TestRPC(BrokerRPCTestcase, ReplyAndConsumeForbidden): - def get_broker(self, apply_types: bool = False) -> RabbitBroker: - return RabbitBroker(apply_types=apply_types) + def get_broker( + self, apply_types: bool = False, max_channel_pool_size: int = 2 + ) -> RabbitBroker: + return RabbitBroker( + apply_types=apply_types, max_channel_pool_size=max_channel_pool_size + ) + + @pytest.mark.asyncio() + async def test_rpc_with_concurrency(self, queue: str): + rpc_broker = self.get_broker(max_channel_pool_size=20) + + @rpc_broker.subscriber(queue) + async def m(m): # pragma: no cover + await asyncio.sleep(0.1) + return m + + async with self.patch_broker(rpc_broker) as br: + await br.start() + + with anyio.fail_after(1): + results = await asyncio.gather( + *[ + br.publish( + f"hello {i}", + queue, + rpc=True, + ) + for i in range(10) + ] + ) + + for i, r in enumerate(results): + assert r == f"hello {i}" + + @pytest.mark.asyncio() + async def test_rpc_with_concurrency_equal_consumers_channels(self, queue: str): + rpc_broker = self.get_broker(max_channel_pool_size=9) + + @rpc_broker.subscriber(queue) + async def m(m): # pragma: no cover + await asyncio.sleep(0.1) + return m + + async with self.patch_broker(rpc_broker) as br: + await br.start() + + with anyio.fail_after(1): + results = await asyncio.gather( + *[ + br.publish( + f"hello {i}", + queue, + rpc=True, + ) + for i in range(10) + ] + ) + + for i, r in enumerate(results): + assert r == f"hello {i}" + + @pytest.mark.asyncio() + async def test_rpc_recovers_after_timeout(self, queue: str): + rpc_broker = self.get_broker() + + @rpc_broker.subscriber(queue) + async def m(m): # pragma: no cover + await anyio.sleep(0.1) + return m + + async with self.patch_broker(rpc_broker) as br: + await br.start() + + with pytest.raises(TimeoutError): # pragma: no branch + await br.publish( + "hello", + queue, + rpc=True, + rpc_timeout=0, + raise_timeout=True, + ) + assert ( + await br.publish( + "hello", + queue, + rpc=True, + ) + ) == "hello" + + +class TestRPCManager: + @pytest.mark.asyncio() + async def test_context_variables_per_concurrent_task(self): + rpc_broker = RabbitBroker(max_channel_pool_size=10) + rpc_manager = _RPCManager(declarer=rpc_broker.declarer) + receive_streams = set() + channels = set() + + async def run_operation(): + async with rpc_manager() as (receive_stream, channel): + receive_streams.add(receive_stream) + channels.add(channel) + await asyncio.sleep(0.1) + + async with rpc_broker: + await asyncio.gather(*[run_operation() for _ in range(10)]) + assert len(receive_streams) == 10 + assert len(channels) == 10 diff --git a/tests/conftest.py b/tests/conftest.py index 92778c660a..1caa0b07ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,3 +62,11 @@ def context(): @pytest.fixture() def kafka_basic_project(): return "docs.docs_src.kafka.basic.basic:app" + + +@pytest.fixture(scope="session", autouse=True) +def event_loop(request): + """Create an instance of the default event loop for each test case.""" + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close()