Skip to content

Commit 1081cdb

Browse files
pingesFilter94
andauthored
Minor cleanup (#417)
* minor cleanup Signed-off-by: [email protected] <[email protected]> * spotless Signed-off-by: [email protected] <[email protected]> * make it work Signed-off-by: [email protected] <[email protected]> * make search interval and search timeout in discovery configurable Signed-off-by: [email protected] <[email protected]> * add exception handler Signed-off-by: [email protected] <[email protected]> * updates after comments Signed-off-by: [email protected] <[email protected]> * job and logs Signed-off-by: [email protected] <[email protected]> * try to fix flaky test Signed-off-by: [email protected] <[email protected]> * Removed unneeded syncStatusProviderProvider * Removed unneeded syncStatusProviderProvider * Merged with main one more time * Cleaned unused argument * Removed MaruDiscoveryPeer, replaced with Teku's DiscoveryPeer * Fixed assertions * Fixed the test --------- Signed-off-by: [email protected] <[email protected]> Co-authored-by: Roman <[email protected]>
1 parent 5fd78c0 commit 1081cdb

File tree

17 files changed

+49
-186
lines changed

17 files changed

+49
-186
lines changed

app/src/integrationTest/kotlin/maru/app/MaruPeerScoringTest.kt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import maru.p2p.messages.BlockRetrievalStrategy
2626
import maru.p2p.messages.DefaultBlockRetrievalStrategy
2727
import maru.p2p.messages.StatusManager
2828
import maru.serialization.SerDe
29-
import maru.syncing.SyncStatusProvider
3029
import net.consensys.linea.metrics.MetricsFacade
3130
import org.apache.logging.log4j.LogManager
3231
import org.assertj.core.api.Assertions.assertThat
@@ -185,7 +184,6 @@ class MaruPeerScoringTest {
185184
forkIdHashManager: ForkPeeringManager,
186185
isBlockImportEnabledProvider: () -> Boolean,
187186
p2pState: P2PState,
188-
syncStatusProviderProvider: () -> SyncStatusProvider,
189187
->
190188
MisbehavingP2PNetwork(
191189
privateKeyBytes = privateKeyBytes,
@@ -199,7 +197,6 @@ class MaruPeerScoringTest {
199197
forkIdHashManager = forkIdHashManager,
200198
isBlockImportEnabledProvider = isBlockImportEnabledProvider,
201199
p2pState = p2pState,
202-
syncStatusProviderProvider = syncStatusProviderProvider,
203200
blockRetrievalStrategy = blockRetrievalStrategy,
204201
).p2pNetwork
205202
},

app/src/integrationTest/kotlin/testutils/MisbehavingP2PNetwork.kt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import maru.p2p.messages.BeaconBlocksByRangeRequest
2020
import maru.p2p.messages.BlockRetrievalStrategy
2121
import maru.p2p.messages.StatusManager
2222
import maru.serialization.SerDe
23-
import maru.syncing.SyncStatusProvider
2423
import net.consensys.linea.metrics.MetricsFacade
2524
import org.hyperledger.besu.plugin.services.MetricsSystem as BesuMetricsSystem
2625

@@ -36,7 +35,6 @@ class MisbehavingP2PNetwork(
3635
forkIdHashManager: ForkPeeringManager,
3736
isBlockImportEnabledProvider: () -> Boolean,
3837
p2pState: P2PState,
39-
syncStatusProviderProvider: () -> SyncStatusProvider,
4038
blockRetrievalStrategy: BlockRetrievalStrategy,
4139
) {
4240
val p2pNetwork: P2PNetworkImpl =
@@ -52,7 +50,6 @@ class MisbehavingP2PNetwork(
5250
forkIdHashManager = forkIdHashManager,
5351
isBlockImportEnabledProvider = isBlockImportEnabledProvider,
5452
p2PState = p2pState,
55-
syncStatusProviderProvider = syncStatusProviderProvider,
5653
rpcMethodsFactory = { statusMessageFactory, lineaRpcProtocolIdGenerator, peerLookup, beaconChain ->
5754
RpcMethods(statusMessageFactory, lineaRpcProtocolIdGenerator, peerLookup, beaconChain, blockRetrievalStrategy)
5855
},

app/src/main/kotlin/maru/app/MaruAppFactory.kt

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ import maru.syncing.HighestHeadTargetSelector
6868
import maru.syncing.MostFrequentHeadTargetSelector
6969
import maru.syncing.PeerChainTracker
7070
import maru.syncing.SyncController
71-
import maru.syncing.SyncStatusProvider
7271
import maru.syncing.SyncTargetSelector
7372
import maru.syncing.beaconchain.pipeline.BeaconChainDownloadPipelineFactory
7473
import net.consensys.linea.metrics.MetricsFacade
@@ -103,7 +102,6 @@ interface MaruAppFactoryCreator {
103102
ForkPeeringManager,
104103
() -> Boolean,
105104
P2PState,
106-
() -> SyncStatusProvider,
107105
) -> P2PNetworkImpl = ::P2PNetworkImpl,
108106
): LongRunningCloseable
109107
}
@@ -131,7 +129,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
131129
ForkPeeringManager,
132130
() -> Boolean,
133131
P2PState,
134-
() -> SyncStatusProvider,
135132
) -> P2PNetworkImpl,
136133
): MaruApp {
137134
log.info("configs={}", config)
@@ -219,7 +216,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
219216
}
220217
},
221218
p2PState = kvDatabase,
222-
syncStatusProviderProvider = { syncControllerImpl!! },
223219
clock = clock,
224220
p2pNetworkFactory = p2pNetworkFactory,
225221
)
@@ -386,7 +382,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
386382
besuMetricsSystem: BesuMetricsSystem,
387383
clock: Clock,
388384
p2PState: P2PState,
389-
syncStatusProviderProvider: () -> SyncStatusProvider,
390385
p2pNetworkFactory: (
391386
ByteArray,
392387
P2PConfig,
@@ -399,7 +394,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
399394
ForkPeeringManager,
400395
() -> Boolean,
401396
P2PState,
402-
() -> SyncStatusProvider,
403397
) -> P2PNetworkImpl = ::P2PNetworkImpl,
404398
): P2PNetwork {
405399
if (p2pConfig == null) {
@@ -433,7 +427,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
433427
forkIdHashManager,
434428
isBlockImportEnabledProvider,
435429
p2PState,
436-
syncStatusProviderProvider,
437430
)
438431
}
439432

app/src/test/kotlin/maru/app/MaruAppCliTest.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import maru.p2p.messages.StatusManager
2828
import maru.serialization.SerDe
2929
import maru.services.LongRunningService
3030
import maru.services.NoOpLongRunningService
31-
import maru.syncing.SyncStatusProvider
3231
import net.consensys.linea.metrics.MetricsFacade
3332
import org.assertj.core.api.Assertions.assertThat
3433
import org.junit.jupiter.api.AfterAll
@@ -170,7 +169,6 @@ class MaruAppCliTest {
170169
ForkPeeringManager,
171170
() -> Boolean,
172171
P2PState,
173-
() -> SyncStatusProvider,
174172
) -> P2PNetworkImpl,
175173
): LongRunningCloseable {
176174
capturedMaruConfig = config

config/src/main/kotlin/maru/config/Config.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ data class P2PConfig(
6565
val port: UInt = 9000u,
6666
val bootnodes: List<String> = emptyList(),
6767
val refreshInterval: Duration,
68+
val searchInterval: Duration = 1.seconds,
69+
val searchTimeout: Duration = 30.seconds,
6870
val advertisedIp: String? = null,
6971
) {
7072
init {

config/src/test/kotlin/maru/config/HopliteFriendlinessTest.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class HopliteFriendlinessTest {
4444
port = 3324
4545
bootnodes = ["enr:-Iu4QHk0YN5IRRnufqsWkbO6Tn0iGTx4H_hnyiIEdXDuhIe0KKrxmaECisyvO40mEmmqKLhz_tdIhx2yFBK8XFKhvxABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQOgBvD-dv0cX5szOeEsiAMtwxnP1q5CA5toYDrgUyOhV4N0Y3CCJBKDdWRwgiQT"]
4646
refresh-interval = "30 seconds"
47+
search-timeout = "10 seconds"
48+
search-interval = "2 seconds"
4749
advertised-ip = "13.12.11.10"
4850
4951
[p2p.reputation]
@@ -147,6 +149,8 @@ class HopliteFriendlinessTest {
147149
"enr:-Iu4QHk0YN5IRRnufqsWkbO6Tn0iGTx4H_hnyiIEdXDuhIe0KKrxmaECisyvO40mEmmqKLhz_tdIhx2yFBK8XFKhvxABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQOgBvD-dv0cX5szOeEsiAMtwxnP1q5CA5toYDrgUyOhV4N0Y3CCJBKDdWRwgiQT",
148150
),
149151
refreshInterval = 30.seconds,
152+
searchTimeout = 10.seconds,
153+
searchInterval = 2.seconds,
150154
advertisedIp = "13.12.11.10",
151155
),
152156
reputation =

jvm-libs/test-utils/src/main/kotlin/testutils/maru/MaruFactory.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ import maru.p2p.P2PNetworkImpl
5858
import maru.p2p.fork.ForkPeeringManager
5959
import maru.p2p.messages.StatusManager
6060
import maru.serialization.SerDe
61-
import maru.syncing.SyncStatusProvider
6261
import net.consensys.linea.metrics.MetricsFacade
6362
import org.hyperledger.besu.plugin.services.MetricsSystem as BesuMetricsSystem
6463

@@ -347,7 +346,6 @@ class MaruFactory(
347346
ForkPeeringManager,
348347
() -> Boolean,
349348
P2PState,
350-
() -> SyncStatusProvider,
351349
) -> P2PNetworkImpl = ::P2PNetworkImpl,
352350
startApiServer: Boolean = false,
353351
): MaruApp =
@@ -458,7 +456,6 @@ class MaruFactory(
458456
ForkPeeringManager,
459457
() -> Boolean,
460458
P2PState,
461-
() -> SyncStatusProvider,
462459
) -> P2PNetworkImpl = ::P2PNetworkImpl,
463460
apiPort: UInt = 0u,
464461
startApiServer: Boolean = false,
@@ -530,7 +527,6 @@ class MaruFactory(
530527
ForkPeeringManager,
531528
() -> Boolean,
532529
P2PState,
533-
() -> SyncStatusProvider,
534530
) -> P2PNetworkImpl = ::P2PNetworkImpl,
535531
): MaruApp {
536532
val p2pConfig =
@@ -598,7 +594,6 @@ class MaruFactory(
598594
ForkPeeringManager,
599595
() -> Boolean,
600596
P2PState,
601-
() -> SyncStatusProvider,
602597
) -> P2PNetworkImpl = ::P2PNetworkImpl,
603598
): MaruApp {
604599
val p2pConfig =

p2p/src/main/kotlin/maru/p2p/MaruPeerManager.kt

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@ import java.util.concurrent.TimeUnit
1515
import java.util.concurrent.atomic.AtomicBoolean
1616
import maru.config.P2PConfig
1717
import maru.p2p.discovery.MaruDiscoveryService
18-
import maru.syncing.SyncStatusProvider
1918
import org.apache.logging.log4j.LogManager
2019
import org.apache.logging.log4j.Logger
21-
import tech.pegasys.teku.infrastructure.async.SafeFuture
2220
import tech.pegasys.teku.networking.p2p.network.P2PNetwork
2321
import tech.pegasys.teku.networking.p2p.network.PeerHandler
2422
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason
@@ -27,10 +25,9 @@ import tech.pegasys.teku.networking.p2p.peer.Peer
2725

2826
class MaruPeerManager(
2927
private val maruPeerFactory: MaruPeerFactory,
30-
p2pConfig: P2PConfig,
28+
private val p2pConfig: P2PConfig,
3129
private val reputationManager: MaruReputationManager,
3230
private val isStaticPeer: (NodeId) -> Boolean,
33-
private val syncStatusProviderProvider: () -> SyncStatusProvider,
3431
) : PeerHandler,
3532
PeerLookup {
3633
private val log: Logger = LogManager.getLogger(this.javaClass)
@@ -43,8 +40,6 @@ class MaruPeerManager(
4340
private val peers: ConcurrentHashMap<NodeId, MaruPeer> = ConcurrentHashMap()
4441

4542
private var scheduler: ScheduledExecutorService? = null
46-
private lateinit var p2pNetwork: P2PNetwork<Peer>
47-
private lateinit var syncStatusProvider: SyncStatusProvider
4843

4944
val peerCount: Int
5045
get() = connectedPeers().size
@@ -61,8 +56,6 @@ class MaruPeerManager(
6156
return
6257
}
6358
this.discoveryService = discoveryService
64-
this.p2pNetwork = p2pNetwork
65-
this.syncStatusProvider = syncStatusProviderProvider()
6659
scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofPlatform().daemon().factory())
6760
scheduler!!.scheduleAtFixedRate({
6861
logConnectedPeers()
@@ -75,32 +68,26 @@ class MaruPeerManager(
7568
reputationManager = reputationManager,
7669
maxPeers = maxPeers,
7770
getPeerCount = { peerCount },
71+
discoveryConfig = p2pConfig.discovery!!,
7872
)
7973
discoveryTask!!.start()
8074
}
8175
}
8276

83-
fun stop(): SafeFuture<Unit> {
77+
fun stop() {
8478
if (!started.compareAndSet(true, false)) {
8579
log.warn("Trying to stop stopped MaruPeerManager")
86-
return SafeFuture.completedFuture(Unit)
80+
return
8781
}
8882
discoveryTask?.stop()
8983
discoveryTask = null
9084
scheduler!!.shutdown()
9185
scheduler = null
92-
return SafeFuture.completedFuture(Unit)
86+
peers.values.forEach { it.disconnectCleanly(DisconnectReason.SHUTTING_DOWN) }
9387
}
9488

9589
private fun logConnectedPeers() {
96-
connectedPeers().keys.toList().also { peers ->
97-
log.info(
98-
"currently connected peers: peerCount={} peers={}",
99-
peers.size,
100-
peers,
101-
)
102-
}
103-
90+
log.info("Currently connected peers={}", connectedPeers().keys.toList())
10491
if (log.isDebugEnabled) {
10592
discoveryService?.getKnownPeers()?.forEach { peer ->
10693
log.debug("discovered peer={}", peer)
@@ -121,7 +108,7 @@ class MaruPeerManager(
121108
log.debug("Connected to peer={}, static=$isAStaticPeer", peer.id)
122109
} else { // not static and not allowed to connect -> disconnect
123110
peer.disconnectCleanly(DisconnectReason.RATE_LIMITING)
124-
log.debug("Peer={} is not allowed to connect yet", peer.id)
111+
log.trace("Peer={} is not allowed to connect yet", peer.id)
125112
}
126113
}
127114

p2p/src/main/kotlin/maru/p2p/MaruRpcResponseCallback.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class MaruRpcResponseCallback<TResponse : Message<*, *>>(
6565
),
6666
).finishWarn(log)
6767
} catch (e: StreamClosedException) {
68-
log.debug(
68+
log.trace(
6969
"Unable to send error message ({}) to peer, rpc stream already closed: {}",
7070
error,
7171
rpcStream,

p2p/src/main/kotlin/maru/p2p/P2PNetworkImpl.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import maru.p2p.topics.QbftMessageSerDe
3333
import maru.p2p.topics.TopicHandlerWithInOrderDelivering
3434
import maru.serialization.SerDe
3535
import maru.serialization.rlp.MaruCompressorRLPSerDe
36-
import maru.syncing.SyncStatusProvider
3736
import net.consensys.linea.metrics.MetricsFacade
3837
import net.consensys.linea.metrics.Tag
3938
import org.apache.logging.log4j.LogManager
@@ -66,7 +65,6 @@ class P2PNetworkImpl(
6665
private val forkIdHashManager: ForkPeeringManager,
6766
isBlockImportEnabledProvider: () -> Boolean,
6867
private val p2PState: P2PState,
69-
private val syncStatusProviderProvider: () -> SyncStatusProvider,
7068
// for testing:
7169
private val rpcMethodsFactory: (
7270
StatusManager,
@@ -141,7 +139,6 @@ class P2PNetworkImpl(
141139
p2pConfig = p2pConfig,
142140
reputationManager = reputationManager,
143141
isStaticPeer = this::isStaticPeer,
144-
syncStatusProviderProvider = syncStatusProviderProvider,
145142
)
146143

147144
return Libp2pNetworkFactory(LINEA_DOMAIN).build(
@@ -250,10 +247,9 @@ class P2PNetworkImpl(
250247

251248
override fun stop(): SafeFuture<Unit> {
252249
log.info("Stopping={}", this::class.simpleName)
253-
val pmStop = maruPeerManager.stop()
250+
maruPeerManager.stop()
254251
discoveryService?.stop()
255-
val p2pStop = p2pNetwork.stop()
256-
return SafeFuture.allOf(p2pStop, pmStop).thenApply {}
252+
return p2pNetwork.stop().thenApply {}
257253
}
258254

259255
override fun close() {

0 commit comments

Comments
 (0)