Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions crates/sqlite/sqlite/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use starknet::providers::Provider;
use starknet_crypto::Felt;
use tracing::{debug, warn};

use super::{ApplyBalanceDiffQuery, BrokerMessage, Executor};
use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::{SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::types::{OptimisticTokenBalance, TokenBalance};
use crate::types::TokenBalance;
use crate::utils::{sql_string_to_u256, u256_to_sql_string, I256};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -164,11 +164,7 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
.await?;

debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff");
SimpleBroker::publish(unsafe {
std::mem::transmute::<TokenBalance, OptimisticTokenBalance>(token_balance.clone())
});
self.publish_queue
.push(BrokerMessage::TokenBalanceUpdated(token_balance));
SimpleBroker::publish(token_balance);

Ok(())
}
Expand Down
88 changes: 12 additions & 76 deletions crates/sqlite/sqlite/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::time::Instant;
use torii_sqlite_types::OptimisticToken;
use tracing::{debug, error, info, warn};

use crate::constants::TOKENS_TABLE;
use crate::simple_broker::SimpleBroker;
use crate::types::{
ContractCursor, Entity as EntityUpdated, Event as EventEmitted,
EventMessage as EventMessageUpdated, Model as ModelRegistered, OptimisticEntity,
OptimisticEventMessage, ParsedCall, Token, TokenBalance, Transaction,
EventMessage as EventMessageUpdated, Model as ModelRegistered,
ParsedCall, Token, Transaction,
};
use crate::utils::{felt_to_sql_string, felts_to_sql_string, u256_to_sql_string, I256};

Expand All @@ -43,18 +42,6 @@ pub enum Argument {
FieldElement(Felt),
}

#[derive(Debug, Clone)]
pub enum BrokerMessage {
SetHead(ContractCursor),
ModelRegistered(ModelRegistered),
EntityUpdated(EntityUpdated),
EventMessageUpdated(EventMessageUpdated),
EventEmitted(EventEmitted),
TokenRegistered(Token),
TokenBalanceUpdated(TokenBalance),
Transaction(Transaction),
}

#[derive(Debug, Clone)]
pub struct DeleteEntityQuery {
pub entity_id: String,
Expand Down Expand Up @@ -131,7 +118,6 @@ pub struct Executor<'c, P: Provider + Sync + Send + 'static> {
// This `pool` is only used to create a new `transaction`
pool: Pool<Sqlite>,
transaction: SqlxTransaction<'c, Sqlite>,
publish_queue: Vec<BrokerMessage>,
rx: UnboundedReceiver<QueryMessage>,
shutdown_rx: Receiver<()>,
// It is used to make RPC calls to fetch erc contracts
Expand Down Expand Up @@ -242,14 +228,12 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
) -> Result<(Self, UnboundedSender<QueryMessage>)> {
let (tx, rx) = unbounded_channel();
let transaction = pool.begin().await?;
let publish_queue = Vec::new();
let shutdown_rx = shutdown_tx.subscribe();

Ok((
Executor {
pool,
transaction,
publish_queue,
rx,
shutdown_rx,
provider,
Expand Down Expand Up @@ -370,8 +354,7 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
.await?;

// Send appropriate ContractUpdated publish message
self.publish_queue
.push(BrokerMessage::SetHead(cursor.clone()));
SimpleBroker::publish(cursor.clone());
}
}
QueryType::StoreTransaction(store_transaction) => {
Expand Down Expand Up @@ -420,8 +403,7 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
transaction.contract_addresses = store_transaction.contract_addresses;
transaction.calls = store_transaction.calls;

self.publish_queue
.push(BrokerMessage::Transaction(transaction));
SimpleBroker::publish(transaction);
}
QueryType::SetEntity(entity) => {
let row = query.fetch_one(&mut **tx).await?;
Expand Down Expand Up @@ -487,13 +469,7 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
.execute(&mut **tx)
.await?;

let optimistic_entity = unsafe {
std::mem::transmute::<EntityUpdated, OptimisticEntity>(entity_updated.clone())
};
SimpleBroker::publish(optimistic_entity);

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push(broker_message);
SimpleBroker::publish(entity_updated);
}
QueryType::DeleteEntity(entity) => {
let delete_model = query.execute(&mut **tx).await?;
Expand Down Expand Up @@ -538,17 +514,12 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
entity_updated.deleted = true;
}

SimpleBroker::publish(unsafe {
std::mem::transmute::<EntityUpdated, OptimisticEntity>(entity_updated.clone())
});
self.publish_queue
.push(BrokerMessage::EntityUpdated(entity_updated));
SimpleBroker::publish(entity_updated);
}
QueryType::RegisterModel => {
let row = query.fetch_one(&mut **tx).await?;
let model_registered = ModelRegistered::from_row(&row)?;
self.publish_queue
.push(BrokerMessage::ModelRegistered(model_registered));
SimpleBroker::publish(model_registered);
}
QueryType::EventMessage(em_query) => {
// Must be executed first since other tables have foreign keys on event_messages.id.
Expand Down Expand Up @@ -595,18 +566,12 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
let mut event_message = EventMessageUpdated::from_row(&event_messages_row)?;
event_message.updated_model = Some(em_query.ty);

SimpleBroker::publish(unsafe {
std::mem::transmute::<EventMessageUpdated, OptimisticEventMessage>(
event_message.clone(),
)
});
self.publish_queue
.push(BrokerMessage::EventMessageUpdated(event_message));
SimpleBroker::publish(event_message);
}
QueryType::StoreEvent => {
let row = query.fetch_one(&mut **tx).await?;
let event = EventEmitted::from_row(&row)?;
self.publish_queue.push(BrokerMessage::EventEmitted(event));
SimpleBroker::publish(event);
}
QueryType::ApplyBalanceDiff(apply_balance_diff) => {
debug!(target: LOG_TARGET, "Applying balance diff.");
Expand Down Expand Up @@ -706,11 +671,7 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
})?;

info!(target: LOG_TARGET, name = %name, symbol = %symbol, contract_address = %token.contract_address, token_id = %register_nft_token.token_id, "NFT token registered.");
SimpleBroker::publish(unsafe {
std::mem::transmute::<Token, OptimisticToken>(token.clone())
});
self.publish_queue
.push(BrokerMessage::TokenRegistered(token));
SimpleBroker::publish(token);
}
QueryType::RegisterErc20Token(register_erc20_token) => {
let query = sqlx::query_as::<_, Token>(
Expand All @@ -725,9 +686,7 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {

let token = query.fetch_one(&mut **tx).await?;
info!(target: LOG_TARGET, name = %register_erc20_token.name, symbol = %register_erc20_token.symbol, contract_address = %token.contract_address, "Registered ERC20 token.");

self.publish_queue
.push(BrokerMessage::TokenRegistered(token));
SimpleBroker::publish(token);
}
QueryType::Execute => {
debug!(target: LOG_TARGET, "Executing query.");
Expand Down Expand Up @@ -768,11 +727,7 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
.await?;

info!(target: LOG_TARGET, name = %token.name, symbol = %token.symbol, contract_address = %token.contract_address, token_id = %update_metadata.token_id, "NFT token metadata updated.");
SimpleBroker::publish(unsafe {
std::mem::transmute::<Token, OptimisticToken>(token.clone())
});
self.publish_queue
.push(BrokerMessage::TokenRegistered(token));
SimpleBroker::publish(token);
}
QueryType::Other => {
query.execute(&mut **tx).await?;
Expand All @@ -786,32 +741,13 @@ impl<P: Provider + Sync + Send + 'static> Executor<'_, P> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.commit().await?;

for message in self.publish_queue.drain(..) {
send_broker_message(message);
}

Ok(())
}

async fn rollback(&mut self) -> Result<()> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.rollback().await?;

// NOTE: clear doesn't reset the capacity
self.publish_queue.clear();
Ok(())
}
}

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::SetHead(update) => SimpleBroker::publish(update),
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
BrokerMessage::TokenRegistered(token) => SimpleBroker::publish(token),
BrokerMessage::TokenBalanceUpdated(token_balance) => SimpleBroker::publish(token_balance),
BrokerMessage::Transaction(transaction) => SimpleBroker::publish(transaction),
}
}
54 changes: 0 additions & 54 deletions crates/sqlite/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,6 @@ pub struct Entity {
pub deleted: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticEntity {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
#[sqlx(skip)]
pub deleted: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EventMessage {
Expand All @@ -80,21 +63,6 @@ pub struct EventMessage {
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticEventMessage {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Model {
Expand Down Expand Up @@ -122,18 +90,6 @@ pub struct Event {
pub created_at: DateTime<Utc>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticToken {
pub id: String,
pub contract_address: String,
pub token_id: String,
pub name: String,
pub symbol: String,
pub decimals: u8,
pub metadata: String,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Token {
Expand All @@ -146,16 +102,6 @@ pub struct Token {
pub metadata: String,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticTokenBalance {
pub id: String,
pub balance: String,
pub account_address: String,
pub contract_address: String,
pub token_id: String,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct TokenBalance {
Expand Down
Loading