Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/indexer/engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use starknet::core::types::{Event, TransactionContent};
use starknet::macros::selector;
use starknet::providers::Provider;
use starknet_crypto::Felt;
use tokio::runtime::Runtime;
use std::sync::LazyLock;
use tokio::sync::broadcast::Sender;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -64,6 +65,7 @@ pub struct Engine<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static>
controllers: Option<Arc<ControllersSync>>,
fetcher: Fetcher<P>,
nft_metadata_semaphore: Arc<Semaphore>,
nft_metadata_runtime: Arc<Runtime>,
// The last fetch result & cursors, in case the processing fails, but not fetching.
// Thus we can retry the processing with the same data instead of fetching again.
cached_fetch: Option<(Box<FetchResult>, HashMap<Felt, ContractType>)>,
Expand Down Expand Up @@ -96,6 +98,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
processors: Arc<Processors<P>>,
config: EngineConfig,
shutdown_tx: Sender<()>,
nft_metadata_runtime: Arc<Runtime>
) -> Self {
Self::new_with_controllers(
storage,
Expand All @@ -105,6 +108,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
config,
shutdown_tx,
None,
nft_metadata_runtime
)
}

Expand All @@ -117,6 +121,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
config: EngineConfig,
shutdown_tx: Sender<()>,
controllers: Option<Arc<ControllersSync>>,
nft_metadata_runtime: Arc<Runtime>,
) -> Self {
let max_concurrent_tasks = config.max_concurrent_tasks;
let event_processor_config = config.event_processor_config.clone();
Expand All @@ -138,11 +143,13 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
processors,
max_concurrent_tasks,
event_processor_config,
nft_metadata_runtime.clone()
),
contract_class_cache: Arc::new(ContractClassCache::new(provider.clone())),
controllers,
fetcher: Fetcher::new(provider.clone(), fetcher_config),
nft_metadata_semaphore,
nft_metadata_runtime,
cached_fetch: None,
}
}
Expand Down Expand Up @@ -567,6 +574,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
event: event.clone(),
nft_metadata_semaphore: self.nft_metadata_semaphore.clone(),
is_at_head,
nft_metadata_runtime: self.nft_metadata_runtime.clone()
};
if self.processors.catch_all_event.validate(event) {
if let Err(e) = self.processors.catch_all_event.process(&ctx).await {
Expand Down
25 changes: 15 additions & 10 deletions crates/processors/src/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use starknet::{
providers::{Provider, ProviderError, ProviderRequestData, ProviderResponseData},
};
use starknet_crypto::Felt;
use tokio::sync::Semaphore;
use tokio::{runtime::Runtime, sync::Semaphore};
use torii_cache::Cache;
use torii_storage::Storage;
use tracing::{debug, warn};
Expand Down Expand Up @@ -409,6 +409,7 @@ pub async fn try_register_nft_token_metadata<P: Provider + Sync>(
cache: Arc<dyn Cache + Send + Sync>,
storage: Arc<dyn Storage>,
nft_metadata_semaphore: Arc<Semaphore>,
runtime: Arc<Runtime>,
) -> Result<(), Error> {
let _lock = match cache.get_token_registration_lock(id.clone()).await {
Some(lock) => lock,
Expand All @@ -419,18 +420,22 @@ pub async fn try_register_nft_token_metadata<P: Provider + Sync>(
return Ok(());
}

let _permit = nft_metadata_semaphore
.acquire()
.await
.map_err(|e| Error::TokenMetadataError(TokenMetadataError::AcquireError(e)))?;
let metadata = fetch_token_metadata(contract_address, actual_token_id, provider).await?;
runtime.spawn(async move {
let _permit = nft_metadata_semaphore
.acquire()
.await
.map_err(|e| Error::TokenMetadataError(TokenMetadataError::AcquireError(e)))?;
let metadata = fetch_token_metadata(contract_address, actual_token_id, provider).await?;

storage
.register_nft_token(contract_address, actual_token_id, metadata)
.await?;
storage
.register_nft_token(contract_address, actual_token_id, metadata)
.await?;

cache.mark_token_registered(id).await;
cache.mark_token_registered(id).await;

Result::<(), Error>::Ok(())
});

// For ERC-1155, we need to track unique token count at contract level
// This is called when a new token is being registered, so we increment by 1
// We can't distinguish ERC-721 vs ERC-1155 here, but ERC-721 will also increment by 1
Expand Down
2 changes: 2 additions & 0 deletions crates/processors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use starknet::core::types::{Event, Felt, TransactionContent};
use starknet::providers::Provider;
use tokio::runtime::Runtime;
use tokio::sync::Semaphore;
use torii_cache::{Cache, ContractClassCache};
use torii_storage::Storage;
Expand Down Expand Up @@ -34,6 +35,7 @@ pub struct EventProcessorContext<P: Provider + Sync + Send + 'static> {
pub event: Event,
pub config: EventProcessorConfig,
pub nft_metadata_semaphore: Arc<Semaphore>,
pub nft_metadata_runtime: Arc<Runtime>,
pub is_at_head: bool,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ where
while token_id <= to_token_id {
let storage = ctx.storage.clone();
let nft_metadata_semaphore = ctx.nft_metadata_semaphore.clone();
let runtime = ctx.nft_metadata_runtime.clone();
let provider = ctx.provider.clone();
let token_address_clone = token_address;
let current_token_id = token_id;

tasks.push(tokio::spawn(async move {
tasks.push(runtime.spawn(async move {
let _permit = nft_metadata_semaphore
.acquire()
.await
Expand Down
3 changes: 2 additions & 1 deletion crates/processors/src/processors/erc4906_metadata_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ where
let storage = ctx.storage.clone();
let provider = ctx.provider.clone();
let semaphore = ctx.nft_metadata_semaphore.clone();
let runtime = ctx.nft_metadata_runtime.clone();

tokio::spawn(async move {
runtime.spawn(async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(e) => {
Expand Down
7 changes: 7 additions & 0 deletions crates/processors/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use hashlink::LinkedHashMap;
use starknet::core::types::Event;
use starknet::providers::Provider;
use starknet_crypto::Felt;
use tokio::runtime::Runtime;
use tokio::sync::Semaphore;
use torii_cache::Cache;
use torii_proto::ContractType;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub struct TaskManager<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'st
processors: Arc<Processors<P>>,
event_processor_config: EventProcessorConfig,
nft_metadata_semaphore: Arc<Semaphore>,
nft_metadata_runtime: Arc<Runtime>,
}

impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<P> {
Expand All @@ -58,6 +60,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
processors: Arc<Processors<P>>,
max_concurrent_tasks: usize,
event_processor_config: EventProcessorConfig,
nft_metadata_runtime: Arc<Runtime>,
) -> Self {
Self {
storage,
Expand All @@ -69,6 +72,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
event_processor_config.max_metadata_tasks,
)),
event_processor_config,
nft_metadata_runtime,
}
}

Expand Down Expand Up @@ -144,6 +148,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
let event_processor_config = self.event_processor_config.clone();
let cache = self.cache.clone();
let nft_metadata_semaphore = self.nft_metadata_semaphore.clone();
let nft_metadata_runtime = self.nft_metadata_runtime.clone();

self.task_network
.process_tasks(move |task_id, task_data| {
Expand All @@ -153,6 +158,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
let event_processor_config = event_processor_config.clone();
let cache = cache.clone();
let nft_metadata_semaphore = nft_metadata_semaphore.clone();
let nft_metadata_runtime = nft_metadata_runtime.clone();

async move {
// Process all events for this task sequentially
Expand Down Expand Up @@ -209,6 +215,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
config: event_processor_config.clone(),
nft_metadata_semaphore: nft_metadata_semaphore.clone(),
is_at_head: *is_at_head,
nft_metadata_runtime: nft_metadata_runtime.clone(),
};

// Record processor timing and success/error metrics
Expand Down
2 changes: 2 additions & 0 deletions crates/runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ impl Runner {
"Runtime allocation calculated"
);

let nft_metadata_runtime = Arc::new(Runtime::new()?);
let mut engine: Engine<Arc<JsonRpcClient<HttpTransport>>> = Engine::new_with_controllers(
storage.clone(),
cache.clone(),
Expand Down Expand Up @@ -686,6 +687,7 @@ impl Runner {
},
shutdown_tx.clone(),
controllers,
nft_metadata_runtime,
);

let shutdown_rx = shutdown_tx.subscribe();
Expand Down
Loading