Skip to content

Commit 5af288e

Browse files
loocaproshekhirin
andauthored
added a basic test to run the oracle (#26)
* added a basic test to run the oracle * moving on * remote reth * oracle can peer * mock data feed * feat: e2e tests * Update Cargo.toml Co-authored-by: Alexey Shekhirin <[email protected]> * Update Cargo.toml Co-authored-by: Alexey Shekhirin <[email protected]> * chore: tokio tungstenite as workspace dep and 2 crates under misc crates --------- Co-authored-by: Alexey Shekhirin <[email protected]>
1 parent 4b6fb9b commit 5af288e

File tree

8 files changed

+395
-161
lines changed

8 files changed

+395
-161
lines changed

Cargo.lock

Lines changed: 209 additions & 123 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ futures = "0.3"
5858
futures-util = "0.3"
5959
tokio = { version = "1.0", features = ["full"] }
6060
tokio-stream = "0.1"
61+
tokio-tungstenite = { version = "0.23", features = ["native-tls"] }
6162

6263
# serde
6364
serde = "1"

oracle/Cargo.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ reth-node-ethereum.workspace = true
1919
reth-primitives.workspace = true
2020
reth-tracing.workspace = true
2121
reth.workspace = true
22+
reth-tokio-util = { git = "https://github.com/paradigmxyz/reth" }
2223

2324
# alloy
2425
alloy-primitives.workspace = true
@@ -34,17 +35,19 @@ enr = "0.12"
3435
# async
3536
futures.workspace = true
3637
tokio-stream.workspace = true
37-
tokio-tungstenite = "0.23"
38+
tokio-tungstenite.workspace = true
3839
tokio.workspace = true
3940

41+
4042
# misc
4143
clap = "4"
4244
eyre.workspace = true
4345
serde.workspace = true
4446
serde_json.workspace = true
45-
thiserror = "1"
4647
uuid = "1.10.0"
48+
rand = "0.8.5"
49+
thiserror = "1"
4750

4851
[dev-dependencies]
49-
reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" }
50-
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" }
52+
reth-exex-test-utils.workspace = true
53+
reth-testing-utils.workspace = true

oracle/src/main.rs

Lines changed: 139 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use clap::Parser;
22
use cli_ext::OracleExt;
33
use exex::ExEx;
4-
use futures::FutureExt;
4+
use futures::{FutureExt, Stream};
55
use network::{proto::OracleProtoHandler, OracleNetwork};
6-
use offchain_data::DataFeederStream;
6+
use offchain_data::{DataFeederError, DataFeederStream, DataFeeds};
77
use oracle::Oracle;
88
use reth::chainspec::EthereumChainSpecParser;
9+
use reth_exex::ExExContext;
910
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
11+
use reth_node_api::FullNodeComponents;
1012
use reth_node_ethereum::EthereumNode;
1113

1214
mod cli_ext;
@@ -17,6 +19,35 @@ mod oracle;
1719

1820
const ORACLE_EXEX_ID: &str = "exex-oracle";
1921

22+
/// Helper function to start the oracle stack.
23+
async fn start<Node: FullNodeComponents, D>(
24+
ctx: ExExContext<Node>,
25+
tcp_port: u16,
26+
udp_port: u16,
27+
data_feeder: D,
28+
) -> eyre::Result<(Oracle<Node, D>, Node::Network)>
29+
where
30+
Node::Network: NetworkProtocols,
31+
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
32+
{
33+
// Define the oracle subprotocol
34+
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
35+
// Add it to the network as a subprotocol
36+
let net = ctx.network().clone();
37+
net.add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());
38+
39+
// The instance of the execution extension that will handle chain events
40+
let exex = ExEx::new(ctx);
41+
42+
// The instance of the oracle network that will handle discovery and gossiping of data
43+
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;
44+
45+
// The oracle instance that will orchestrate the network, the execution extensions,
46+
// the off-chain data stream, and the gossiping
47+
let oracle = Oracle::new(exex, network, data_feeder, to_peers);
48+
Ok((oracle, net.clone()))
49+
}
50+
2051
fn main() -> eyre::Result<()> {
2152
reth::cli::Cli::<EthereumChainSpecParser, OracleExt>::parse().run(|builder, args| async move {
2253
let tcp_port = args.tcp_port;
@@ -36,26 +67,8 @@ fn main() -> eyre::Result<()> {
3667
// Source: https://github.com/vados-cosmonic/wasmCloud/commit/440e8c377f6b02f45eacb02692e4d2fabd53a0ec
3768
tokio::task::spawn_blocking(move || {
3869
tokio::runtime::Handle::current().block_on(async move {
39-
// define the oracle subprotocol
40-
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
41-
// add it to the network as a subprotocol
42-
ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());
43-
44-
// the instance of the execution extension that will handle chain events
45-
let exex = ExEx::new(ctx);
46-
47-
// the instance of the oracle network that will handle discovery and
48-
// gossiping of data
49-
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;
50-
// the off-chain data feed stream
5170
let data_feed = DataFeederStream::new(args.binance_symbols).await?;
52-
53-
// the oracle instance that will orchestrate the network, the execution
54-
// extensions, the offchain data stream and the
55-
// gossiping the oracle will always sign and
56-
// broadcast data via the channel until a peer is
57-
// subcribed to it
58-
let oracle = Oracle::new(exex, network, data_feed, to_peers);
71+
let (oracle, _net) = start(ctx, tcp_port, udp_port, data_feed).await?;
5972
Ok(oracle)
6073
})
6174
})
@@ -67,3 +80,108 @@ fn main() -> eyre::Result<()> {
6780
handle.wait_for_node_exit().await
6881
})
6982
}
83+
84+
#[cfg(test)]
85+
mod tests {
86+
use crate::{offchain_data::binance::ticker::Ticker, start};
87+
use futures::{Stream, StreamExt};
88+
use reth_exex_test_utils::test_exex_context;
89+
use reth_network::{NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers};
90+
use reth_network_api::PeerId;
91+
use reth_tokio_util::EventStream;
92+
use reth_tracing::tracing::info;
93+
use tokio_stream::wrappers::BroadcastStream;
94+
95+
async fn wait_for_session(mut events: EventStream<NetworkEvent>) -> PeerId {
96+
while let Some(event) = events.next().await {
97+
if let NetworkEvent::SessionEstablished { peer_id, .. } = event {
98+
info!("Session established with {}", peer_id);
99+
return peer_id;
100+
}
101+
info!("Unexpected event: {:?}", event);
102+
}
103+
104+
unreachable!()
105+
}
106+
107+
use crate::offchain_data::{DataFeederError, DataFeeds};
108+
use futures::stream::{self};
109+
use std::pin::Pin;
110+
111+
fn mock_stream() -> Pin<Box<dyn Stream<Item = Result<DataFeeds, DataFeederError>> + Send>> {
112+
let ticker = Ticker {
113+
event_type: "24hrTicker".to_string(),
114+
event_time: 1698323450000,
115+
symbol: "BTCUSDT".to_string(),
116+
price_change: "100.00".to_string(),
117+
price_change_percent: "2.5".to_string(),
118+
weighted_avg_price: "40200.00".to_string(),
119+
prev_close_price: "40000.00".to_string(),
120+
last_price: "40100.00".to_string(),
121+
last_quantity: "0.5".to_string(),
122+
best_bid_price: "40095.00".to_string(),
123+
best_bid_quantity: "1.0".to_string(),
124+
best_ask_price: "40105.00".to_string(),
125+
best_ask_quantity: "1.0".to_string(),
126+
open_price: "39900.00".to_string(),
127+
high_price: "40500.00".to_string(),
128+
low_price: "39800.00".to_string(),
129+
volume: "1500".to_string(),
130+
quote_volume: "60000000".to_string(),
131+
open_time: 1698237050000,
132+
close_time: 1698323450000,
133+
first_trade_id: 1,
134+
last_trade_id: 2000,
135+
num_trades: 2000,
136+
};
137+
138+
// Wrap the Ticker in DataFeeds::Binance
139+
let data_feed = DataFeeds::Binance(ticker);
140+
141+
// Create a stream that sends a single item and then ends, boxed and pinned
142+
Box::pin(stream::once(async { Ok(data_feed) }))
143+
}
144+
145+
#[tokio::test]
146+
async fn e2e_oracles() {
147+
reth_tracing::init_test_tracing();
148+
149+
// spawn first instance
150+
let (ctx_1, _handle) = test_exex_context().await.unwrap();
151+
let data_feed1 = mock_stream();
152+
let (oracle_1, network_1) = start(ctx_1, 30303, 30304, data_feed1).await.unwrap();
153+
let mut broadcast_stream_1 = BroadcastStream::new(oracle_1.signed_ticks().subscribe());
154+
let signer_1 = oracle_1.signer().address();
155+
tokio::spawn(oracle_1);
156+
let net_1_events = network_1.event_listener();
157+
158+
// spawn second instance
159+
let (ctx_2, _handle) = test_exex_context().await.unwrap();
160+
let data_feed2 = mock_stream();
161+
let (oracle_2, network_2) = start(ctx_2, 30305, 30306, data_feed2).await.unwrap();
162+
let mut broadcast_stream_2 = BroadcastStream::new(oracle_2.signed_ticks().subscribe());
163+
let signer_2 = oracle_2.signer().address();
164+
tokio::spawn(oracle_2);
165+
let net_2_events = network_2.event_listener();
166+
167+
// expect peers connected
168+
let (peer_2, addr_2) = (network_2.peer_id(), network_2.local_addr());
169+
network_1.add_peer(*peer_2, addr_2);
170+
let expected_peer_2 = wait_for_session(net_1_events).await;
171+
assert_eq!(expected_peer_2, *peer_2);
172+
173+
let (peer_1, addr_1) = (network_1.peer_id(), network_1.local_addr());
174+
network_2.add_peer(*peer_1, addr_1);
175+
let expected_peer_1 = wait_for_session(net_2_events).await;
176+
assert_eq!(expected_peer_1, *peer_1);
177+
178+
// expect signed data
179+
let signed_ticker_1 = broadcast_stream_1.next().await.unwrap().unwrap();
180+
assert_eq!(signed_ticker_1.ticker.symbol, "BTCUSDT");
181+
assert_eq!(signed_ticker_1.signer, signer_1);
182+
183+
let signed_ticker_2 = broadcast_stream_2.next().await.unwrap().unwrap();
184+
assert_eq!(signed_ticker_2.ticker.symbol, "BTCUSDT");
185+
assert_eq!(signed_ticker_2.signer, signer_2);
186+
}
187+
}

oracle/src/network/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl Future for OracleNetwork {
6262
"Established connection, will start gossiping"
6363
);
6464
}
65-
None => return Poll::Ready(Ok(())),
65+
None => return Poll::Pending,
6666
}
6767
}
6868
}

oracle/src/network/proto/connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use reth_eth_wire::{
1010
use reth_network::protocol::{ConnectionHandler, OnNotSupported};
1111
use reth_network_api::Direction;
1212
use reth_network_peers::PeerId;
13+
use reth_tracing::tracing::trace;
1314
use std::{
1415
collections::HashMap,
1516
pin::Pin,
@@ -59,6 +60,8 @@ impl Stream for OracleConnection {
5960
}
6061

6162
if let Poll::Ready(Some(Ok(tick))) = this.signed_ticks.poll_next_unpin(cx) {
63+
let signer = tick.signer;
64+
trace!(target: "oracle::conn", ?signer, "Received signed tick data.");
6265
return Poll::Ready(Some(
6366
OracleProtoMessage::signed_ticker(Box::new(tick)).encoded(),
6467
));
@@ -142,6 +145,7 @@ impl ConnectionHandler for OracleConnHandler {
142145
.events
143146
.send(ProtocolEvent::Established { direction, peer_id, to_connection: tx })
144147
.ok();
148+
trace!(target: "oracle::conn", "Connection established.");
145149
OracleConnection {
146150
conn,
147151
initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping),

oracle/src/offchain_data/binance/feeder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use futures::{ready, Stream, StreamExt};
2-
use reth_tracing::tracing::error;
2+
use reth_tracing::tracing::{error, trace};
33
use std::{
44
pin::Pin,
55
task::{Context, Poll},
@@ -96,7 +96,7 @@ impl Stream for BinanceDataFeeder {
9696
return Poll::Pending;
9797
}
9898
};
99-
99+
trace!(target: "oracle::binance", ?msg, "Received message");
100100
Poll::Ready(Some(Ok(msg.data)))
101101
}
102102
Some(Err(e)) => {

oracle/src/oracle.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use crate::{
22
exex::ExEx,
33
network::{proto::data::SignedTicker, OracleNetwork},
4-
offchain_data::{DataFeederStream, DataFeeds},
4+
offchain_data::{DataFeederError, DataFeeds},
55
};
66
use alloy_rlp::{BytesMut, Encodable};
77
use alloy_signer::SignerSync;
88
use alloy_signer_local::PrivateKeySigner;
9-
use futures::{FutureExt, StreamExt};
9+
use futures::{FutureExt, Stream, StreamExt};
1010
use reth_node_api::FullNodeComponents;
11-
use reth_tracing::tracing::{error, info};
11+
use reth_tracing::tracing::{error, info, trace};
1212
use std::{
1313
future::Future,
1414
pin::Pin,
@@ -17,32 +17,53 @@ use std::{
1717

1818
/// The Oracle struct is a long running task that orchestrates discovery of new peers,
1919
/// decoding data from chain events (ExEx) and gossiping it to peers.
20-
pub(crate) struct Oracle<Node: FullNodeComponents> {
20+
pub(crate) struct Oracle<Node: FullNodeComponents, D>
21+
where
22+
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
23+
{
2124
/// The network task for this node.
2225
/// It is composed by a discovery task and a sub protocol RLPx task.
2326
network: OracleNetwork,
2427
/// The execution extension task for this node.
2528
exex: ExEx<Node>,
2629
/// The offchain data feed stream.
27-
data_feed: DataFeederStream,
30+
data_feed: D,
2831
/// The signer to sign the data feed.
2932
signer: PrivateKeySigner,
3033
/// Half of the broadcast channel to send data to connected peers.
3134
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
3235
}
3336

34-
impl<Node: FullNodeComponents> Oracle<Node> {
37+
impl<Node: FullNodeComponents, D> Oracle<Node, D>
38+
where
39+
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
40+
{
3541
pub(crate) fn new(
3642
exex: ExEx<Node>,
3743
network: OracleNetwork,
38-
data_feed: DataFeederStream,
44+
data_feed: D,
3945
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
4046
) -> Self {
4147
Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_peers }
4248
}
49+
50+
/// Returns the signer used by the oracle.
51+
#[allow(dead_code)]
52+
pub(crate) fn signer(&self) -> &PrivateKeySigner {
53+
&self.signer
54+
}
55+
56+
/// Returns the signed ticker broadcast channel.
57+
#[allow(dead_code)]
58+
pub(crate) fn signed_ticks(&self) -> &tokio::sync::broadcast::Sender<SignedTicker> {
59+
&self.to_peers
60+
}
4361
}
4462

45-
impl<Node: FullNodeComponents> Future for Oracle<Node> {
63+
impl<Node: FullNodeComponents, D> Future for Oracle<Node, D>
64+
where
65+
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static + std::marker::Unpin,
66+
{
4667
type Output = eyre::Result<()>;
4768

4869
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -74,8 +95,9 @@ impl<Node: FullNodeComponents> Future for Oracle<Node> {
7495
let signature = this.signer.sign_message_sync(&buffer)?;
7596
let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address());
7697

77-
if let Err(err) = this.to_peers.send(signed_ticker.clone()) {
78-
error!(?err, "Failed to send ticker to gossip, no peers connected");
98+
if this.to_peers.send(signed_ticker.clone()).is_ok() {
99+
let signer = signed_ticker.signer;
100+
trace!(target: "oracle", ?signer, "Sent signed ticker");
79101
}
80102
}
81103
Some(Err(e)) => {

0 commit comments

Comments
 (0)