Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 51 additions & 60 deletions app/src/integrationTest/kotlin/maru/app/MaruPeerScoringTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand All @@ -30,7 +31,6 @@ import maru.p2p.messages.BlockRetrievalStrategy
import maru.p2p.messages.DefaultBlockRetrievalStrategy
import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.syncing.SyncStatusProvider
import maru.test.util.NetworkUtil.findFreePort
import net.consensys.linea.metrics.MetricsFacade
import org.apache.logging.log4j.LogManager
Expand All @@ -45,6 +45,7 @@ import org.hyperledger.besu.tests.acceptance.dsl.node.cluster.ClusterConfigurati
import org.hyperledger.besu.tests.acceptance.dsl.transaction.net.NetTransactions
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import testutils.FourEmptyResponsesStrategy
import testutils.MisbehavingP2PNetwork
import testutils.PeeringNodeNetworkStack
Expand All @@ -53,7 +54,6 @@ import testutils.besu.BesuFactory
import testutils.besu.BesuTransactionsHelper
import testutils.besu.ethGetBlockByNumber
import testutils.maru.MaruFactory
import testutils.maru.awaitTillMaruHasPeers

class MaruPeerScoringTest {
private lateinit var cluster: Cluster
Expand All @@ -65,9 +65,11 @@ class MaruPeerScoringTest {
private lateinit var fakeLineaContract: FakeLineaRollupSmartContractClient
private lateinit var validatorEthApiClient: EthApiClient
private lateinit var followerEthApiClient: EthApiClient
private var job: Job? = null

@AfterEach
fun tearDown() {
job?.cancel()
followerStack.maruApp.stop()
validatorStack.maruApp.stop()
followerStack.maruApp.close()
Expand All @@ -77,22 +79,17 @@ class MaruPeerScoringTest {

@Test
fun `node gets in sync with default block retrieval strategy`() {
val maruNodeSetup =
setUpNodes(blockRetrievalStrategy = DefaultBlockRetrievalStrategy())
setUpNodes(blockRetrievalStrategy = DefaultBlockRetrievalStrategy())

try {
await
.atMost(20.seconds.toJavaDuration())
.pollInterval(200.milliseconds.toJavaDuration())
.ignoreExceptions()
.untilAsserted {
assertThat(
followerEthApiClient.getBlockByNumberWithoutTransactionsData(BlockParameter.Tag.LATEST).get().number,
).isGreaterThanOrEqualTo(15UL)
}
} finally {
maruNodeSetup.job.cancel()
}
await
.atMost(20.seconds.toJavaDuration())
.pollInterval(200.milliseconds.toJavaDuration())
.ignoreExceptions()
.untilAsserted {
assertThat(
followerEthApiClient.getBlockByNumberWithoutTransactionsData(BlockParameter.Tag.LATEST).get().number,
).isGreaterThanOrEqualTo(15UL)
}
}

@Test
Expand All @@ -104,31 +101,27 @@ class MaruPeerScoringTest {
followerCooldownPeriod = 10.minutes,
)

try {
// In setUpNodes we have made sure that the validator and the follower have 1 peer
// Now wait until it is disconnected because of empty responses
await
.atMost(2.seconds.toJavaDuration())
.pollInterval(250.milliseconds.toJavaDuration())
.ignoreExceptions()
.untilAsserted {
assertThat(
maruNodeSetup.followerMaruApp.p2pNetwork.peerCount == 0,
)
}
// reconnects after ban period and finishes syncing
await
.atMost(20.seconds.toJavaDuration())
.pollInterval(200.milliseconds.toJavaDuration())
.ignoreExceptions()
.untilAsserted {
assertThat(
followerEthApiClient.getBlockByNumberWithoutTransactionsData(BlockParameter.Tag.LATEST).get().number,
).isGreaterThanOrEqualTo(18UL)
}
} finally {
maruNodeSetup.job.cancel()
}
// In setUpNodes we have made sure that the validator and the follower have 1 peer
// Now wait until it is disconnected because of empty responses
await
.atMost(2.seconds.toJavaDuration())
.pollInterval(250.milliseconds.toJavaDuration())
.ignoreExceptions()
.untilAsserted {
assertThat(
maruNodeSetup.followerMaruApp.p2pNetwork.peerCount == 0,
)
}
// reconnects after ban period and finishes syncing
await
.atMost(20.seconds.toJavaDuration())
.pollInterval(200.milliseconds.toJavaDuration())
.ignoreExceptions()
.untilAsserted {
assertThat(
followerEthApiClient.getBlockByNumberWithoutTransactionsData(BlockParameter.Tag.LATEST).get().number,
).isGreaterThanOrEqualTo(18UL)
}
}

@Test
Expand All @@ -141,22 +134,17 @@ class MaruPeerScoringTest {
blockRetrievalStrategy = TimeOutResponsesStrategy(delay = delay),
validatorCooldownPeriod = 20.seconds,
)
try {
sleep((delay - 1.seconds).inWholeMilliseconds)
assertThat(maruNodeSetup.followerMaruApp.p2pNetwork.peerCount == 1)
sleep((timeout - 1.seconds).inWholeMilliseconds)
assertThat(maruNodeSetup.followerMaruApp.p2pNetwork.peerCount).isEqualTo(1)

await.untilAsserted {
assertThat(maruNodeSetup.followerMaruApp.p2pNetwork.peerCount == 0)
}
} finally {
maruNodeSetup.job.cancel()
await.untilAsserted {
assertThat(maruNodeSetup.followerMaruApp.p2pNetwork.peerCount).isEqualTo(0)
}
}

data class MaruNodeSetup(
val validatorMaruApp: MaruApp,
val followerMaruApp: MaruApp,
val job: Job,
)

fun setUpNodes(
Expand Down Expand Up @@ -206,7 +194,6 @@ class MaruPeerScoringTest {
forkIdHashManager: ForkPeeringManager,
isBlockImportEnabledProvider: () -> Boolean,
p2pState: P2PState,
syncStatusProviderProvider: () -> SyncStatusProvider,
->
MisbehavingP2PNetwork(
privateKeyBytes = privateKeyBytes,
Expand All @@ -220,7 +207,6 @@ class MaruPeerScoringTest {
forkIdHashManager = forkIdHashManager,
isBlockImportEnabledProvider = isBlockImportEnabledProvider,
p2pState = p2pState,
syncStatusProviderProvider = syncStatusProviderProvider,
blockRetrievalStrategy = blockRetrievalStrategy,
).p2pNetwork
},
Expand Down Expand Up @@ -272,8 +258,13 @@ class MaruPeerScoringTest {
)
followerStack.setMaruApp(followerMaruApp)

val job =
CoroutineScope(Dispatchers.Default).launch {
val handler =
CoroutineExceptionHandler { _, exception ->
fail("Coroutine failed with exception: $exception")
}

job =
CoroutineScope(Dispatchers.Default).launch(handler) {
while (true) {
transactionsHelper.run {
validatorStack.besuNode.sendTransactionAndAssertExecution(
Expand All @@ -298,16 +289,14 @@ class MaruPeerScoringTest {

followerStack.maruApp.start()

validatorStack.maruApp.awaitTillMaruHasPeers(1u)
followerStack.maruApp.awaitTillMaruHasPeers(1u)

followerEthApiClient =
createEthApiClient(
rpcUrl = followerStack.besuNode.jsonRpcBaseUrl().get(),
log = LogManager.getLogger("clients.l2.test.follower"),
requestRetryConfig = null,
vertx = null,
)

// wait for Besu to be fully started and synced,
// to avoid CI flakiness due to low resources sometimes
await
Expand All @@ -319,9 +308,11 @@ class MaruPeerScoringTest {
followerEthApiClient.getBlockByNumberWithoutTransactionsData(BlockParameter.Tag.LATEST).get().number,
).isGreaterThanOrEqualTo(0UL)
}

return MaruNodeSetup(validatorMaruApp = validatorMaruApp, followerMaruApp = followerMaruApp)
} catch (e: Exception) {
job.cancel()
job?.cancel()
throw e
}
return MaruNodeSetup(validatorMaruApp = validatorMaruApp, followerMaruApp = followerMaruApp, job = job)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import maru.p2p.messages.BeaconBlocksByRangeRequest
import maru.p2p.messages.BlockRetrievalStrategy
import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import org.hyperledger.besu.plugin.services.MetricsSystem as BesuMetricsSystem

Expand All @@ -36,7 +35,6 @@ class MisbehavingP2PNetwork(
forkIdHashManager: ForkPeeringManager,
isBlockImportEnabledProvider: () -> Boolean,
p2pState: P2PState,
syncStatusProviderProvider: () -> SyncStatusProvider,
blockRetrievalStrategy: BlockRetrievalStrategy,
) {
val p2pNetwork: P2PNetworkImpl =
Expand All @@ -52,7 +50,6 @@ class MisbehavingP2PNetwork(
forkIdHashManager = forkIdHashManager,
isBlockImportEnabledProvider = isBlockImportEnabledProvider,
p2PState = p2pState,
syncStatusProviderProvider = syncStatusProviderProvider,
rpcMethodsFactory = { statusMessageFactory, lineaRpcProtocolIdGenerator, peerLookup, beaconChain ->
RpcMethods(statusMessageFactory, lineaRpcProtocolIdGenerator, peerLookup, beaconChain, blockRetrievalStrategy)
},
Expand Down
7 changes: 0 additions & 7 deletions app/src/main/kotlin/maru/app/MaruAppFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import maru.syncing.HighestHeadTargetSelector
import maru.syncing.MostFrequentHeadTargetSelector
import maru.syncing.PeerChainTracker
import maru.syncing.SyncController
import maru.syncing.SyncStatusProvider
import maru.syncing.SyncTargetSelector
import maru.syncing.beaconchain.pipeline.BeaconChainDownloadPipelineFactory
import net.consensys.linea.metrics.MetricsFacade
Expand Down Expand Up @@ -103,7 +102,6 @@ interface MaruAppFactoryCreator {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): LongRunningCloseable
}
Expand Down Expand Up @@ -131,7 +129,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl,
): MaruApp {
log.info("configs={}", config)
Expand Down Expand Up @@ -219,7 +216,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
}
},
p2PState = kvDatabase,
syncStatusProviderProvider = { syncControllerImpl!! },
clock = clock,
p2pNetworkFactory = p2pNetworkFactory,
)
Expand Down Expand Up @@ -386,7 +382,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
besuMetricsSystem: BesuMetricsSystem,
clock: Clock,
p2PState: P2PState,
syncStatusProviderProvider: () -> SyncStatusProvider,
p2pNetworkFactory: (
ByteArray,
P2PConfig,
Expand All @@ -399,7 +394,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): P2PNetwork {
if (p2pConfig == null) {
Expand Down Expand Up @@ -433,7 +427,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
forkIdHashManager,
isBlockImportEnabledProvider,
p2PState,
syncStatusProviderProvider,
)
}

Expand Down
2 changes: 0 additions & 2 deletions app/src/test/kotlin/maru/app/MaruAppCliTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.services.LongRunningService
import maru.services.NoOpLongRunningService
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterAll
Expand Down Expand Up @@ -170,7 +169,6 @@ class MaruAppCliTest {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl,
): LongRunningCloseable {
capturedMaruConfig = config
Expand Down
2 changes: 2 additions & 0 deletions config/src/main/kotlin/maru/config/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ data class P2PConfig(
val port: UInt = 9000u,
val bootnodes: List<String> = emptyList(),
val refreshInterval: Duration,
val searchInterval: Duration = 1.seconds,
val searchTimeout: Duration = 30.seconds,
val advertisedIp: String? = null,
) {
init {
Expand Down
4 changes: 4 additions & 0 deletions config/src/test/kotlin/maru/config/HopliteFriendlinessTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class HopliteFriendlinessTest {
port = 3324
bootnodes = ["enr:-Iu4QHk0YN5IRRnufqsWkbO6Tn0iGTx4H_hnyiIEdXDuhIe0KKrxmaECisyvO40mEmmqKLhz_tdIhx2yFBK8XFKhvxABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQOgBvD-dv0cX5szOeEsiAMtwxnP1q5CA5toYDrgUyOhV4N0Y3CCJBKDdWRwgiQT"]
refresh-interval = "30 seconds"
search-timeout = "10 seconds"
search-interval = "2 seconds"
advertised-ip = "13.12.11.10"

[p2p.reputation]
Expand Down Expand Up @@ -147,6 +149,8 @@ class HopliteFriendlinessTest {
"enr:-Iu4QHk0YN5IRRnufqsWkbO6Tn0iGTx4H_hnyiIEdXDuhIe0KKrxmaECisyvO40mEmmqKLhz_tdIhx2yFBK8XFKhvxABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQOgBvD-dv0cX5szOeEsiAMtwxnP1q5CA5toYDrgUyOhV4N0Y3CCJBKDdWRwgiQT",
),
refreshInterval = 30.seconds,
searchTimeout = 10.seconds,
searchInterval = 2.seconds,
advertisedIp = "13.12.11.10",
),
reputation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import maru.p2p.P2PNetworkImpl
import maru.p2p.fork.ForkPeeringManager
import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import org.hyperledger.besu.plugin.services.MetricsSystem as BesuMetricsSystem

Expand Down Expand Up @@ -347,7 +346,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
startApiServer: Boolean = false,
): MaruApp =
Expand Down Expand Up @@ -458,7 +456,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
apiPort: UInt = 0u,
startApiServer: Boolean = false,
Expand Down Expand Up @@ -530,7 +527,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): MaruApp {
val p2pConfig =
Expand Down Expand Up @@ -598,7 +594,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): MaruApp {
val p2pConfig =
Expand Down
Loading
Loading