@@ -6,7 +6,9 @@ use network::{proto::OracleProtoHandler, OracleNetwork};
66use offchain_data:: DataFeederStream ;
77use oracle:: Oracle ;
88use reth:: chainspec:: EthereumChainSpecParser ;
9+ use reth_exex:: ExExContext ;
910use reth_network:: { protocol:: IntoRlpxSubProtocol , NetworkProtocols } ;
11+ use reth_node_api:: FullNodeComponents ;
1012use reth_node_ethereum:: EthereumNode ;
1113
1214mod cli_ext;
@@ -17,6 +19,35 @@ mod oracle;
1719
1820const ORACLE_EXEX_ID : & str = "exex-oracle" ;
1921
22+ async fn start < Node : FullNodeComponents > (
23+ ctx : ExExContext < Node > ,
24+ tcp_port : u16 ,
25+ udp_port : u16 ,
26+ binance_symbols : Vec < String > ,
27+ ) -> eyre:: Result < ( Oracle < Node > , Node :: Network ) >
28+ where
29+ Node :: Network : NetworkProtocols ,
30+ {
31+ // Define the oracle subprotocol
32+ let ( subproto, proto_events, to_peers) = OracleProtoHandler :: new ( ) ;
33+ // Add it to the network as a subprotocol
34+ let net = ctx. network ( ) . clone ( ) ;
35+ net. add_rlpx_sub_protocol ( subproto. into_rlpx_sub_protocol ( ) ) ;
36+
37+ // The instance of the execution extension that will handle chain events
38+ let exex = ExEx :: new ( ctx) ;
39+
40+ // The instance of the oracle network that will handle discovery and gossiping of data
41+ let network = OracleNetwork :: new ( proto_events, tcp_port, udp_port) . await ?;
42+ // The off-chain data feed stream
43+ let data_feed = DataFeederStream :: new ( binance_symbols) . await ?;
44+
45+ // The oracle instance that will orchestrate the network, the execution extensions,
46+ // the off-chain data stream, and the gossiping
47+ let oracle = Oracle :: new ( exex, network, data_feed, to_peers) ;
48+ Ok ( ( oracle, net. clone ( ) ) )
49+ }
50+
2051fn main ( ) -> eyre:: Result < ( ) > {
2152 reth:: cli:: Cli :: < EthereumChainSpecParser , OracleExt > :: parse ( ) . run ( |builder, args| async move {
2253 let tcp_port = args. tcp_port ;
@@ -67,3 +98,51 @@ fn main() -> eyre::Result<()> {
6798 handle. wait_for_node_exit ( ) . await
6899 } )
69100}
101+
102+ #[ cfg( test) ]
103+ mod tests {
104+ use crate :: start;
105+ use reth_exex_test_utils:: test_exex_context;
106+
107+ #[ tokio:: test]
108+ async fn test_oracle ( ) {
109+ reth_tracing:: init_test_tracing ( ) ;
110+ let ( ctx, _handle) = test_exex_context ( ) . await . unwrap ( ) ;
111+ let ( oracle, network_1) =
112+ start ( ctx, 30306 , 30307 , vec ! [ "btcusdc" . to_string( ) , "ethusdc" . to_string( ) ] )
113+ . await
114+ . unwrap ( ) ;
115+ tokio:: spawn ( oracle) ;
116+
117+ // // spawn second
118+ // let (ctx, _handle) = test_exex_context().await.unwrap();
119+ // let (oracle, network_2) =
120+ // start(ctx, 30308, 30309, vec!["btcusdc".to_string(), "ethusdc".to_string()])
121+ // .await
122+ // .unwrap();
123+ // tokio::spawn(oracle);
124+
125+ // // make them connect
126+ // let (peer_1, addr_1) = (network_1.peer_id(), network_1.local_addr());
127+ // let (peer_2, addr_2) = (network_2.peer_id(), network_2.local_addr());
128+ // network_1.add_peer(*peer_1, addr_1);
129+ // network_2.add_peer(*peer_2, addr_2);
130+
131+ // let mut events_1 = network_1.event_listener();
132+
133+ // let expected_peer_2 = loop {
134+ // if let Some(ev) = events_1.next().await {
135+ // match ev {
136+ // NetworkEvent::SessionEstablished { peer_id, .. } => {
137+ // info!("Session established with peer: {:?}", peer_id);
138+ // break peer_id;
139+ // }
140+ // _ => continue,
141+ // }
142+ // } else {
143+ // unreachable!()
144+ // }
145+ // };
146+ // assert_eq!(expected_peer_2, *peer_2);
147+ }
148+ }
0 commit comments