Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import maru.p2p.messages.BlockRetrievalStrategy
import maru.p2p.messages.DefaultBlockRetrievalStrategy
import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
Expand Down Expand Up @@ -185,7 +184,6 @@ class MaruPeerScoringTest {
forkIdHashManager: ForkPeeringManager,
isBlockImportEnabledProvider: () -> Boolean,
p2pState: P2PState,
syncStatusProviderProvider: () -> SyncStatusProvider,
->
MisbehavingP2PNetwork(
privateKeyBytes = privateKeyBytes,
Expand All @@ -199,7 +197,6 @@ class MaruPeerScoringTest {
forkIdHashManager = forkIdHashManager,
isBlockImportEnabledProvider = isBlockImportEnabledProvider,
p2pState = p2pState,
syncStatusProviderProvider = syncStatusProviderProvider,
blockRetrievalStrategy = blockRetrievalStrategy,
).p2pNetwork
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import maru.p2p.messages.BeaconBlocksByRangeRequest
import maru.p2p.messages.BlockRetrievalStrategy
import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import org.hyperledger.besu.plugin.services.MetricsSystem as BesuMetricsSystem

Expand All @@ -36,7 +35,6 @@ class MisbehavingP2PNetwork(
forkIdHashManager: ForkPeeringManager,
isBlockImportEnabledProvider: () -> Boolean,
p2pState: P2PState,
syncStatusProviderProvider: () -> SyncStatusProvider,
blockRetrievalStrategy: BlockRetrievalStrategy,
) {
val p2pNetwork: P2PNetworkImpl =
Expand All @@ -52,7 +50,6 @@ class MisbehavingP2PNetwork(
forkIdHashManager = forkIdHashManager,
isBlockImportEnabledProvider = isBlockImportEnabledProvider,
p2PState = p2pState,
syncStatusProviderProvider = syncStatusProviderProvider,
rpcMethodsFactory = { statusMessageFactory, lineaRpcProtocolIdGenerator, peerLookup, beaconChain ->
RpcMethods(statusMessageFactory, lineaRpcProtocolIdGenerator, peerLookup, beaconChain, blockRetrievalStrategy)
},
Expand Down
7 changes: 0 additions & 7 deletions app/src/main/kotlin/maru/app/MaruAppFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import maru.syncing.HighestHeadTargetSelector
import maru.syncing.MostFrequentHeadTargetSelector
import maru.syncing.PeerChainTracker
import maru.syncing.SyncController
import maru.syncing.SyncStatusProvider
import maru.syncing.SyncTargetSelector
import maru.syncing.beaconchain.pipeline.BeaconChainDownloadPipelineFactory
import net.consensys.linea.metrics.MetricsFacade
Expand Down Expand Up @@ -103,7 +102,6 @@ interface MaruAppFactoryCreator {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): LongRunningCloseable
}
Expand Down Expand Up @@ -131,7 +129,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl,
): MaruApp {
log.info("configs={}", config)
Expand Down Expand Up @@ -219,7 +216,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
}
},
p2PState = kvDatabase,
syncStatusProviderProvider = { syncControllerImpl!! },
clock = clock,
p2pNetworkFactory = p2pNetworkFactory,
)
Expand Down Expand Up @@ -386,7 +382,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
besuMetricsSystem: BesuMetricsSystem,
clock: Clock,
p2PState: P2PState,
syncStatusProviderProvider: () -> SyncStatusProvider,
p2pNetworkFactory: (
ByteArray,
P2PConfig,
Expand All @@ -399,7 +394,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): P2PNetwork {
if (p2pConfig == null) {
Expand Down Expand Up @@ -433,7 +427,6 @@ class MaruAppFactory : MaruAppFactoryCreator {
forkIdHashManager,
isBlockImportEnabledProvider,
p2PState,
syncStatusProviderProvider,
)
}

Expand Down
2 changes: 0 additions & 2 deletions app/src/test/kotlin/maru/app/MaruAppCliTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.services.LongRunningService
import maru.services.NoOpLongRunningService
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterAll
Expand Down Expand Up @@ -170,7 +169,6 @@ class MaruAppCliTest {
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl,
): LongRunningCloseable {
capturedMaruConfig = config
Expand Down
2 changes: 2 additions & 0 deletions config/src/main/kotlin/maru/config/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ data class P2PConfig(
val port: UInt = 9000u,
val bootnodes: List<String> = emptyList(),
val refreshInterval: Duration,
val searchInterval: Duration = 1.seconds,
val searchTimeout: Duration = 30.seconds,
val advertisedIp: String? = null,
) {
init {
Expand Down
4 changes: 4 additions & 0 deletions config/src/test/kotlin/maru/config/HopliteFriendlinessTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class HopliteFriendlinessTest {
port = 3324
bootnodes = ["enr:-Iu4QHk0YN5IRRnufqsWkbO6Tn0iGTx4H_hnyiIEdXDuhIe0KKrxmaECisyvO40mEmmqKLhz_tdIhx2yFBK8XFKhvxABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQOgBvD-dv0cX5szOeEsiAMtwxnP1q5CA5toYDrgUyOhV4N0Y3CCJBKDdWRwgiQT"]
refresh-interval = "30 seconds"
search-timeout = "10 seconds"
search-interval = "2 seconds"
advertised-ip = "13.12.11.10"

[p2p.reputation]
Expand Down Expand Up @@ -147,6 +149,8 @@ class HopliteFriendlinessTest {
"enr:-Iu4QHk0YN5IRRnufqsWkbO6Tn0iGTx4H_hnyiIEdXDuhIe0KKrxmaECisyvO40mEmmqKLhz_tdIhx2yFBK8XFKhvxABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQOgBvD-dv0cX5szOeEsiAMtwxnP1q5CA5toYDrgUyOhV4N0Y3CCJBKDdWRwgiQT",
),
refreshInterval = 30.seconds,
searchTimeout = 10.seconds,
searchInterval = 2.seconds,
advertisedIp = "13.12.11.10",
),
reputation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import maru.p2p.P2PNetworkImpl
import maru.p2p.fork.ForkPeeringManager
import maru.p2p.messages.StatusManager
import maru.serialization.SerDe
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import org.hyperledger.besu.plugin.services.MetricsSystem as BesuMetricsSystem

Expand Down Expand Up @@ -347,7 +346,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
startApiServer: Boolean = false,
): MaruApp =
Expand Down Expand Up @@ -458,7 +456,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
apiPort: UInt = 0u,
startApiServer: Boolean = false,
Expand Down Expand Up @@ -530,7 +527,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): MaruApp {
val p2pConfig =
Expand Down Expand Up @@ -598,7 +594,6 @@ class MaruFactory(
ForkPeeringManager,
() -> Boolean,
P2PState,
() -> SyncStatusProvider,
) -> P2PNetworkImpl = ::P2PNetworkImpl,
): MaruApp {
val p2pConfig =
Expand Down
27 changes: 7 additions & 20 deletions p2p/src/main/kotlin/maru/p2p/MaruPeerManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import maru.config.P2PConfig
import maru.p2p.discovery.MaruDiscoveryService
import maru.syncing.SyncStatusProvider
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import tech.pegasys.teku.networking.p2p.network.P2PNetwork
import tech.pegasys.teku.networking.p2p.network.PeerHandler
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason
Expand All @@ -27,10 +25,9 @@ import tech.pegasys.teku.networking.p2p.peer.Peer

class MaruPeerManager(
private val maruPeerFactory: MaruPeerFactory,
p2pConfig: P2PConfig,
private val p2pConfig: P2PConfig,
private val reputationManager: MaruReputationManager,
private val isStaticPeer: (NodeId) -> Boolean,
private val syncStatusProviderProvider: () -> SyncStatusProvider,
) : PeerHandler,
PeerLookup {
private val log: Logger = LogManager.getLogger(this.javaClass)
Expand All @@ -43,8 +40,6 @@ class MaruPeerManager(
private val peers: ConcurrentHashMap<NodeId, MaruPeer> = ConcurrentHashMap()

private var scheduler: ScheduledExecutorService? = null
private lateinit var p2pNetwork: P2PNetwork<Peer>
private lateinit var syncStatusProvider: SyncStatusProvider

val peerCount: Int
get() = connectedPeers().size
Expand All @@ -61,8 +56,6 @@ class MaruPeerManager(
return
}
this.discoveryService = discoveryService
this.p2pNetwork = p2pNetwork
this.syncStatusProvider = syncStatusProviderProvider()
scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofPlatform().daemon().factory())
scheduler!!.scheduleAtFixedRate({
logConnectedPeers()
Expand All @@ -75,32 +68,26 @@ class MaruPeerManager(
reputationManager = reputationManager,
maxPeers = maxPeers,
getPeerCount = { peerCount },
discoveryConfig = p2pConfig.discovery!!,
)
discoveryTask!!.start()
}
}

fun stop(): SafeFuture<Unit> {
fun stop() {
if (!started.compareAndSet(true, false)) {
log.warn("Trying to stop stopped MaruPeerManager")
return SafeFuture.completedFuture(Unit)
return
}
discoveryTask?.stop()
discoveryTask = null
scheduler!!.shutdown()
scheduler = null
return SafeFuture.completedFuture(Unit)
peers.values.forEach { it.disconnectCleanly(DisconnectReason.SHUTTING_DOWN) }
}

private fun logConnectedPeers() {
connectedPeers().keys.toList().also { peers ->
log.info(
"currently connected peers: peerCount={} peers={}",
peers.size,
peers,
)
}

log.info("Currently connected peers={}", connectedPeers().keys.toList())
if (log.isDebugEnabled) {
discoveryService?.getKnownPeers()?.forEach { peer ->
log.debug("discovered peer={}", peer)
Expand All @@ -121,7 +108,7 @@ class MaruPeerManager(
log.debug("Connected to peer={}, static=$isAStaticPeer", peer.id)
} else { // not static and not allowed to connect -> disconnect
peer.disconnectCleanly(DisconnectReason.RATE_LIMITING)
log.debug("Peer={} is not allowed to connect yet", peer.id)
log.trace("Peer={} is not allowed to connect yet", peer.id)
}
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/src/main/kotlin/maru/p2p/MaruRpcResponseCallback.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class MaruRpcResponseCallback<TResponse : Message<*, *>>(
),
).finishWarn(log)
} catch (e: StreamClosedException) {
log.debug(
log.trace(
"Unable to send error message ({}) to peer, rpc stream already closed: {}",
error,
rpcStream,
Expand Down
8 changes: 2 additions & 6 deletions p2p/src/main/kotlin/maru/p2p/P2PNetworkImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import maru.p2p.topics.QbftMessageSerDe
import maru.p2p.topics.TopicHandlerWithInOrderDelivering
import maru.serialization.SerDe
import maru.serialization.rlp.MaruCompressorRLPSerDe
import maru.syncing.SyncStatusProvider
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.Tag
import org.apache.logging.log4j.LogManager
Expand Down Expand Up @@ -66,7 +65,6 @@ class P2PNetworkImpl(
private val forkIdHashManager: ForkPeeringManager,
isBlockImportEnabledProvider: () -> Boolean,
private val p2PState: P2PState,
private val syncStatusProviderProvider: () -> SyncStatusProvider,
// for testing:
private val rpcMethodsFactory: (
StatusManager,
Expand Down Expand Up @@ -141,7 +139,6 @@ class P2PNetworkImpl(
p2pConfig = p2pConfig,
reputationManager = reputationManager,
isStaticPeer = this::isStaticPeer,
syncStatusProviderProvider = syncStatusProviderProvider,
)

return Libp2pNetworkFactory(LINEA_DOMAIN).build(
Expand Down Expand Up @@ -250,10 +247,9 @@ class P2PNetworkImpl(

override fun stop(): SafeFuture<Unit> {
log.info("Stopping={}", this::class.simpleName)
val pmStop = maruPeerManager.stop()
maruPeerManager.stop()
discoveryService?.stop()
val p2pStop = p2pNetwork.stop()
return SafeFuture.allOf(p2pStop, pmStop).thenApply {}
return p2pNetwork.stop().thenApply {}
}

override fun close() {
Expand Down
13 changes: 8 additions & 5 deletions p2p/src/main/kotlin/maru/p2p/PeerDiscoveryTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
*/
package maru.p2p

import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import maru.p2p.discovery.MaruDiscoveryPeer
import kotlin.time.toJavaDuration
import maru.config.P2PConfig
import maru.p2p.discovery.MaruDiscoveryService
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.apache.tuweni.bytes.Bytes
import tech.pegasys.teku.networking.p2p.discovery.DiscoveryPeer
import tech.pegasys.teku.networking.p2p.libp2p.PeerAlreadyConnectedException
import tech.pegasys.teku.networking.p2p.network.P2PNetwork
import tech.pegasys.teku.networking.p2p.peer.Peer
Expand All @@ -29,6 +30,7 @@ class PeerDiscoveryTask(
private val reputationManager: ReputationManager,
private val maxPeers: Int,
private val getPeerCount: () -> Int,
private val discoveryConfig: P2PConfig.Discovery,
) {
private val log: Logger = LogManager.getLogger(this.javaClass)

Expand All @@ -37,6 +39,7 @@ class PeerDiscoveryTask(
private val started = AtomicBoolean(false)
private val connectionInProgress = mutableListOf<Bytes>()
private val scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofPlatform().daemon().factory())
private val searchTimeout = discoveryConfig.searchTimeout.toJavaDuration()

fun start() {
if (!started.compareAndSet(false, true)) {
Expand All @@ -47,7 +50,7 @@ class PeerDiscoveryTask(
scheduler.scheduleWithFixedDelay(
{ runSearchTask(discoveryService) },
0,
1000,
discoveryConfig.searchInterval.inWholeMilliseconds,
TimeUnit.MILLISECONDS,
)
}
Expand All @@ -69,7 +72,7 @@ class PeerDiscoveryTask(
try {
discoveryService
.searchForPeers()
.orTimeout(Duration.ofSeconds(30L))
.orTimeout(searchTimeout)
.whenComplete { availablePeers, throwable ->
if (throwable != null) {
log.debug("finished searching for peers with error={}", throwable.message)
Expand All @@ -87,7 +90,7 @@ class PeerDiscoveryTask(
}
}

private fun tryToConnect(peer: MaruDiscoveryPeer) {
private fun tryToConnect(peer: DiscoveryPeer) {
try {
if (!started.get()) return
if (getPeerCount() >= maxPeers) return
Expand Down
Loading
Loading