Skip to content
Discussion options

You must be logged in to vote

Thanks for pointing out 👍🏻 . This is my working implementation ⬇️

import httpx
from aiolimiter import AsyncLimiter
from httpx import HTTPStatusError, Request, RequestError, Response

__all__ = ["AsyncRateLimiter", "AsyncRateLimitTransport", "AsyncRetryTransport"]

class AsyncRateLimiter(AsyncLimiter):
    def update_limit(self, *, max_rate: Optional[float] = None, time_period: Optional[float] = None):
        self.max_rate = max_rate or self.max_rate
        self.time_period = time_period or self.time_period
        self._rate_per_sec = self.max_rate / self.time_period

class AsyncRateLimitTransport(AsyncRetryTransport):
    def __init__(self, *args, default_max_rate: float, default_time_…

Replies: 1 comment 1 reply

Comment options

You must be logged in to vote
1 reply
@Tomas2D
Comment options

Answer selected by lovelydinosaur
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Ideas
Labels
None yet
2 participants