Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lean_spec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ async def run_node(

# Run the node.
logger.info("Starting consensus node...")
event_source._running = True
event_source._stop_event.clear()
await node.run()


Expand Down
6 changes: 5 additions & 1 deletion src/lean_spec/subspecs/koalabear/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from typing import IO, Final, Self
from typing import IO, Final, Self, override

from lean_spec.types import SSZType

Expand Down Expand Up @@ -48,21 +48,25 @@ def __init__(self, value: int) -> None:
self.value: int = value % P

@classmethod
@override
def is_fixed_size(cls) -> bool:
"""Fp elements are fixed-size (4 bytes)."""
return True

@classmethod
@override
def get_byte_length(cls) -> int:
"""Get the byte length of an Fp element."""
return P_BYTES

@override
def serialize(self, stream: IO[bytes]) -> int:
"""Serialize the field element to a binary stream."""
stream.write(self.value.to_bytes(P_BYTES, byteorder="little"))
return P_BYTES

@classmethod
@override
def deserialize(cls, stream: IO[bytes], scope: int) -> Self:
"""Deserialize a field element from a binary stream."""
if scope != P_BYTES:
Expand Down
28 changes: 12 additions & 16 deletions src/lean_spec/subspecs/networking/client/event_source/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,13 @@ class LiveNetworkEventSource:
Used to validate incoming messages belong to the same fork.
"""

_running: bool = False
"""Whether the event source is running.

Controls the main loop and background tasks.
"""

_stop_event: asyncio.Event = field(default_factory=asyncio.Event)
"""Set when the event source stops.
"""Lifecycle signal.

Lets awaiters wake immediately rather than poll the running flag.
Set means "stopped": fresh instances start stopped (the event is
forced to the set state in __post_init__) and stay stopped until
dial or listen clears it. Awaiters of wait() wake the moment
stop() sets it again.
"""

_gossip_handler: GossipHandler = field(init=False)
Expand Down Expand Up @@ -258,6 +255,8 @@ def __post_init__(self) -> None:
self._reqresp_handler = RequestHandler()
self._reqresp_server = ReqRespServer(handler=self._reqresp_handler)
self._gossipsub_behavior = GossipsubBehavior(params=GossipsubParameters())
# Initial lifecycle: stopped. dial() or listen() clears the event to start.
self._stop_event.set()

@classmethod
async def create(
Expand Down Expand Up @@ -377,7 +376,7 @@ async def start_gossipsub(self) -> None:
async def _forward_gossipsub_events(self) -> None:
"""Forward events from GossipsubBehavior to our event queue."""
try:
while self._running:
while not self._stop_event.is_set():
event = await self._gossipsub_behavior.get_next_event()
if event is None:
# Stopped or no event.
Expand Down Expand Up @@ -447,7 +446,7 @@ async def __anext__(self) -> NetworkEvent:
Raises:
StopAsyncIteration: When no more events will arrive.
"""
if not self._running:
if self._stop_event.is_set():
raise StopAsyncIteration

# Race the queue against the stop signal.
Expand Down Expand Up @@ -479,10 +478,9 @@ async def dial(self, multiaddr: str) -> PeerId | None:
Returns:
Peer ID on success, None on failure.
"""
# Ensure _running is set so background tasks (like _accept_streams)
# Clear the stop signal so background tasks (like _accept_streams)
# can operate. Without this, _accept_streams exits immediately if
# dial() is called before listen().
self._running = True
self._stop_event.clear()

try:
Expand Down Expand Up @@ -565,7 +563,6 @@ async def listen(self, multiaddr: str) -> None:
Args:
multiaddr: Address to listen on (e.g., "/ip4/0.0.0.0/udp/9000/quic-v1").
"""
self._running = True
self._stop_event.clear()

if is_quic_multiaddr(multiaddr):
Expand Down Expand Up @@ -713,7 +710,6 @@ async def disconnect(self, peer_id: PeerId) -> None:

async def stop(self) -> None:
"""Stop the event source and cancel background tasks."""
self._running = False
self._stop_event.set()

# Cancel gossip tasks first (including event forwarding task).
Expand Down Expand Up @@ -830,9 +826,9 @@ async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None:
# Main loop: accept streams until shutdown or disconnection.
#
# The loop continues as long as:
# - We haven't been told to stop (_running is True).
# - We haven't been told to stop (stop event is clear).
# - The peer is still connected (peer_id in _connections).
while self._running and peer_id in self._connections:
while not self._stop_event.is_set() and peer_id in self._connections:
try:
# Accept the next incoming stream.
#
Expand Down
27 changes: 15 additions & 12 deletions src/lean_spec/subspecs/networking/gossipsub/behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,17 @@ class GossipsubBehavior:
)
"""Queue of events for the application."""

_running: bool = False
"""Whether the behavior is running."""

_heartbeat_task: asyncio.Task[None] | None = None
"""Background heartbeat task."""

_stop_event: asyncio.Event = field(default_factory=asyncio.Event)
"""Event to signal stop to the events generator."""
"""Lifecycle signal.

Set means "stopped": fresh instances start stopped (the event is
forced to the set state in __post_init__) and stay stopped until
start clears it. Awaiters of wait() wake the moment stop sets it
again.
"""

_background_tasks: set[asyncio.Task[None]] = field(default_factory=set)
"""Tracked background tasks for subscribe/unsubscribe broadcasts.
Expand All @@ -252,6 +255,8 @@ def __post_init__(self) -> None:
mcache_gossip=self.params.mcache_gossip,
)
self.seen_cache = SeenCache(ttl_seconds=self.params.seen_ttl_secs)
# Initial lifecycle: stopped. start() clears the event to begin running.
self._stop_event.set()

def subscribe(self, topic: TopicId) -> None:
"""Subscribe to a topic.
Expand All @@ -273,7 +278,7 @@ def subscribe(self, topic: TopicId) -> None:

logger.debug("Subscribed to topic %s", topic)

if self._running:
if not self._stop_event.is_set():
self._spawn_background_task(self._broadcast_subscription(topic, subscribe=True))

def unsubscribe(self, topic: TopicId) -> None:
Expand All @@ -297,24 +302,22 @@ def unsubscribe(self, topic: TopicId) -> None:

logger.debug("Unsubscribed from topic %s", topic)

if self._running:
if not self._stop_event.is_set():
self._spawn_background_task(
self._broadcast_subscription(topic, subscribe=False, prune_peers=mesh_peers)
)

async def start(self) -> None:
"""Start the gossipsub behavior (heartbeat loop)."""
if self._running:
if not self._stop_event.is_set():
return

self._running = True
self._stop_event.clear()
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
logger.info("[GS %x] GossipsubBehavior started", self._short_id)

async def stop(self) -> None:
"""Stop the gossipsub behavior."""
self._running = False

self._stop_event.set()

if self._heartbeat_task:
Expand Down Expand Up @@ -512,7 +515,7 @@ async def get_next_event(

Returns None when stopped or no event available.
"""
if not self._running:
if self._stop_event.is_set():
return None

# Race the queue against the stop signal.
Expand Down Expand Up @@ -754,7 +757,7 @@ async def _heartbeat_loop(self) -> None:
"""Background heartbeat for mesh maintenance."""
interval = self.params.heartbeat_interval_secs

while self._running:
while not self._stop_event.is_set():
try:
await asyncio.sleep(interval)
await self._heartbeat()
Expand Down
Loading
Loading