Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,6 @@ site/

# PQ test keys
packages/testing/src/consensus_testing/test_keys/prod_scheme/

# Local scripts
scripts/
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ dependencies = [
"cryptography>=46.0.0",
"numpy>=2.0.0,<3",
"prometheus-client>=0.21.0,<1",
"aioquic>=1.2.0,<2",
"pyyaml>=6.0.0,<7",
]

[project.license]
Expand Down
219 changes: 191 additions & 28 deletions src/lean_spec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@

Usage::

python -m lean_spec --genesis genesis.json --bootnode /ip4/127.0.0.1/tcp/9000
python -m lean_spec --genesis genesis.json --bootnode enr:-IS4QHCYrYZbAKW...
python -m lean_spec --genesis genesis.json --checkpoint-sync-url http://localhost:5052
python -m lean_spec --genesis genesis.json --validator-keys ./keys --node-id node_0
python -m lean_spec --genesis config.yaml --bootnode /ip4/127.0.0.1/tcp/9000
python -m lean_spec --genesis config.yaml --bootnode enr:-IS4QHCYrYZbAKW...
python -m lean_spec --genesis config.yaml --checkpoint-sync-url http://localhost:5052
python -m lean_spec --genesis config.yaml --validator-keys ./keys --node-id node_0

Options:
--genesis Path to genesis JSON file (required)
--genesis Path to genesis YAML file (required)
--bootnode Bootnode address (multiaddr or ENR string, can be repeated)
--listen Address to listen on (default: /ip4/0.0.0.0/tcp/9000)
--listen Address to listen on (default: /ip4/0.0.0.0/tcp/9001)
--checkpoint-sync-url URL to fetch finalized checkpoint state for fast sync
--validator-keys Path to validator keys directory
--node-id Node identifier for validator assignment (default: node_0)
Expand All @@ -32,11 +32,18 @@
from lean_spec.subspecs.forkchoice import Store
from lean_spec.subspecs.genesis import GenesisConfig
from lean_spec.subspecs.networking.client import LiveNetworkEventSource
from lean_spec.subspecs.networking.gossipsub import GossipTopic
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.node import Node, NodeConfig
from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.subspecs.validator import ValidatorRegistry
from lean_spec.types import Bytes32
from lean_spec.types import Bytes32, Uint64

# Fork identifier for gossip topics.
#
# Must match the fork string used by ream and other clients.
# For devnet, this is "devnet0".
GOSSIP_FORK_DIGEST = "devnet0"

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -172,6 +179,7 @@ def _init_from_genesis(
event_source=event_source,
network=event_source.reqresp_client,
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
)

# Create and return the node.
Expand Down Expand Up @@ -279,6 +287,7 @@ async def _init_from_checkpoint(
event_source=event_source,
network=event_source.reqresp_client,
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
)

# Create node and inject checkpoint store.
Expand All @@ -295,14 +304,71 @@ async def _init_from_checkpoint(
return None


def setup_logging(verbose: bool = False) -> None:
"""Configure logging for the node."""
class ColoredFormatter(logging.Formatter):
"""Logging formatter with ANSI colors for better readability."""

# ANSI color codes
GREY = "\x1b[38;5;244m"
BLUE = "\x1b[38;5;39m"
GREEN = "\x1b[38;5;40m"
YELLOW = "\x1b[38;5;220m"
RED = "\x1b[38;5;196m"
BOLD_RED = "\x1b[38;5;196;1m"
CYAN = "\x1b[38;5;51m"
RESET = "\x1b[0m"

LEVEL_COLORS = {
logging.DEBUG: GREY,
logging.INFO: GREEN,
logging.WARNING: YELLOW,
logging.ERROR: RED,
logging.CRITICAL: BOLD_RED,
}

def format(self, record: logging.LogRecord) -> str:
"""Format log record with colors."""
# Get color for this level
color = self.LEVEL_COLORS.get(record.levelno, self.RESET)

# Format timestamp in cyan
timestamp = self.formatTime(record, self.datefmt)
colored_time = f"{self.CYAN}{timestamp}{self.RESET}"

# Format level name with color
levelname = f"{color}{record.levelname:8}{self.RESET}"

# Format logger name in blue
name = f"{self.BLUE}{record.name}{self.RESET}"

# Format message
message = record.getMessage()

return f"{colored_time} {levelname} {name}: {message}"


def setup_logging(verbose: bool = False, no_color: bool = False) -> None:
"""Configure logging for the node with optional colors."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

# Create handler
handler = logging.StreamHandler()
handler.setLevel(level)

# Use colored formatter unless disabled
if no_color:
formatter = logging.Formatter(
"%(asctime)s %(levelname)-8s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
else:
formatter = ColoredFormatter(datefmt="%Y-%m-%d %H:%M:%S")

handler.setFormatter(formatter)

# Configure root logger
root = logging.getLogger()
root.setLevel(level)
root.addHandler(handler)


async def run_node(
Expand All @@ -312,20 +378,40 @@ async def run_node(
checkpoint_sync_url: str | None = None,
validator_keys_path: Path | None = None,
node_id: str = "node_0",
genesis_time_now: bool = False,
) -> None:
"""
Run the lean consensus node.

Args:
genesis_path: Path to genesis JSON file.
genesis_path: Path to genesis YAML file (config.yaml).
bootnodes: List of bootnode multiaddrs to connect to.
listen_addr: Address to listen on.
checkpoint_sync_url: Optional URL to fetch checkpoint state for fast sync.
validator_keys_path: Optional path to validator keys directory.
node_id: Node identifier for validator assignment.
genesis_time_now: Override genesis time to current time for testing.
"""
import time

logger.info("Loading genesis from %s", genesis_path)
genesis = GenesisConfig.from_json_file(genesis_path)
genesis = GenesisConfig.from_yaml_file(genesis_path)

# Override genesis time for testing if requested
if genesis_time_now:
original_time = genesis.genesis_time
new_time = Uint64(int(time.time()))
# Create new config with updated genesis time.
#
# GenesisConfig is frozen, so we use model_copy to create
# a new instance with the updated field.
genesis = genesis.model_copy(update={"genesis_time": new_time})
logger.warning(
"Overriding genesis time: %d -> %d (now)",
original_time,
new_time,
)

logger.info(
"Genesis loaded: time=%d, validators=%d",
genesis.genesis_time,
Expand All @@ -336,25 +422,50 @@ async def run_node(
#
# The registry holds secret keys for validators assigned to this node.
# Without a registry, the node runs in passive mode (sync only).
#
# Expected directory structure (ream/zeam compatible):
# validators.yaml - node to validator index mapping
# hash-sig-keys/validator-keys-manifest.yaml - key metadata and file paths
validator_registry: ValidatorRegistry | None = None
if validator_keys_path is not None:
validator_registry = ValidatorRegistry.from_json(
node_id=node_id,
validators_path=validator_keys_path / "validators.json",
manifest_path=validator_keys_path / "hash-sig-keys/validator-keys-manifest.json",
)
if len(validator_registry) > 0:
validators_yaml = validator_keys_path / "validators.yaml"
manifest_path = validator_keys_path / "hash-sig-keys/validator-keys-manifest.yaml"

if manifest_path.exists():
validator_registry = ValidatorRegistry.from_yaml(
node_id=node_id,
validators_path=validators_yaml,
manifest_path=manifest_path,
)
else:
logger.error(
"Validator keys manifest not found: %s",
manifest_path,
)

if validator_registry is not None and len(validator_registry) > 0:
logger.info(
"Loaded %d validators for node %s: indices=%s",
len(validator_registry),
node_id,
validator_registry.indices(),
)
else:
elif validator_registry is not None:
logger.warning("No validators assigned to node %s", node_id)

event_source = LiveNetworkEventSource.create()

# Subscribe to gossip topics.
#
# We subscribe before connecting to bootnodes so that when
# we establish connections, we can immediately announce our
# subscriptions to peers.
block_topic = str(GossipTopic.block(GOSSIP_FORK_DIGEST))
attestation_topic = str(GossipTopic.attestation(GOSSIP_FORK_DIGEST))
event_source.subscribe_gossip_topic(block_topic)
event_source.subscribe_gossip_topic(attestation_topic)
logger.info("Subscribed to gossip topics: %s, %s", block_topic, attestation_topic)

# Two initialization paths: checkpoint sync or genesis sync.
#
# Checkpoint sync (preferred for mainnet/testnets):
Expand Down Expand Up @@ -417,9 +528,38 @@ async def run_node(
logger.warning("Invalid bootnode %s: %s", bootnode[:40], e)

# Start listening (in background).
#
# We start the listener as a background task, but give it a moment
# to bind the port. If binding fails (e.g., port already in use),
# we want to fail fast with a clear error rather than continue
# running without the ability to accept incoming connections.
listener_task = None
if listen_addr:
logger.info("Starting listener on %s", listen_addr)
asyncio.create_task(event_source.listen(listen_addr))
listener_task = asyncio.create_task(event_source.listen(listen_addr))

# Give the listener a moment to bind the port.
# If it fails immediately (e.g., "Address already in use"),
# the task will complete with an exception.
await asyncio.sleep(0.1)

if listener_task.done():
# Listener failed early - propagate the error.
try:
listener_task.result()
except OSError as e:
logger.error("Failed to start listener: %s", e)
logger.error(
"Port may be in use. Run './scripts/run_leanspec.sh clean' to free ports."
)
return

# Start gossipsub behavior.
#
# This starts the heartbeat loop and enables message forwarding.
# Must be called after subscribing to topics and connecting to peers.
logger.info("Starting gossipsub behavior...")
await event_source.start_gossipsub()

# Run the node.
logger.info("Starting consensus node...")
Expand All @@ -438,7 +578,7 @@ def main() -> None:
"--genesis",
required=True,
type=Path,
help="Path to genesis JSON file",
help="Path to genesis YAML file (config.yaml)",
)
parser.add_argument(
"--bootnode",
Expand All @@ -449,8 +589,8 @@ def main() -> None:
)
parser.add_argument(
"--listen",
default="/ip4/0.0.0.0/tcp/9000",
help="Address to listen on (default: /ip4/0.0.0.0/tcp/9000)",
default="/ip4/0.0.0.0/tcp/9001",
help="Address to listen on (default: /ip4/0.0.0.0/tcp/9001)",
)
parser.add_argument(
"--checkpoint-sync-url",
Expand All @@ -476,11 +616,23 @@ def main() -> None:
action="store_true",
help="Enable debug logging",
)
parser.add_argument(
"--no-color",
action="store_true",
help="Disable colored logging output",
)
parser.add_argument(
"--genesis-time-now",
action="store_true",
help="Override genesis time to current time (for testing)",
)

args = parser.parse_args()

setup_logging(args.verbose)
setup_logging(args.verbose, args.no_color)

# Use asyncio.run with proper task cancellation on interrupt.
# This ensures all tasks are cancelled and resources are released.
try:
asyncio.run(
run_node(
Expand All @@ -490,10 +642,21 @@ def main() -> None:
args.checkpoint_sync_url,
args.validator_keys,
args.node_id,
args.genesis_time_now,
)
)
except KeyboardInterrupt:
# asyncio.run() handles task cancellation, but we log for clarity.
logger.info("Shutting down...")
finally:
# Force exit to ensure all threads/sockets are released.
# This is important for QUIC which may have background threads.
import os
import sys

sys.stdout.flush()
sys.stderr.flush()
os._exit(0)


if __name__ == "__main__":
Expand Down
Loading
Loading