Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hypersync"
version = "0.10.0"
version = "0.11.0"
edition = "2021"

[lib]
Expand All @@ -20,7 +20,7 @@ serde = { version = "1", features = ["derive"] }
alloy-json-abi = "1.1"
alloy-dyn-abi = "1.1"
alloy-primitives = "1.1"
hypersync-client = "1.0.2"
hypersync-client = "1.1.1"
anyhow = "1"
arrow = { version = "57", features = ["ffi"] }
prefix-hex = "0.7"
Expand Down
36 changes: 35 additions & 1 deletion hypersync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from .hypersync import ArrowStream as _ArrowStream
from .hypersync import EventStream as _EventStream
from .hypersync import QueryResponseStream as _QueryResponseStream
from typing import Optional, Dict
from .hypersync import RateLimitInfo as _RateLimitInfo
from typing import Optional, Dict, Tuple
from dataclasses import dataclass
from strenum import StrEnum

Expand Down Expand Up @@ -734,6 +735,9 @@ class ClientConfig:
retry_ceiling_ms: Optional[int] = None
# Deprecated: use api_token instead. Will be removed in a future release.
bearer_token: Optional[str] = None
# Whether to proactively sleep when the rate limit is exhausted instead of
# sending requests that will be rejected with 429. Default: True.
proactive_rate_limit_sleep: Optional[bool] = None


class QueryResponseData(object):
Expand All @@ -760,6 +764,24 @@ class RollbackGuard(object):
first_parent_hash: str


class RateLimitInfo(object):
"""Rate limit information from server response headers."""
# Total request quota for the current window.
limit: Optional[int]
# Remaining budget in the current window.
remaining: Optional[int]
# Seconds until the rate limit window resets.
reset_secs: Optional[int]
# Budget consumed per request.
cost: Optional[int]
# Seconds to wait before retrying (from retry-after header).
retry_after_secs: Optional[int]
# Whether the rate limit quota has been exhausted.
is_rate_limited: bool
# Suggested number of seconds to wait before making another request.
suggested_wait_secs: Optional[int]


class QueryResponse(object):
# Current height of the source hypersync instance
archive_height: Optional[int]
Expand Down Expand Up @@ -916,6 +938,18 @@ async def get_arrow(self, query: Query) -> ArrowResponse:
"""Executes query with retries and returns the response in Arrow format."""
return await self.inner.get_arrow(query)

async def get_with_rate_limit(self, query: Query) -> Tuple[QueryResponse, RateLimitInfo]:
"""Executes query with retries and returns the response with rate limit info."""
return await self.inner.get_with_rate_limit(query)

def rate_limit_info(self) -> Optional[RateLimitInfo]:
"""Get the most recently observed rate limit information. Returns None if no requests have been made yet."""
return self.inner.rate_limit_info()

async def wait_for_rate_limit(self) -> None:
"""Wait until the current rate limit window resets. Returns immediately if no rate limit info observed or quota available."""
return await self.inner.wait_for_rate_limit()

async def stream(self, query: Query, config: StreamConfig) -> QueryResponseStream:
"""Spawns task to execute query and return data via a channel."""
return await self.inner.stream(query, config)
Expand Down
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct ClientConfig {
pub retry_base_ms: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub retry_ceiling_ms: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub proactive_rate_limit_sleep: Option<bool>,
}

impl ClientConfig {
Expand Down
41 changes: 41 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use query::Query;
use response::{
convert_event_response, convert_response, ArrowStream, EventStream, QueryResponseStream,
};
use types::RateLimitInfo;

#[pymodule]
fn hypersync(m: &Bound<'_, PyModule>) -> PyResult<()> {
Expand All @@ -34,6 +35,7 @@ fn hypersync(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<ArrowStream>()?;
m.add_class::<EventStream>()?;
m.add_class::<QueryResponseStream>()?;
m.add_class::<RateLimitInfo>()?;
m.add_function(wrap_pyfunction!(decode::signature_to_topic0, m)?)?;

Ok(())
Expand Down Expand Up @@ -213,6 +215,45 @@ impl HypersyncClient {
})
}

/// Get blockchain data for a single query, with rate limit info
pub fn get_with_rate_limit<'py>(
&'py self,
query: Query,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let inner = Arc::clone(&self.inner);

future_into_py(py, async move {
let query = query.try_convert().context("parse query")?;

let res = inner
.get_with_rate_limit(&query)
.await
.context("get with rate limit")?;

let response = convert_response(res.response).context("convert response")?;
let rate_limit: RateLimitInfo = res.rate_limit.into();

Ok((response, rate_limit))
})
}

/// Get the most recently observed rate limit information.
/// Returns None if no requests have been made yet.
pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
self.inner.rate_limit_info().map(|info| info.into())
}

/// Wait until the current rate limit window resets.
/// Returns immediately if no rate limit info observed or quota available.
pub fn wait_for_rate_limit<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let inner = Arc::clone(&self.inner);
future_into_py(py, async move {
inner.wait_for_rate_limit().await;
Ok(())
})
}

pub fn stream<'py>(
&'py self,
query: Query,
Expand Down
35 changes: 35 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,41 @@ pub struct Trace {
pub refund_address: Option<String>,
}

/// Rate limit information from server response headers.
#[pyclass]
#[pyo3(get_all)]
#[derive(Clone)]
pub struct RateLimitInfo {
/// Total request quota for the current window.
pub limit: Option<u64>,
/// Remaining budget in the current window.
pub remaining: Option<u64>,
/// Seconds until the rate limit window resets.
pub reset_secs: Option<u64>,
/// Budget consumed per request.
pub cost: Option<u64>,
/// Seconds to wait before retrying (from retry-after header).
pub retry_after_secs: Option<u64>,
/// Whether the rate limit quota has been exhausted.
pub is_rate_limited: bool,
/// Suggested number of seconds to wait before making another request.
pub suggested_wait_secs: Option<u64>,
}

impl From<hypersync_client::RateLimitInfo> for RateLimitInfo {
fn from(info: hypersync_client::RateLimitInfo) -> Self {
Self {
limit: info.limit,
remaining: info.remaining,
reset_secs: info.reset_secs,
cost: info.cost,
retry_after_secs: info.retry_after_secs,
is_rate_limited: info.is_rate_limited(),
suggested_wait_secs: info.suggested_wait_secs(),
}
}
}

/// Decoded EVM log
#[pyclass]
#[pyo3(get_all)]
Expand Down