2626import org .hyperledger .besu .ethereum .eth .manager .EthProtocolManager ;
2727import org .hyperledger .besu .ethereum .eth .manager .EthProtocolManagerTestBuilder ;
2828import org .hyperledger .besu .ethereum .eth .manager .EthProtocolManagerTestUtil ;
29+ import org .hyperledger .besu .ethereum .eth .manager .EthScheduler ;
2930import org .hyperledger .besu .ethereum .eth .manager .peertask .PeerTaskExecutor ;
3031import org .hyperledger .besu .ethereum .eth .manager .peertask .PeerTaskExecutorResponseCode ;
3132import org .hyperledger .besu .ethereum .eth .manager .peertask .PeerTaskExecutorResult ;
3233import org .hyperledger .besu .ethereum .eth .manager .peertask .task .GetHeadersFromPeerTask ;
3334import org .hyperledger .besu .ethereum .eth .transactions .TransactionPool ;
3435import org .hyperledger .besu .ethereum .mainnet .ProtocolSchedule ;
36+ import org .hyperledger .besu .metrics .noop .NoOpMetricsSystem ;
3537import org .hyperledger .besu .plugin .services .storage .DataStorageFormat ;
36- import org .hyperledger .besu .testutil .DeterministicEthScheduler ;
3738import org .hyperledger .besu .util .ExceptionUtils ;
3839
3940import java .util .List ;
4041import java .util .Optional ;
4142import java .util .concurrent .CompletableFuture ;
42- import java .util .concurrent .atomic .AtomicBoolean ;
4343import java .util .stream .Stream ;
4444
4545import org .apache .tuweni .bytes .Bytes ;
@@ -55,7 +55,6 @@ public class PivotBlockRetrieverTest {
5555
5656 private static final long PIVOT_BLOCK_NUMBER = 10 ;
5757
58- private final AtomicBoolean timeout = new AtomicBoolean (false );
5958 private EthProtocolManager ethProtocolManager ;
6059 private MutableBlockchain blockchain ;
6160 private TransactionPool transactionPool ;
@@ -82,7 +81,7 @@ public void setUp(final DataStorageFormat storageFormat) {
8281 EthProtocolManagerTestBuilder .builder ()
8382 .setProtocolSchedule (protocolSchedule )
8483 .setBlockchain (blockchain )
85- .setEthScheduler (new DeterministicEthScheduler ( timeout :: get ))
84+ .setEthScheduler (new EthScheduler ( 1 , 1 , 1 , new NoOpMetricsSystem () ))
8685 .setWorldStateArchive (blockchainSetupUtil .getWorldArchive ())
8786 .setTransactionPool (transactionPool )
8887 .setEthereumWireProtocolConfiguration (EthProtocolConfiguration .defaultConfig ())
@@ -143,6 +142,8 @@ public void shouldSucceedWhenAllPeersAgree(final DataStorageFormat storageFormat
143142
144143 final CompletableFuture <FastSyncState > future = pivotBlockRetriever .downloadPivotBlockHeader ();
145144
145+ waitUntilComplete (future );
146+
146147 Mockito .verify (peerTaskExecutor )
147148 .executeAgainstPeer (Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerA ));
148149 Mockito .verify (peerTaskExecutor )
@@ -190,6 +191,8 @@ public void shouldQueryBestPeersFirst(final DataStorageFormat storageFormat) {
190191
191192 final CompletableFuture <FastSyncState > future = pivotBlockRetriever .downloadPivotBlockHeader ();
192193
194+ waitUntilComplete (future );
195+
193196 Mockito .verify (peerTaskExecutor )
194197 .executeAgainstPeer (Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerA ));
195198 Mockito .verify (peerTaskExecutor , Mockito .never ())
@@ -218,22 +221,24 @@ public void shouldRecoverFromUnresponsivePeer(final DataStorageFormat storageFor
218221 Mockito .when (
219222 peerTaskExecutor .executeAgainstPeer (
220223 Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerA )))
221- .thenReturn (
222- new PeerTaskExecutorResult <>(
223- Optional .of (List .of (blockchain .getBlockHeader (PIVOT_BLOCK_NUMBER ).get ())),
224- PeerTaskExecutorResponseCode .SUCCESS ,
225- List .of (peerA )))
226- .thenReturn (
227- new PeerTaskExecutorResult <>(
228- Optional .of (List .of (blockchain .getBlockHeader (PIVOT_BLOCK_NUMBER ).get ())),
229- PeerTaskExecutorResponseCode .SUCCESS ,
230- List .of (peerA )));
224+ .thenAnswer (
225+ (invocationOnMock ) -> {
226+ Thread .sleep (200 );
227+ return new PeerTaskExecutorResult <>(
228+ Optional .of (List .of (blockchain .getBlockHeader (PIVOT_BLOCK_NUMBER ).get ())),
229+ PeerTaskExecutorResponseCode .SUCCESS ,
230+ List .of (peerA ));
231+ });
231232 Mockito .when (
232233 peerTaskExecutor .executeAgainstPeer (
233234 Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerB )))
234- .thenReturn (
235- new PeerTaskExecutorResult <>(
236- Optional .empty (), PeerTaskExecutorResponseCode .TIMEOUT , List .of (peerB )));
235+ .thenAnswer (
236+ (invocationOnMock ) -> {
237+ Thread .sleep (200 );
238+ return new PeerTaskExecutorResult <>(
239+ Optional .empty (), PeerTaskExecutorResponseCode .TIMEOUT , List .of (peerB ));
240+ });
241+
237242 Mockito .when (
238243 peerTaskExecutor .executeAgainstPeer (
239244 Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerC )))
@@ -245,6 +250,8 @@ public void shouldRecoverFromUnresponsivePeer(final DataStorageFormat storageFor
245250
246251 final CompletableFuture <FastSyncState > future = pivotBlockRetriever .downloadPivotBlockHeader ();
247252
253+ waitUntilComplete (future );
254+
248255 Mockito .verify (peerTaskExecutor )
249256 .executeAgainstPeer (Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerA ));
250257 Mockito .verify (peerTaskExecutor )
@@ -304,6 +311,8 @@ public void shouldRetryWhenPeersDisagreeOnPivot_successfulRetry(
304311 // Execute task and wait for response
305312 final CompletableFuture <FastSyncState > future = pivotBlockRetriever .downloadPivotBlockHeader ();
306313
314+ waitUntilComplete (future );
315+
307316 Mockito .verify (peerTaskExecutor , Mockito .times (2 ))
308317 .executeAgainstPeer (Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerA ));
309318 Mockito .verify (peerTaskExecutor , Mockito .times (2 ))
@@ -367,6 +376,8 @@ public void shouldRetryWhenPeersDisagreeOnPivot_exceedMaxRetries(
367376 // Execute task and wait for response
368377 final CompletableFuture <FastSyncState > future = pivotBlockRetriever .downloadPivotBlockHeader ();
369378
379+ waitUntilComplete (future );
380+
370381 Mockito .verify (peerTaskExecutor , Mockito .times (2 ))
371382 .executeAgainstPeer (Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerA ));
372383 Mockito .verify (peerTaskExecutor , Mockito .times (2 ))
@@ -417,6 +428,8 @@ public void shouldRetryWhenPeersDisagreeOnPivot_pivotInvalidOnRetry(
417428 // Execute task and wait for response
418429 final CompletableFuture <FastSyncState > future = pivotBlockRetriever .downloadPivotBlockHeader ();
419430
431+ waitUntilComplete (future );
432+
420433 Mockito .verify (peerTaskExecutor )
421434 .executeAgainstPeer (Mockito .any (GetHeadersFromPeerTask .class ), Mockito .eq (peerA ));
422435 Mockito .verify (peerTaskExecutor )
@@ -429,6 +442,14 @@ public void shouldRetryWhenPeersDisagreeOnPivot_pivotInvalidOnRetry(
429442 .isEqualTo (SyncError .PIVOT_BLOCK_HEADER_MISMATCH );
430443 }
431444
445+ private void waitUntilComplete (final CompletableFuture <?> future ) {
446+ try {
447+ future .join ();
448+ } catch (RuntimeException e ) {
449+ // do nothing
450+ }
451+ }
452+
432453 @ Test
433454 void dryRunDetector () {
434455 assertThat (true )
0 commit comments