diff --git a/Cargo.toml b/Cargo.toml index 88966a9..47b0a9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hypersync" -version = "0.10.0" +version = "0.11.0" edition = "2021" [lib] @@ -20,7 +20,7 @@ serde = { version = "1", features = ["derive"] } alloy-json-abi = "1.1" alloy-dyn-abi = "1.1" alloy-primitives = "1.1" -hypersync-client = "1.0.2" +hypersync-client = "1.1.1" anyhow = "1" arrow = { version = "57", features = ["ffi"] } prefix-hex = "0.7" diff --git a/hypersync/__init__.py b/hypersync/__init__.py index 5d0ab40..b450a4a 100644 --- a/hypersync/__init__.py +++ b/hypersync/__init__.py @@ -5,7 +5,8 @@ from .hypersync import ArrowStream as _ArrowStream from .hypersync import EventStream as _EventStream from .hypersync import QueryResponseStream as _QueryResponseStream -from typing import Optional, Dict +from .hypersync import RateLimitInfo as _RateLimitInfo +from typing import Optional, Dict, Tuple from dataclasses import dataclass from strenum import StrEnum @@ -734,6 +735,9 @@ class ClientConfig: retry_ceiling_ms: Optional[int] = None # Deprecated: use api_token instead. Will be removed in a future release. bearer_token: Optional[str] = None + # Whether to proactively sleep when the rate limit is exhausted instead of + # sending requests that will be rejected with 429. Default: True. + proactive_rate_limit_sleep: Optional[bool] = None class QueryResponseData(object): @@ -760,6 +764,24 @@ class RollbackGuard(object): first_parent_hash: str +class RateLimitInfo(object): + """Rate limit information from server response headers.""" + # Total request quota for the current window. + limit: Optional[int] + # Remaining budget in the current window. + remaining: Optional[int] + # Seconds until the rate limit window resets. + reset_secs: Optional[int] + # Budget consumed per request. + cost: Optional[int] + # Seconds to wait before retrying (from retry-after header). + retry_after_secs: Optional[int] + # Whether the rate limit quota has been exhausted. + is_rate_limited: bool + # Suggested number of seconds to wait before making another request. + suggested_wait_secs: Optional[int] + + class QueryResponse(object): # Current height of the source hypersync instance archive_height: Optional[int] @@ -916,6 +938,18 @@ async def get_arrow(self, query: Query) -> ArrowResponse: """Executes query with retries and returns the response in Arrow format.""" return await self.inner.get_arrow(query) + async def get_with_rate_limit(self, query: Query) -> Tuple[QueryResponse, RateLimitInfo]: + """Executes query with retries and returns the response with rate limit info.""" + return await self.inner.get_with_rate_limit(query) + + def rate_limit_info(self) -> Optional[RateLimitInfo]: + """Get the most recently observed rate limit information. Returns None if no requests have been made yet.""" + return self.inner.rate_limit_info() + + async def wait_for_rate_limit(self) -> None: + """Wait until the current rate limit window resets. Returns immediately if no rate limit info observed or quota available.""" + return await self.inner.wait_for_rate_limit() + async def stream(self, query: Query, config: StreamConfig) -> QueryResponseStream: """Spawns task to execute query and return data via a channel.""" return await self.inner.stream(query, config) diff --git a/src/config.rs b/src/config.rs index 5c3cec6..1cc88e0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -76,6 +76,8 @@ pub struct ClientConfig { pub retry_base_ms: Option, #[serde(skip_serializing_if = "Option::is_none")] pub retry_ceiling_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub proactive_rate_limit_sleep: Option, } impl ClientConfig { diff --git a/src/lib.rs b/src/lib.rs index 6cc1dd6..ab34041 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ use query::Query; use response::{ convert_event_response, convert_response, ArrowStream, EventStream, QueryResponseStream, }; +use types::RateLimitInfo; #[pymodule] fn hypersync(m: &Bound<'_, PyModule>) -> PyResult<()> { @@ -34,6 +35,7 @@ fn hypersync(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_function(wrap_pyfunction!(decode::signature_to_topic0, m)?)?; Ok(()) @@ -213,6 +215,45 @@ impl HypersyncClient { }) } + /// Get blockchain data for a single query, with rate limit info + pub fn get_with_rate_limit<'py>( + &'py self, + query: Query, + py: Python<'py>, + ) -> PyResult> { + let inner = Arc::clone(&self.inner); + + future_into_py(py, async move { + let query = query.try_convert().context("parse query")?; + + let res = inner + .get_with_rate_limit(&query) + .await + .context("get with rate limit")?; + + let response = convert_response(res.response).context("convert response")?; + let rate_limit: RateLimitInfo = res.rate_limit.into(); + + Ok((response, rate_limit)) + }) + } + + /// Get the most recently observed rate limit information. + /// Returns None if no requests have been made yet. + pub fn rate_limit_info(&self) -> Option { + self.inner.rate_limit_info().map(|info| info.into()) + } + + /// Wait until the current rate limit window resets. + /// Returns immediately if no rate limit info observed or quota available. + pub fn wait_for_rate_limit<'py>(&'py self, py: Python<'py>) -> PyResult> { + let inner = Arc::clone(&self.inner); + future_into_py(py, async move { + inner.wait_for_rate_limit().await; + Ok(()) + }) + } + pub fn stream<'py>( &'py self, query: Query, diff --git a/src/types.rs b/src/types.rs index 4d714d2..c4febea 100644 --- a/src/types.rs +++ b/src/types.rs @@ -211,6 +211,41 @@ pub struct Trace { pub refund_address: Option, } +/// Rate limit information from server response headers. +#[pyclass] +#[pyo3(get_all)] +#[derive(Clone)] +pub struct RateLimitInfo { + /// Total request quota for the current window. + pub limit: Option, + /// Remaining budget in the current window. + pub remaining: Option, + /// Seconds until the rate limit window resets. + pub reset_secs: Option, + /// Budget consumed per request. + pub cost: Option, + /// Seconds to wait before retrying (from retry-after header). + pub retry_after_secs: Option, + /// Whether the rate limit quota has been exhausted. + pub is_rate_limited: bool, + /// Suggested number of seconds to wait before making another request. + pub suggested_wait_secs: Option, +} + +impl From for RateLimitInfo { + fn from(info: hypersync_client::RateLimitInfo) -> Self { + Self { + limit: info.limit, + remaining: info.remaining, + reset_secs: info.reset_secs, + cost: info.cost, + retry_after_secs: info.retry_after_secs, + is_rate_limited: info.is_rate_limited(), + suggested_wait_secs: info.suggested_wait_secs(), + } + } +} + /// Decoded EVM log #[pyclass] #[pyo3(get_all)]