Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ Cargo.lock
/data
/config/environments/*.toml
!/config/environments/*.example.toml

test-data/
test-ledger/
18 changes: 17 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,35 @@ all: build
build:
@$(CARGO) build --workspace

run-node-%: build
run-publisher-%: build
@$(CARGO) run --bin node -- \
--node-type publisher \
--index $* \
--base-port $(BASE_PORT) \
--enable-tip-route

run-relayer-%: build
@$(CARGO) run --bin node -- \
--node-type relayer \
--index $* \
--base-port $$(( $(BASE_PORT) + 100 )) \
--enable-tip-route

run-local-network: build
@for i in $$(seq 0 $(shell echo $$(($(NODES)-1)))); do \
$(CARGO) run --bin node -- \
--node-type publisher \
--index $$i \
--base-port $(BASE_PORT) \
--enable-tip-route & \
done
@for i in $$(seq 0 $(shell echo $$(($(NODES)-1)))); do \
$(CARGO) run --bin node -- \
--node-type relayer \
--index $$i \
--base-port $$(( $(BASE_PORT) + 100 )) \
--enable-tip-route & \
done

clean:
@rm -rf $(DATA_DIR)/node_*
Expand Down
2 changes: 1 addition & 1 deletion crates/windexer-common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod node;

pub use network::NetworkConfig;
pub use store::StoreConfig;
pub use node::NodeConfig;
pub use node::{NodeType, NodeConfig, PublisherNodeConfig, RelayerNodeConfig};

use serde::{Deserialize, Serialize};
use std::path::PathBuf;
Expand Down
95 changes: 92 additions & 3 deletions crates/windexer-common/src/config/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@ use {
serde::{Deserialize, Serialize},
std::net::SocketAddr,
crate::crypto::SerializableKeypair,
std::any::Any,
};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NodeType {
Publisher,
Relayer,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
pub struct PublisherNodeConfig {
pub node_id: String,
pub node_type: NodeType,
pub listen_addr: SocketAddr,
pub rpc_addr: SocketAddr,
pub bootstrap_peers: Vec<String>,
Expand All @@ -19,15 +27,96 @@ pub struct NodeConfig {
pub metrics_addr: Option<SocketAddr>,
}

impl NodeConfig {
pub fn new_local(
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayerNodeConfig {
pub node_id: String,
pub node_type: NodeType,
pub listen_addr: SocketAddr,
pub rpc_addr: SocketAddr,
pub bootstrap_peers: Vec<String>,
pub data_dir: String,
pub solana_rpc_url: String,
pub geyser_plugin_config: Option<String>,
pub keypair: SerializableKeypair,
pub metrics_addr: Option<SocketAddr>,
}

pub trait NodeConfig: Any + std::fmt::Debug {
fn get_node_type(&self) -> NodeType;
fn get_config(&self) -> &dyn NodeConfig;
fn get_keypair(&self) -> &SerializableKeypair;
fn get_listen_addr(&self) -> &SocketAddr;
fn get_bootstrap_peers(&self) -> &Vec<String>;
}

impl NodeConfig for PublisherNodeConfig {
fn get_node_type(&self) -> NodeType {
NodeType::Publisher
}
fn get_config(&self) -> &dyn NodeConfig {
self
}
fn get_keypair(&self) -> &SerializableKeypair {
&self.keypair
}
fn get_listen_addr(&self) -> &SocketAddr {
&self.listen_addr
}
fn get_bootstrap_peers(&self) -> &Vec<String> {
&self.bootstrap_peers
}
}

impl NodeConfig for RelayerNodeConfig {
fn get_node_type(&self) -> NodeType {
NodeType::Relayer
}
fn get_config(&self) -> &dyn NodeConfig {
self
}
fn get_keypair(&self) -> &SerializableKeypair {
&self.keypair
}
fn get_listen_addr(&self) -> &SocketAddr {
&self.listen_addr
}
fn get_bootstrap_peers(&self) -> &Vec<String> {
&self.bootstrap_peers
}
}

impl PublisherNodeConfig {
pub fn new_local_publisher(
node_id: impl Into<String> + std::fmt::Display,
port: u16,
rpc_port: u16,
bootstrap_peers: Vec<String>,
) -> Self {
Self {
node_id: node_id.to_string(),
node_type: NodeType::Publisher,
listen_addr: format!("127.0.0.1:{}", port).parse().unwrap(),
rpc_addr: format!("127.0.0.1:{}", rpc_port).parse().unwrap(),
bootstrap_peers,
data_dir: format!("./data/node_{}", node_id),
solana_rpc_url: "http://localhost:8899".to_string(),
geyser_plugin_config: None,
keypair: SerializableKeypair::default(),
metrics_addr: None,
}
}
}

impl RelayerNodeConfig {
pub fn new_local_relayer(
node_id: impl Into<String> + std::fmt::Display,
port: u16,
rpc_port: u16,
bootstrap_peers: Vec<String>,
) -> Self {
Self {
node_id: node_id.to_string(),
node_type: NodeType::Relayer,
listen_addr: format!("127.0.0.1:{}", port).parse().unwrap(),
rpc_addr: format!("127.0.0.1:{}", rpc_port).parse().unwrap(),
bootstrap_peers,
Expand Down
47 changes: 34 additions & 13 deletions crates/windexer-examples/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// examples/node.rs
use {
anyhow::Result,
anyhow::{Result, anyhow},
clap::Parser,
solana_sdk::signer::keypair::Keypair,
std::{
Expand All @@ -12,8 +12,9 @@ use {
tracing::{info, warn},
tracing_subscriber::EnvFilter,
windexer_common::{
config::NodeConfig,
config::{NodeType, PublisherNodeConfig, RelayerNodeConfig, NodeConfig},
crypto::SerializableKeypair,

},
windexer_jito_staking::StakingConfig,
windexer_network::Node,
Expand All @@ -27,6 +28,10 @@ use {
long_about = "Runs a wIndexer node that connects to the Jito network for block data and tip routing"
)]
struct Args {
/// Node type (publisher, relayer)
#[clap(long, default_value = "publisher", value_parser = ["publisher", "relayer"])]
node_type: String,

/// Node index (0, 1, 2)
#[clap(short, long)]
index: u16,
Expand Down Expand Up @@ -78,17 +83,33 @@ async fn main() -> Result<()> {
info!(" RPC: {}", rpc_port);
info!(" Metrics: {}", metrics_port);

let config = NodeConfig {
node_id: format!("node_{}", args.index),
listen_addr: format!("127.0.0.1:{}", port).parse()?,
rpc_addr: format!("127.0.0.1:{}", rpc_port).parse()?,
bootstrap_peers: args.bootstrap_peers,
data_dir: format!("./data/node_{}", args.index),
solana_rpc_url: args.solana_rpc,
keypair: SerializableKeypair::new(&Keypair::new()),
geyser_plugin_config: None,
metrics_addr: Some(format!("127.0.0.1:{}", metrics_port).parse()?),
};
let config: Box<dyn NodeConfig> = match args.node_type.to_lowercase().as_str() {
"publisher" => Box::new(PublisherNodeConfig {
node_id: format!("publisher_{}", args.index),
node_type: NodeType::Publisher,
listen_addr: format!("127.0.0.1:{}", port).parse()?,
rpc_addr: format!("127.0.0.1:{}", rpc_port).parse()?,
bootstrap_peers: args.bootstrap_peers,
data_dir: format!("./data/node_{}", args.index),
solana_rpc_url: args.solana_rpc,
keypair: SerializableKeypair::new(&Keypair::new()),
geyser_plugin_config: None,
metrics_addr: Some(format!("127.0.0.1:{}", metrics_port).parse()?),
}),
"relayer" => Box::new(RelayerNodeConfig {
node_id: format!("relayer_{}", args.index),
node_type: NodeType::Relayer,
listen_addr: format!("127.0.0.1:{}", port).parse()?,
rpc_addr: format!("127.0.0.1:{}", rpc_port).parse()?,
bootstrap_peers: args.bootstrap_peers,
data_dir: format!("./data/node_{}", args.index),
solana_rpc_url: args.solana_rpc,
keypair: SerializableKeypair::new(&Keypair::new()),
geyser_plugin_config: None,
metrics_addr: Some(format!("127.0.0.1:{}", metrics_port).parse()?),
}),
_ => return Err(anyhow!("Invalid node type. Must be 'publisher' or 'relayer'")),
};

let staking_config = StakingConfig {
min_stake: 100_000, // 100k minimum stake
Expand Down
63 changes: 40 additions & 23 deletions crates/windexer-network/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,12 @@ use {
anyhow::{anyhow, Context, Result},
futures::StreamExt,
libp2p::{
core::upgrade,
gossipsub::{
core::upgrade, gossipsub::{
self,
Behaviour as GossipsubBehaviour,
MessageAuthenticity,
ValidationMode,
},
mdns::{self, tokio::Behaviour as MdnsBehaviour},
noise,
swarm::{NetworkBehaviour, SwarmEvent, Swarm, Config as SwarmConfig},
tcp,
yamux,
Multiaddr,
PeerId,
Transport,
identity,
}, identity, mdns::{self, tokio::Behaviour as MdnsBehaviour}, noise, swarm::{Config as SwarmConfig, NetworkBehaviour, Swarm, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport
},
solana_sdk::{
pubkey::Pubkey,
Expand All @@ -35,11 +25,11 @@ use {
time::Duration,
},
tokio::{
sync::{mpsc, RwLock, Mutex},
sync::{mpsc, Mutex, RwLock},
time,
},
tracing::{debug, info, warn},
windexer_common::config::NodeConfig,
windexer_common::config::{NodeConfig, NodeType},
windexer_jito_staking::{JitoStakingService, StakingConfig},
};

Expand Down Expand Up @@ -83,7 +73,7 @@ impl From<mdns::Event> for NodeEvent {

// Add these derives to make Node thread-safe
pub struct Node {
config: NodeConfig,
config: Box<dyn NodeConfig>,
swarm: Arc<Mutex<Swarm<NodeBehaviour>>>,
metrics: Arc<RwLock<Metrics>>,
known_peers: Arc<RwLock<HashSet<PeerId>>>,
Expand All @@ -104,10 +94,10 @@ impl std::fmt::Debug for Node {

impl Node {
pub async fn new(
config: NodeConfig,
config: Box<dyn NodeConfig>,
staking_config: StakingConfig,
) -> Result<(Self, mpsc::Sender<()>)> {
let keypair = config.keypair.to_keypair()
let keypair = config.get_keypair().to_keypair()
.context("Failed to deserialize node keypair")?;
let libp2p_keypair = convert_keypair(&keypair);
let peer_id = PeerId::from(libp2p_keypair.public());
Expand Down Expand Up @@ -148,7 +138,7 @@ impl Node {
};

// Now use the original keypair for SwarmBuilder
let swarm = libp2p::SwarmBuilder::with_existing_identity(libp2p_keypair)
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(libp2p_keypair)
.with_tokio()
.with_tcp(
tcp::Config::default().nodelay(true),
Expand All @@ -162,6 +152,25 @@ impl Node {
.build();

let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

let publisher_topic = gossipsub::IdentTopic::new("publisher");
let relayer_topic = gossipsub::IdentTopic::new("relayer");
let common_topic = gossipsub::IdentTopic::new("common");

match config.get_node_type() {
NodeType::Publisher => {
swarm.behaviour_mut().gossipsub.subscribe(&publisher_topic)
.map_err(|e| anyhow!("Failed to subscribe to publisher topic: {}", e))?;
swarm.behaviour_mut().gossipsub.subscribe(&common_topic)
.map_err(|e| anyhow!("Failed to subscribe to common topic: {}", e))?;
}
NodeType::Relayer => {
swarm.behaviour_mut().gossipsub.subscribe(&relayer_topic)
.map_err(|e| anyhow!("Failed to subscribe to relayer topic: {}", e))?;
swarm.behaviour_mut().gossipsub.subscribe(&common_topic)
.map_err(|e| anyhow!("Failed to subscribe to common topic: {}", e))?;
}
}

Ok((
Self {
Expand All @@ -177,18 +186,26 @@ impl Node {
}

pub async fn start(&mut self) -> Result<()> {
info!("Starting node on {}", self.config.listen_addr);
info!("Starting node on {}", self.config.get_listen_addr());
match self.config.get_node_type() {
NodeType::Publisher => {
info!("Starting publisher node");
}
NodeType::Relayer => {
info!("Starting relayer node");
}
}

let addr = format!("/ip4/{}/tcp/{}",
self.config.listen_addr.ip(),
self.config.listen_addr.port()
self.config.get_listen_addr().ip(),
self.config.get_listen_addr().port()
).parse::<Multiaddr>()?;

{
let mut swarm = self.swarm.lock().await;
swarm.listen_on(addr)?;

for addr in &self.config.bootstrap_peers {
for addr in self.config.get_bootstrap_peers() {
let remote: Multiaddr = addr.parse()?;
match swarm.dial(remote.clone()) {
Ok(_) => info!("Dialing bootstrap peer {}", remote),
Expand Down Expand Up @@ -224,7 +241,7 @@ impl Node {
if let Some(event) = event {
self.handle_swarm_event(event).await?;
}
}
}
}
}

Expand Down
1 change: 0 additions & 1 deletion test-ledger/faucet-keypair.json

This file was deleted.

Binary file removed test-ledger/genesis.bin
Binary file not shown.
Binary file removed test-ledger/genesis.tar.bz2
Binary file not shown.
Binary file removed test-ledger/rocksdb/000014.sst
Binary file not shown.
Binary file removed test-ledger/rocksdb/000015.sst
Binary file not shown.
Binary file removed test-ledger/rocksdb/000016.sst
Binary file not shown.
Binary file removed test-ledger/rocksdb/000017.sst
Binary file not shown.
Binary file removed test-ledger/rocksdb/000018.sst
Binary file not shown.
Binary file removed test-ledger/rocksdb/000019.sst
Binary file not shown.
1 change: 0 additions & 1 deletion test-ledger/rocksdb/CURRENT

This file was deleted.

1 change: 0 additions & 1 deletion test-ledger/rocksdb/IDENTITY

This file was deleted.

Empty file removed test-ledger/rocksdb/LOCK
Empty file.
Loading