diff --git a/crates/indexer/engine/src/engine.rs b/crates/indexer/engine/src/engine.rs index f3e30c67..d5cf4c9f 100644 --- a/crates/indexer/engine/src/engine.rs +++ b/crates/indexer/engine/src/engine.rs @@ -8,6 +8,7 @@ use starknet::core::types::{Event, TransactionContent}; use starknet::macros::selector; use starknet::providers::Provider; use starknet_crypto::Felt; +use tokio::runtime::Runtime; use std::sync::LazyLock; use tokio::sync::broadcast::Sender; use tokio::sync::Semaphore; @@ -64,6 +65,7 @@ pub struct Engine controllers: Option>, fetcher: Fetcher

, nft_metadata_semaphore: Arc, + nft_metadata_runtime: Arc, // The last fetch result & cursors, in case the processing fails, but not fetching. // Thus we can retry the processing with the same data instead of fetching again. cached_fetch: Option<(Box, HashMap)>, @@ -96,6 +98,7 @@ impl Engine

{ processors: Arc>, config: EngineConfig, shutdown_tx: Sender<()>, + nft_metadata_runtime: Arc ) -> Self { Self::new_with_controllers( storage, @@ -105,6 +108,7 @@ impl Engine

{ config, shutdown_tx, None, + nft_metadata_runtime ) } @@ -117,6 +121,7 @@ impl Engine

{ config: EngineConfig, shutdown_tx: Sender<()>, controllers: Option>, + nft_metadata_runtime: Arc, ) -> Self { let max_concurrent_tasks = config.max_concurrent_tasks; let event_processor_config = config.event_processor_config.clone(); @@ -138,11 +143,13 @@ impl Engine

{ processors, max_concurrent_tasks, event_processor_config, + nft_metadata_runtime.clone() ), contract_class_cache: Arc::new(ContractClassCache::new(provider.clone())), controllers, fetcher: Fetcher::new(provider.clone(), fetcher_config), nft_metadata_semaphore, + nft_metadata_runtime, cached_fetch: None, } } @@ -567,6 +574,7 @@ impl Engine

{ event: event.clone(), nft_metadata_semaphore: self.nft_metadata_semaphore.clone(), is_at_head, + nft_metadata_runtime: self.nft_metadata_runtime.clone() }; if self.processors.catch_all_event.validate(event) { if let Err(e) = self.processors.catch_all_event.process(&ctx).await { diff --git a/crates/processors/src/erc.rs b/crates/processors/src/erc.rs index 5a64df96..b0acbd11 100644 --- a/crates/processors/src/erc.rs +++ b/crates/processors/src/erc.rs @@ -11,7 +11,7 @@ use starknet::{ providers::{Provider, ProviderError, ProviderRequestData, ProviderResponseData}, }; use starknet_crypto::Felt; -use tokio::sync::Semaphore; +use tokio::{runtime::Runtime, sync::Semaphore}; use torii_cache::Cache; use torii_storage::Storage; use tracing::{debug, warn}; @@ -409,6 +409,7 @@ pub async fn try_register_nft_token_metadata( cache: Arc, storage: Arc, nft_metadata_semaphore: Arc, + runtime: Arc, ) -> Result<(), Error> { let _lock = match cache.get_token_registration_lock(id.clone()).await { Some(lock) => lock, @@ -419,18 +420,22 @@ pub async fn try_register_nft_token_metadata( return Ok(()); } - let _permit = nft_metadata_semaphore - .acquire() - .await - .map_err(|e| Error::TokenMetadataError(TokenMetadataError::AcquireError(e)))?; - let metadata = fetch_token_metadata(contract_address, actual_token_id, provider).await?; + runtime.spawn(async move { + let _permit = nft_metadata_semaphore + .acquire() + .await + .map_err(|e| Error::TokenMetadataError(TokenMetadataError::AcquireError(e)))?; + let metadata = fetch_token_metadata(contract_address, actual_token_id, provider).await?; - storage - .register_nft_token(contract_address, actual_token_id, metadata) - .await?; + storage + .register_nft_token(contract_address, actual_token_id, metadata) + .await?; - cache.mark_token_registered(id).await; + cache.mark_token_registered(id).await; + Result::<(), Error>::Ok(()) + }); + // For ERC-1155, we need to track unique token count at contract level // This is called when a new token is being registered, so we increment by 1 // We can't distinguish ERC-721 vs ERC-1155 here, but ERC-721 will also increment by 1 diff --git a/crates/processors/src/lib.rs b/crates/processors/src/lib.rs index 1dd04035..d0edf1c5 100644 --- a/crates/processors/src/lib.rs +++ b/crates/processors/src/lib.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use async_trait::async_trait; use starknet::core::types::{Event, Felt, TransactionContent}; use starknet::providers::Provider; +use tokio::runtime::Runtime; use tokio::sync::Semaphore; use torii_cache::{Cache, ContractClassCache}; use torii_storage::Storage; @@ -34,6 +35,7 @@ pub struct EventProcessorContext { pub event: Event, pub config: EventProcessorConfig, pub nft_metadata_semaphore: Arc, + pub nft_metadata_runtime: Arc, pub is_at_head: bool, } diff --git a/crates/processors/src/processors/erc4906_batch_metadata_update.rs b/crates/processors/src/processors/erc4906_batch_metadata_update.rs index cdc3160c..a3c5bb5b 100644 --- a/crates/processors/src/processors/erc4906_batch_metadata_update.rs +++ b/crates/processors/src/processors/erc4906_batch_metadata_update.rs @@ -93,11 +93,12 @@ where while token_id <= to_token_id { let storage = ctx.storage.clone(); let nft_metadata_semaphore = ctx.nft_metadata_semaphore.clone(); + let runtime = ctx.nft_metadata_runtime.clone(); let provider = ctx.provider.clone(); let token_address_clone = token_address; let current_token_id = token_id; - tasks.push(tokio::spawn(async move { + tasks.push(runtime.spawn(async move { let _permit = nft_metadata_semaphore .acquire() .await diff --git a/crates/processors/src/processors/erc4906_metadata_update.rs b/crates/processors/src/processors/erc4906_metadata_update.rs index a459a99b..ee0e02a9 100644 --- a/crates/processors/src/processors/erc4906_metadata_update.rs +++ b/crates/processors/src/processors/erc4906_metadata_update.rs @@ -87,8 +87,9 @@ where let storage = ctx.storage.clone(); let provider = ctx.provider.clone(); let semaphore = ctx.nft_metadata_semaphore.clone(); + let runtime = ctx.nft_metadata_runtime.clone(); - tokio::spawn(async move { + runtime.spawn(async move { let _permit = match semaphore.acquire().await { Ok(permit) => permit, Err(e) => { diff --git a/crates/processors/src/task_manager.rs b/crates/processors/src/task_manager.rs index 9d67451d..77ae3a67 100644 --- a/crates/processors/src/task_manager.rs +++ b/crates/processors/src/task_manager.rs @@ -4,6 +4,7 @@ use hashlink::LinkedHashMap; use starknet::core::types::Event; use starknet::providers::Provider; use starknet_crypto::Felt; +use tokio::runtime::Runtime; use tokio::sync::Semaphore; use torii_cache::Cache; use torii_proto::ContractType; @@ -48,6 +49,7 @@ pub struct TaskManager>, event_processor_config: EventProcessorConfig, nft_metadata_semaphore: Arc, + nft_metadata_runtime: Arc, } impl TaskManager

{ @@ -58,6 +60,7 @@ impl TaskManager< processors: Arc>, max_concurrent_tasks: usize, event_processor_config: EventProcessorConfig, + nft_metadata_runtime: Arc, ) -> Self { Self { storage, @@ -69,6 +72,7 @@ impl TaskManager< event_processor_config.max_metadata_tasks, )), event_processor_config, + nft_metadata_runtime, } } @@ -144,6 +148,7 @@ impl TaskManager< let event_processor_config = self.event_processor_config.clone(); let cache = self.cache.clone(); let nft_metadata_semaphore = self.nft_metadata_semaphore.clone(); + let nft_metadata_runtime = self.nft_metadata_runtime.clone(); self.task_network .process_tasks(move |task_id, task_data| { @@ -153,6 +158,7 @@ impl TaskManager< let event_processor_config = event_processor_config.clone(); let cache = cache.clone(); let nft_metadata_semaphore = nft_metadata_semaphore.clone(); + let nft_metadata_runtime = nft_metadata_runtime.clone(); async move { // Process all events for this task sequentially @@ -209,6 +215,7 @@ impl TaskManager< config: event_processor_config.clone(), nft_metadata_semaphore: nft_metadata_semaphore.clone(), is_at_head: *is_at_head, + nft_metadata_runtime: nft_metadata_runtime.clone(), }; // Record processor timing and success/error metrics diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs index 7f2ceb64..478fb348 100644 --- a/crates/runner/src/lib.rs +++ b/crates/runner/src/lib.rs @@ -634,6 +634,7 @@ impl Runner { "Runtime allocation calculated" ); + let nft_metadata_runtime = Arc::new(Runtime::new()?); let mut engine: Engine>> = Engine::new_with_controllers( storage.clone(), cache.clone(), @@ -686,6 +687,7 @@ impl Runner { }, shutdown_tx.clone(), controllers, + nft_metadata_runtime, ); let shutdown_rx = shutdown_tx.subscribe();