diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index ee1f823c..ce03cdd5 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -657,7 +657,7 @@ async def run_node( # Run the node. logger.info("Starting consensus node...") - event_source._running = True + event_source._stop_event.clear() await node.run() diff --git a/src/lean_spec/subspecs/koalabear/field.py b/src/lean_spec/subspecs/koalabear/field.py index 6869b1a2..6cf942cf 100644 --- a/src/lean_spec/subspecs/koalabear/field.py +++ b/src/lean_spec/subspecs/koalabear/field.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import IO, Final, Self +from typing import IO, Final, Self, override from lean_spec.types import SSZType @@ -48,21 +48,25 @@ def __init__(self, value: int) -> None: self.value: int = value % P @classmethod + @override def is_fixed_size(cls) -> bool: """Fp elements are fixed-size (4 bytes).""" return True @classmethod + @override def get_byte_length(cls) -> int: """Get the byte length of an Fp element.""" return P_BYTES + @override def serialize(self, stream: IO[bytes]) -> int: """Serialize the field element to a binary stream.""" stream.write(self.value.to_bytes(P_BYTES, byteorder="little")) return P_BYTES @classmethod + @override def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Deserialize a field element from a binary stream.""" if scope != P_BYTES: diff --git a/src/lean_spec/subspecs/networking/client/event_source/live.py b/src/lean_spec/subspecs/networking/client/event_source/live.py index ce7bc92f..6b7ad753 100644 --- a/src/lean_spec/subspecs/networking/client/event_source/live.py +++ b/src/lean_spec/subspecs/networking/client/event_source/live.py @@ -206,16 +206,13 @@ class LiveNetworkEventSource: Used to validate incoming messages belong to the same fork. """ - _running: bool = False - """Whether the event source is running. - - Controls the main loop and background tasks. - """ - _stop_event: asyncio.Event = field(default_factory=asyncio.Event) - """Set when the event source stops. + """Lifecycle signal. - Lets awaiters wake immediately rather than poll the running flag. + Set means "stopped": fresh instances start stopped (the event is + forced to the set state in __post_init__) and stay stopped until + dial or listen clears it. Awaiters of wait() wake the moment + stop() sets it again. """ _gossip_handler: GossipHandler = field(init=False) @@ -258,6 +255,8 @@ def __post_init__(self) -> None: self._reqresp_handler = RequestHandler() self._reqresp_server = ReqRespServer(handler=self._reqresp_handler) self._gossipsub_behavior = GossipsubBehavior(params=GossipsubParameters()) + # Initial lifecycle: stopped. dial() or listen() clears the event to start. + self._stop_event.set() @classmethod async def create( @@ -377,7 +376,7 @@ async def start_gossipsub(self) -> None: async def _forward_gossipsub_events(self) -> None: """Forward events from GossipsubBehavior to our event queue.""" try: - while self._running: + while not self._stop_event.is_set(): event = await self._gossipsub_behavior.get_next_event() if event is None: # Stopped or no event. @@ -447,7 +446,7 @@ async def __anext__(self) -> NetworkEvent: Raises: StopAsyncIteration: When no more events will arrive. """ - if not self._running: + if self._stop_event.is_set(): raise StopAsyncIteration # Race the queue against the stop signal. @@ -479,10 +478,9 @@ async def dial(self, multiaddr: str) -> PeerId | None: Returns: Peer ID on success, None on failure. """ - # Ensure _running is set so background tasks (like _accept_streams) + # Clear the stop signal so background tasks (like _accept_streams) # can operate. Without this, _accept_streams exits immediately if # dial() is called before listen(). - self._running = True self._stop_event.clear() try: @@ -565,7 +563,6 @@ async def listen(self, multiaddr: str) -> None: Args: multiaddr: Address to listen on (e.g., "/ip4/0.0.0.0/udp/9000/quic-v1"). """ - self._running = True self._stop_event.clear() if is_quic_multiaddr(multiaddr): @@ -713,7 +710,6 @@ async def disconnect(self, peer_id: PeerId) -> None: async def stop(self) -> None: """Stop the event source and cancel background tasks.""" - self._running = False self._stop_event.set() # Cancel gossip tasks first (including event forwarding task). @@ -830,9 +826,9 @@ async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None: # Main loop: accept streams until shutdown or disconnection. # # The loop continues as long as: - # - We haven't been told to stop (_running is True). + # - We haven't been told to stop (stop event is clear). # - The peer is still connected (peer_id in _connections). - while self._running and peer_id in self._connections: + while not self._stop_event.is_set() and peer_id in self._connections: try: # Accept the next incoming stream. # diff --git a/src/lean_spec/subspecs/networking/gossipsub/behavior.py b/src/lean_spec/subspecs/networking/gossipsub/behavior.py index 994e3d91..2515ba87 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/behavior.py +++ b/src/lean_spec/subspecs/networking/gossipsub/behavior.py @@ -223,14 +223,17 @@ class GossipsubBehavior: ) """Queue of events for the application.""" - _running: bool = False - """Whether the behavior is running.""" - _heartbeat_task: asyncio.Task[None] | None = None """Background heartbeat task.""" _stop_event: asyncio.Event = field(default_factory=asyncio.Event) - """Event to signal stop to the events generator.""" + """Lifecycle signal. + + Set means "stopped": fresh instances start stopped (the event is + forced to the set state in __post_init__) and stay stopped until + start clears it. Awaiters of wait() wake the moment stop sets it + again. + """ _background_tasks: set[asyncio.Task[None]] = field(default_factory=set) """Tracked background tasks for subscribe/unsubscribe broadcasts. @@ -252,6 +255,8 @@ def __post_init__(self) -> None: mcache_gossip=self.params.mcache_gossip, ) self.seen_cache = SeenCache(ttl_seconds=self.params.seen_ttl_secs) + # Initial lifecycle: stopped. start() clears the event to begin running. + self._stop_event.set() def subscribe(self, topic: TopicId) -> None: """Subscribe to a topic. @@ -273,7 +278,7 @@ def subscribe(self, topic: TopicId) -> None: logger.debug("Subscribed to topic %s", topic) - if self._running: + if not self._stop_event.is_set(): self._spawn_background_task(self._broadcast_subscription(topic, subscribe=True)) def unsubscribe(self, topic: TopicId) -> None: @@ -297,24 +302,22 @@ def unsubscribe(self, topic: TopicId) -> None: logger.debug("Unsubscribed from topic %s", topic) - if self._running: + if not self._stop_event.is_set(): self._spawn_background_task( self._broadcast_subscription(topic, subscribe=False, prune_peers=mesh_peers) ) async def start(self) -> None: """Start the gossipsub behavior (heartbeat loop).""" - if self._running: + if not self._stop_event.is_set(): return - self._running = True + self._stop_event.clear() self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) logger.info("[GS %x] GossipsubBehavior started", self._short_id) async def stop(self) -> None: """Stop the gossipsub behavior.""" - self._running = False - self._stop_event.set() if self._heartbeat_task: @@ -512,7 +515,7 @@ async def get_next_event( Returns None when stopped or no event available. """ - if not self._running: + if self._stop_event.is_set(): return None # Race the queue against the stop signal. @@ -754,7 +757,7 @@ async def _heartbeat_loop(self) -> None: """Background heartbeat for mesh maintenance.""" interval = self.params.heartbeat_interval_secs - while self._running: + while not self._stop_event.is_set(): try: await asyncio.sleep(interval) await self._heartbeat() diff --git a/src/lean_spec/subspecs/storage/namespaces.py b/src/lean_spec/subspecs/storage/namespaces.py index 511f018d..141a0bc5 100644 --- a/src/lean_spec/subspecs/storage/namespaces.py +++ b/src/lean_spec/subspecs/storage/namespaces.py @@ -2,155 +2,98 @@ Database namespace definitions for storage tables. Defines table names and schema constants for SQLite storage. -Each namespace represents a logical grouping of related data. +Each prefix marks a logical grouping of related data. """ from __future__ import annotations -from dataclasses import dataclass from typing import Final +# Blocks: SSZ root primary key, SSZ-encoded bytes stored directly. -@dataclass(frozen=True, slots=True) -class BlockNamespace: - """ - Namespace for block storage. +BLOCKS_TABLE_NAME: Final = "blocks" +"""Table name for block storage.""" - Blocks are stored by their SSZ root hash. - SSZ-encoded bytes are stored directly. - """ - - TABLE_NAME: str = "blocks" - """Table name for block storage.""" - - CREATE_TABLE: str = """ - CREATE TABLE IF NOT EXISTS blocks ( - root BLOB PRIMARY KEY, - slot INTEGER NOT NULL, - data BLOB NOT NULL - ) - """ - """SQL to create blocks table.""" - - CREATE_INDEX: str = """ - CREATE INDEX IF NOT EXISTS idx_blocks_slot ON blocks(slot) - """ - """SQL to create slot index.""" - - -@dataclass(frozen=True, slots=True) -class StateNamespace: - """ - Namespace for state storage. - - States are stored by their SSZ root hash. - SSZ-encoded bytes are stored directly. - """ - - TABLE_NAME: str = "states" - """Table name for state storage.""" - - CREATE_TABLE: str = """ - CREATE TABLE IF NOT EXISTS states ( - root BLOB PRIMARY KEY, - slot INTEGER NOT NULL, - data BLOB NOT NULL - ) - """ - """SQL to create states table.""" - - CREATE_INDEX: str = """ - CREATE INDEX IF NOT EXISTS idx_states_slot ON states(slot) - """ - """SQL to create slot index.""" - - -@dataclass(frozen=True, slots=True) -class CheckpointNamespace: - """ - Namespace for checkpoint tracking. - - Stores latest justified and finalized checkpoints. - Uses a key-value pattern with fixed keys. - """ - - TABLE_NAME: str = "checkpoints" - """Table name for checkpoint storage.""" - - KEY_JUSTIFIED: str = "justified" - """Key for justified checkpoint.""" - - KEY_FINALIZED: str = "finalized" - """Key for finalized checkpoint.""" - - KEY_HEAD: str = "head" - """Key for head block root.""" +BLOCKS_CREATE_TABLE: Final = """ + CREATE TABLE IF NOT EXISTS blocks ( + root BLOB PRIMARY KEY, + slot INTEGER NOT NULL, + data BLOB NOT NULL + ) +""" +"""SQL to create blocks table.""" - KEY_GENESIS_TIME: str = "genesis_time" - """Key for genesis time. Enables self-contained restarts without external config.""" +BLOCKS_CREATE_INDEX: Final = """ + CREATE INDEX IF NOT EXISTS idx_blocks_slot ON blocks(slot) +""" +"""SQL to create slot index.""" - CREATE_TABLE: str = """ - CREATE TABLE IF NOT EXISTS checkpoints ( - key TEXT PRIMARY KEY, - data BLOB NOT NULL - ) - """ - """SQL to create checkpoints table.""" +# States: SSZ root primary key, SSZ-encoded bytes stored directly. +STATES_TABLE_NAME: Final = "states" +"""Table name for state storage.""" -@dataclass(frozen=True, slots=True) -class SlotIndexNamespace: - """ - Namespace for slot-to-root mapping. +STATES_CREATE_TABLE: Final = """ + CREATE TABLE IF NOT EXISTS states ( + root BLOB PRIMARY KEY, + slot INTEGER NOT NULL, + data BLOB NOT NULL + ) +""" +"""SQL to create states table.""" - Enables lookup of block by slot number. - Used for historical queries. - """ +STATES_CREATE_INDEX: Final = """ + CREATE INDEX IF NOT EXISTS idx_states_slot ON states(slot) +""" +"""SQL to create slot index.""" - TABLE_NAME: str = "slot_index" - """Table name for slot index.""" +# Checkpoints: key-value table with fixed keys for justified/finalized/head/genesis-time. - CREATE_TABLE: str = """ - CREATE TABLE IF NOT EXISTS slot_index ( - slot INTEGER PRIMARY KEY, - root BLOB NOT NULL - ) - """ - """SQL to create slot index table.""" +CHECKPOINTS_TABLE_NAME: Final = "checkpoints" +"""Table name for checkpoint storage.""" +CHECKPOINTS_KEY_JUSTIFIED: Final = "justified" +"""Key for justified checkpoint.""" -@dataclass(frozen=True, slots=True) -class StateRootIndexNamespace: - """ - Namespace for state root to block root mapping. +CHECKPOINTS_KEY_FINALIZED: Final = "finalized" +"""Key for finalized checkpoint.""" - Enables lookup of block root by state root. - Needed for checkpoint sync and API queries by state root. - """ +CHECKPOINTS_KEY_HEAD: Final = "head" +"""Key for head block root.""" - TABLE_NAME: str = "state_root_index" - """Table name for state root index.""" +CHECKPOINTS_KEY_GENESIS_TIME: Final = "genesis_time" +"""Key for genesis time. Enables self-contained restarts without external config.""" - CREATE_TABLE: str = """ - CREATE TABLE IF NOT EXISTS state_root_index ( - state_root BLOB PRIMARY KEY, - block_root BLOB NOT NULL - ) - """ - """SQL to create state root index table.""" +CHECKPOINTS_CREATE_TABLE: Final = """ + CREATE TABLE IF NOT EXISTS checkpoints ( + key TEXT PRIMARY KEY, + data BLOB NOT NULL + ) +""" +"""SQL to create checkpoints table.""" +# Slot index: slot-to-root mapping for historical queries. -BLOCKS: Final = BlockNamespace() -"""Block storage namespace.""" +SLOT_INDEX_TABLE_NAME: Final = "slot_index" +"""Table name for slot index.""" -STATES: Final = StateNamespace() -"""State storage namespace.""" +SLOT_INDEX_CREATE_TABLE: Final = """ + CREATE TABLE IF NOT EXISTS slot_index ( + slot INTEGER PRIMARY KEY, + root BLOB NOT NULL + ) +""" +"""SQL to create slot index table.""" -CHECKPOINTS: Final = CheckpointNamespace() -"""Checkpoint tracking namespace.""" +# State root index: state-root-to-block-root mapping for checkpoint sync and API. -SLOT_INDEX: Final = SlotIndexNamespace() -"""Slot-to-root index namespace.""" +STATE_ROOT_INDEX_TABLE_NAME: Final = "state_root_index" +"""Table name for state root index.""" -STATE_ROOT_INDEX: Final = StateRootIndexNamespace() -"""State root to block root index namespace.""" +STATE_ROOT_INDEX_CREATE_TABLE: Final = """ + CREATE TABLE IF NOT EXISTS state_root_index ( + state_root BLOB PRIMARY KEY, + block_root BLOB NOT NULL + ) +""" +"""SQL to create state root index table.""" diff --git a/src/lean_spec/subspecs/storage/sqlite.py b/src/lean_spec/subspecs/storage/sqlite.py index 34b12a75..f91db8ed 100644 --- a/src/lean_spec/subspecs/storage/sqlite.py +++ b/src/lean_spec/subspecs/storage/sqlite.py @@ -31,11 +31,22 @@ from .exceptions import StorageCorruptionError, StorageReadError, StorageWriteError from .namespaces import ( - BLOCKS, - CHECKPOINTS, - SLOT_INDEX, - STATE_ROOT_INDEX, - STATES, + BLOCKS_CREATE_INDEX, + BLOCKS_CREATE_TABLE, + BLOCKS_TABLE_NAME, + CHECKPOINTS_CREATE_TABLE, + CHECKPOINTS_KEY_FINALIZED, + CHECKPOINTS_KEY_GENESIS_TIME, + CHECKPOINTS_KEY_HEAD, + CHECKPOINTS_KEY_JUSTIFIED, + CHECKPOINTS_TABLE_NAME, + SLOT_INDEX_CREATE_TABLE, + SLOT_INDEX_TABLE_NAME, + STATE_ROOT_INDEX_CREATE_TABLE, + STATE_ROOT_INDEX_TABLE_NAME, + STATES_CREATE_INDEX, + STATES_CREATE_TABLE, + STATES_TABLE_NAME, ) @@ -97,25 +108,25 @@ def _init_schema(self) -> None: # # This matches how consensus clients identify data: by SSZ merkle root. # The slot index enables efficient range queries for historical data. - cursor.execute(BLOCKS.CREATE_TABLE) - cursor.execute(BLOCKS.CREATE_INDEX) - cursor.execute(STATES.CREATE_TABLE) - cursor.execute(STATES.CREATE_INDEX) + cursor.execute(BLOCKS_CREATE_TABLE) + cursor.execute(BLOCKS_CREATE_INDEX) + cursor.execute(STATES_CREATE_TABLE) + cursor.execute(STATES_CREATE_INDEX) # Checkpoints use a key-value pattern for singleton values. # # Only one justified and one finalized checkpoint exist at any time. - cursor.execute(CHECKPOINTS.CREATE_TABLE) + cursor.execute(CHECKPOINTS_CREATE_TABLE) # Slot index maps slot numbers to block roots. # # Enables queries like "what block was at slot N?" - cursor.execute(SLOT_INDEX.CREATE_TABLE) + cursor.execute(SLOT_INDEX_CREATE_TABLE) # State root index maps state roots to block roots. # # Needed for checkpoint sync and API endpoints that query by state root. - cursor.execute(STATE_ROOT_INDEX.CREATE_TABLE) + cursor.execute(STATE_ROOT_INDEX_CREATE_TABLE) self._conn.commit() @@ -131,7 +142,7 @@ def get_block(self, root: Bytes32) -> SpecBlockType | None: # The root is the SSZ merkle root of the block. # This 32-byte hash uniquely identifies the block content. cursor.execute( - f"SELECT data FROM {BLOCKS.TABLE_NAME} WHERE root = ?", + f"SELECT data FROM {BLOCKS_TABLE_NAME} WHERE root = ?", (bytes(root),), ) row = cursor.fetchone() @@ -158,7 +169,7 @@ def put_block(self, block: SpecBlockType, root: Bytes32) -> None: # The slot column enables efficient historical range queries. cursor.execute( f""" - INSERT OR REPLACE INTO {BLOCKS.TABLE_NAME} (root, slot, data) + INSERT OR REPLACE INTO {BLOCKS_TABLE_NAME} (root, slot, data) VALUES (?, ?, ?) """, (bytes(root), int(block.slot), block.encode_bytes()), @@ -178,7 +189,7 @@ def get_state(self, root: Bytes32) -> SpecStateType | None: try: cursor = self._conn.cursor() cursor.execute( - f"SELECT data FROM {STATES.TABLE_NAME} WHERE root = ?", + f"SELECT data FROM {STATES_TABLE_NAME} WHERE root = ?", (bytes(root),), ) row = cursor.fetchone() @@ -206,7 +217,7 @@ def put_state(self, state: SpecStateType, root: Bytes32) -> None: # storage costs against replay costs for intermediate slots. cursor.execute( f""" - INSERT OR REPLACE INTO {STATES.TABLE_NAME} (root, slot, data) + INSERT OR REPLACE INTO {STATES_TABLE_NAME} (root, slot, data) VALUES (?, ?, ?) """, (bytes(root), int(state.slot), state.encode_bytes()), @@ -231,8 +242,8 @@ def get_justified_checkpoint(self) -> Checkpoint | None: # This checkpoint may still be reverted if a competing # checkpoint gains more support. Not yet final. cursor.execute( - f"SELECT data FROM {CHECKPOINTS.TABLE_NAME} WHERE key = ?", - (CHECKPOINTS.KEY_JUSTIFIED,), + f"SELECT data FROM {CHECKPOINTS_TABLE_NAME} WHERE key = ?", + (CHECKPOINTS_KEY_JUSTIFIED,), ) row = cursor.fetchone() except sqlite3.Error as e: @@ -252,10 +263,10 @@ def put_justified_checkpoint(self, checkpoint: Checkpoint) -> None: cursor = self._conn.cursor() cursor.execute( f""" - INSERT OR REPLACE INTO {CHECKPOINTS.TABLE_NAME} (key, data) + INSERT OR REPLACE INTO {CHECKPOINTS_TABLE_NAME} (key, data) VALUES (?, ?) """, - (CHECKPOINTS.KEY_JUSTIFIED, checkpoint.encode_bytes()), + (CHECKPOINTS_KEY_JUSTIFIED, checkpoint.encode_bytes()), ) except sqlite3.Error as e: raise StorageWriteError(f"Failed to write justified checkpoint: {e}") from e @@ -270,8 +281,8 @@ def get_finalized_checkpoint(self) -> Checkpoint | None: # Once finalized, all blocks in the checkpoint's chain are permanent. # Reorging past finality requires 1/3 validators to be slashed. cursor.execute( - f"SELECT data FROM {CHECKPOINTS.TABLE_NAME} WHERE key = ?", - (CHECKPOINTS.KEY_FINALIZED,), + f"SELECT data FROM {CHECKPOINTS_TABLE_NAME} WHERE key = ?", + (CHECKPOINTS_KEY_FINALIZED,), ) row = cursor.fetchone() except sqlite3.Error as e: @@ -291,10 +302,10 @@ def put_finalized_checkpoint(self, checkpoint: Checkpoint) -> None: cursor = self._conn.cursor() cursor.execute( f""" - INSERT OR REPLACE INTO {CHECKPOINTS.TABLE_NAME} (key, data) + INSERT OR REPLACE INTO {CHECKPOINTS_TABLE_NAME} (key, data) VALUES (?, ?) """, - (CHECKPOINTS.KEY_FINALIZED, checkpoint.encode_bytes()), + (CHECKPOINTS_KEY_FINALIZED, checkpoint.encode_bytes()), ) except sqlite3.Error as e: raise StorageWriteError(f"Failed to write finalized checkpoint: {e}") from e @@ -315,8 +326,8 @@ def get_head_root(self) -> Bytes32 | None: # Fork choice updates this after processing each new block. # Stored in the checkpoints table as a special singleton key. cursor.execute( - f"SELECT data FROM {CHECKPOINTS.TABLE_NAME} WHERE key = ?", - (CHECKPOINTS.KEY_HEAD,), + f"SELECT data FROM {CHECKPOINTS_TABLE_NAME} WHERE key = ?", + (CHECKPOINTS_KEY_HEAD,), ) row = cursor.fetchone() except sqlite3.Error as e: @@ -334,10 +345,10 @@ def put_head_root(self, root: Bytes32) -> None: cursor = self._conn.cursor() cursor.execute( f""" - INSERT OR REPLACE INTO {CHECKPOINTS.TABLE_NAME} (key, data) + INSERT OR REPLACE INTO {CHECKPOINTS_TABLE_NAME} (key, data) VALUES (?, ?) """, - (CHECKPOINTS.KEY_HEAD, bytes(root)), + (CHECKPOINTS_KEY_HEAD, bytes(root)), ) except sqlite3.Error as e: raise StorageWriteError(f"Failed to write head root: {e}") from e @@ -359,7 +370,7 @@ def get_block_root_by_slot(self, slot: Slot) -> Bytes32 | None: # A slot may have no block (proposer missed their turn). # A slot may have had multiple competing blocks (only one is canonical). cursor.execute( - f"SELECT root FROM {SLOT_INDEX.TABLE_NAME} WHERE slot = ?", + f"SELECT root FROM {SLOT_INDEX_TABLE_NAME} WHERE slot = ?", (int(slot),), ) row = cursor.fetchone() @@ -381,7 +392,7 @@ def put_block_root_by_slot(self, slot: Slot, root: Bytes32) -> None: # at different times. This always reflects the current canonical chain. cursor.execute( f""" - INSERT OR REPLACE INTO {SLOT_INDEX.TABLE_NAME} (slot, root) + INSERT OR REPLACE INTO {SLOT_INDEX_TABLE_NAME} (slot, root) VALUES (?, ?) """, (int(slot), bytes(root)), @@ -401,7 +412,7 @@ def get_block_root_by_state_root(self, state_root: Bytes32) -> Bytes32 | None: try: cursor = self._conn.cursor() cursor.execute( - f"SELECT block_root FROM {STATE_ROOT_INDEX.TABLE_NAME} WHERE state_root = ?", + f"SELECT block_root FROM {STATE_ROOT_INDEX_TABLE_NAME} WHERE state_root = ?", (bytes(state_root),), ) row = cursor.fetchone() @@ -420,7 +431,7 @@ def put_block_root_by_state_root(self, state_root: Bytes32, block_root: Bytes32) cursor = self._conn.cursor() cursor.execute( f""" - INSERT OR REPLACE INTO {STATE_ROOT_INDEX.TABLE_NAME} (state_root, block_root) + INSERT OR REPLACE INTO {STATE_ROOT_INDEX_TABLE_NAME} (state_root, block_root) VALUES (?, ?) """, (bytes(state_root), bytes(block_root)), @@ -441,8 +452,8 @@ def get_genesis_time(self) -> Uint64 | None: try: cursor = self._conn.cursor() cursor.execute( - f"SELECT data FROM {CHECKPOINTS.TABLE_NAME} WHERE key = ?", - (CHECKPOINTS.KEY_GENESIS_TIME,), + f"SELECT data FROM {CHECKPOINTS_TABLE_NAME} WHERE key = ?", + (CHECKPOINTS_KEY_GENESIS_TIME,), ) row = cursor.fetchone() except sqlite3.Error as e: @@ -460,10 +471,10 @@ def put_genesis_time(self, genesis_time: Uint64) -> None: cursor = self._conn.cursor() cursor.execute( f""" - INSERT OR REPLACE INTO {CHECKPOINTS.TABLE_NAME} (key, data) + INSERT OR REPLACE INTO {CHECKPOINTS_TABLE_NAME} (key, data) VALUES (?, ?) """, - (CHECKPOINTS.KEY_GENESIS_TIME, int(genesis_time).to_bytes(8, byteorder="little")), + (CHECKPOINTS_KEY_GENESIS_TIME, int(genesis_time).to_bytes(8, byteorder="little")), ) except sqlite3.Error as e: raise StorageWriteError(f"Failed to write genesis time: {e}") from e @@ -522,13 +533,13 @@ def prune_before_slot(self, slot: Slot, keep_roots: frozenset[Bytes32]) -> int: # Prune blocks below the threshold, preserving kept roots. if keep_bytes: cursor.execute( - f"DELETE FROM {BLOCKS.TABLE_NAME} " + f"DELETE FROM {BLOCKS_TABLE_NAME} " f"WHERE slot < ? AND root NOT IN ({placeholders})", [int(slot), *keep_bytes], ) else: cursor.execute( - f"DELETE FROM {BLOCKS.TABLE_NAME} WHERE slot < ?", + f"DELETE FROM {BLOCKS_TABLE_NAME} WHERE slot < ?", (int(slot),), ) total_pruned += cursor.rowcount @@ -536,20 +547,20 @@ def prune_before_slot(self, slot: Slot, keep_roots: frozenset[Bytes32]) -> int: # Prune states below the threshold, preserving kept roots. if keep_bytes: cursor.execute( - f"DELETE FROM {STATES.TABLE_NAME} " + f"DELETE FROM {STATES_TABLE_NAME} " f"WHERE slot < ? AND root NOT IN ({placeholders})", [int(slot), *keep_bytes], ) else: cursor.execute( - f"DELETE FROM {STATES.TABLE_NAME} WHERE slot < ?", + f"DELETE FROM {STATES_TABLE_NAME} WHERE slot < ?", (int(slot),), ) total_pruned += cursor.rowcount # Prune slot index entries below the threshold. cursor.execute( - f"DELETE FROM {SLOT_INDEX.TABLE_NAME} WHERE slot < ?", + f"DELETE FROM {SLOT_INDEX_TABLE_NAME} WHERE slot < ?", (int(slot),), ) total_pruned += cursor.rowcount diff --git a/src/lean_spec/subspecs/xmss/constants.py b/src/lean_spec/subspecs/xmss/constants.py index 212264d0..9aefb256 100644 --- a/src/lean_spec/subspecs/xmss/constants.py +++ b/src/lean_spec/subspecs/xmss/constants.py @@ -20,7 +20,7 @@ from lean_spec.types import StrictBaseModel, Uint64 from lean_spec.types.constants import OFFSET_BYTE_LENGTH -from ..koalabear import P_BYTES, Fp, P +from ..koalabear import P_BYTES, P class XmssConfig(StrictBaseModel): @@ -158,13 +158,13 @@ def SIGNATURE_LEN_BYTES(self) -> int: # noqa: N802 """Lightweight XMSS configuration for fast test execution.""" -TWEAK_PREFIX_CHAIN: Final = Fp(value=0x00) +TWEAK_PREFIX_CHAIN: Final[int] = 0x00 """The unique prefix for tweaks used in Winternitz-style hash chains.""" -TWEAK_PREFIX_TREE: Final = Fp(value=0x01) +TWEAK_PREFIX_TREE: Final[int] = 0x01 """The unique prefix for tweaks used when hashing Merkle tree nodes.""" -TWEAK_PREFIX_MESSAGE: Final = Fp(value=0x02) +TWEAK_PREFIX_MESSAGE: Final[int] = 0x02 """The unique prefix for tweaks used in the initial message hashing step.""" PRF_KEY_LENGTH: Final = 32 diff --git a/src/lean_spec/subspecs/xmss/message_hash.py b/src/lean_spec/subspecs/xmss/message_hash.py index d9619aa4..9096a252 100644 --- a/src/lean_spec/subspecs/xmss/message_hash.py +++ b/src/lean_spec/subspecs/xmss/message_hash.py @@ -80,7 +80,7 @@ def encode_epoch(self, epoch: Uint64) -> list[Fp]: hash input in a structured, domain-separated way. """ # Combine the epoch and the message hash prefix into a single integer. - acc = (int(epoch) << 8) | TWEAK_PREFIX_MESSAGE.value + acc = (int(epoch) << 8) | TWEAK_PREFIX_MESSAGE # Decompose the integer into its base-P representation. return int_to_base_p(acc, self.config.TWEAK_LEN_FE) diff --git a/src/lean_spec/subspecs/xmss/tweak_hash.py b/src/lean_spec/subspecs/xmss/tweak_hash.py index d4f26a43..d83c2ea7 100644 --- a/src/lean_spec/subspecs/xmss/tweak_hash.py +++ b/src/lean_spec/subspecs/xmss/tweak_hash.py @@ -24,9 +24,7 @@ **domain-separated**, eliminating the risk of cross-context collisions. """ -from __future__ import annotations - -from pydantic import Field +from typing import NamedTuple from lean_spec.types import StrictBaseModel, Uint64 @@ -47,7 +45,7 @@ from .utils import int_to_base_p -class TreeTweak(StrictBaseModel): +class TreeTweak(NamedTuple): """ A tweak used for hashing nodes within the Merkle tree. @@ -55,13 +53,14 @@ class TreeTweak(StrictBaseModel): Merkle tree has a unique context. """ - level: int = Field( - ge=0, description="The level (height) in the Merkle tree, where 0 is the leaf level." - ) - index: Uint64 = Field(description="The node's index (from the left) within that level.") + level: int + """The level (height) in the Merkle tree, where 0 is the leaf level.""" + + index: Uint64 + """The node's index (from the left) within that level.""" -class ChainTweak(StrictBaseModel): +class ChainTweak(NamedTuple): """ A tweak used for hashing elements within a WOTS+ hash chain. @@ -69,11 +68,14 @@ class ChainTweak(StrictBaseModel): chain is distinct across all epochs. """ - epoch: Uint64 = Field(description="The signature epoch.") - chain_index: int = Field( - ge=0, description="The index of the hash chain (from 0 to DIMENSION-1)." - ) - step: int = Field(ge=0, description="The step number within the chain (from 1 to BASE-1).") + epoch: Uint64 + """The signature epoch.""" + + chain_index: int + """The index of the hash chain (from 0 to DIMENSION-1).""" + + step: int + """The step number within the chain (from 1 to BASE-1).""" class TweakHasher(StrictBaseModel): @@ -117,15 +119,10 @@ def _encode_tweak(self, tweak: TreeTweak | ChainTweak, length: int) -> list[Fp]: match tweak: case TreeTweak(level=level, index=index): # Packing scheme: (level << 40) | (index << 8) | PREFIX - acc = (level << 40) | (int(index) << 8) | TWEAK_PREFIX_TREE.value + acc = (level << 40) | (int(index) << 8) | TWEAK_PREFIX_TREE case ChainTweak(epoch=epoch, chain_index=chain_index, step=step): # Packing scheme: (epoch << 24) | (chain_index << 16) | (step << 8) | PREFIX - acc = ( - (int(epoch) << 24) - | (chain_index << 16) - | (step << 8) - | TWEAK_PREFIX_CHAIN.value - ) + acc = (int(epoch) << 24) | (chain_index << 16) | (step << 8) | TWEAK_PREFIX_CHAIN # Decompose the packed integer `acc` into a list of base-P field elements. return int_to_base_p(acc, length) diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index c371fdb7..e7c7a895 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -125,7 +125,7 @@ async def stop(self) -> None: """Stop the node gracefully.""" # Signal the node and event source to stop. self.node.stop() - self.event_source._running = False + self.event_source._stop_event.set() # Set the stop event on gossipsub to release waiting tasks. self.event_source._gossipsub_behavior._stop_event.set() @@ -341,10 +341,10 @@ async def start_node( # Start listener in background (listen() calls serve_forever() which blocks). # - # Set _running BEFORE starting the listener to avoid race conditions. - # The network service checks _running when iterating over events. - # If _running is False, the iteration stops immediately. - event_source._running = True + # Clear the stop event BEFORE starting the listener to avoid race conditions. + # The network service checks the stop event when iterating over events. + # If the stop event is set, iteration stops immediately. + event_source._stop_event.clear() listener_task = asyncio.create_task( event_source.listen(listen_addr), diff --git a/tests/lean_spec/subspecs/networking/client/test_event_source.py b/tests/lean_spec/subspecs/networking/client/test_event_source.py index 7988ac6f..e28cc3d4 100644 --- a/tests/lean_spec/subspecs/networking/client/test_event_source.py +++ b/tests/lean_spec/subspecs/networking/client/test_event_source.py @@ -513,18 +513,18 @@ class TestLiveNetworkEventSourceStop: """ async def test_stop_sets_running_false(self) -> None: - """Stopping clears the running flag.""" + """Stopping flips the stop event back to set.""" es = _make_event_source() - es._running = True + es._stop_event.clear() await es.stop() - assert es._running is False + assert es._stop_event.is_set() async def test_stop_cancels_gossip_tasks(self) -> None: """Stopping cancels all tracked background tasks.""" es = _make_event_source() - es._running = True + es._stop_event.clear() task = asyncio.create_task(asyncio.sleep(100)) es._gossip_tasks.add(task) @@ -536,7 +536,7 @@ async def test_stop_cancels_gossip_tasks(self) -> None: async def test_stop_clears_task_set(self) -> None: """The gossip task set is empty after stopping.""" es = _make_event_source() - es._running = True + es._stop_event.clear() task = asyncio.create_task(asyncio.sleep(100)) es._gossip_tasks.add(task) @@ -614,7 +614,7 @@ def test_gossip_handler_matches_fork_digest(self) -> None: def test_not_running_initially(self) -> None: """Event source starts in stopped state.""" es = _make_event_source() - assert es._running is False + assert es._stop_event.is_set() def test_no_connections_initially(self) -> None: """No peer connections on construction.""" diff --git a/tests/lean_spec/subspecs/networking/gossipsub/test_publish.py b/tests/lean_spec/subspecs/networking/gossipsub/test_publish.py index 44b359d9..c1fb54c9 100644 --- a/tests/lean_spec/subspecs/networking/gossipsub/test_publish.py +++ b/tests/lean_spec/subspecs/networking/gossipsub/test_publish.py @@ -108,7 +108,7 @@ class TestBroadcastSubscription: async def test_subscribe_sends_subscription_to_all_peers(self) -> None: """Subscribing broadcasts a subscription RPC to all peers.""" behavior, capture = make_behavior(d=2, d_low=1, d_high=4) - behavior._running = True + behavior._stop_event.clear() p1 = add_peer(behavior, "peerA", set()) p2 = add_peer(behavior, "peerB", set()) @@ -126,7 +126,7 @@ async def test_subscribe_sends_subscription_to_all_peers(self) -> None: async def test_subscribe_grafts_eligible_peers(self) -> None: """Subscribing GRAFTs eligible peers into the mesh.""" behavior, capture = make_behavior(d=2, d_low=1, d_high=4) - behavior._running = True + behavior._stop_event.clear() topic = TopicId("graftTopic") # These peers are already subscribed to the topic. @@ -148,7 +148,7 @@ async def test_subscribe_grafts_eligible_peers(self) -> None: async def test_subscribe_respects_fanout_promotion(self) -> None: """When subscribing, fanout peers are promoted and GRAFT fills to D.""" behavior, capture = make_behavior(d=3, d_low=2, d_high=6) - behavior._running = True + behavior._stop_event.clear() topic = TopicId("promoteTopic") p1 = add_peer(behavior, "peerA", {topic}) @@ -176,7 +176,7 @@ async def test_subscribe_respects_fanout_promotion(self) -> None: async def test_unsubscribe_prunes_mesh_peers(self) -> None: """Unsubscribing sends PRUNE to former mesh peers.""" behavior, capture = make_behavior() - behavior._running = True + behavior._stop_event.clear() topic = TopicId("pruneTopic") behavior.subscribe(topic) diff --git a/tests/lean_spec/subspecs/xmss/test_message_hash.py b/tests/lean_spec/subspecs/xmss/test_message_hash.py index 45476ff4..c805e4aa 100644 --- a/tests/lean_spec/subspecs/xmss/test_message_hash.py +++ b/tests/lean_spec/subspecs/xmss/test_message_hash.py @@ -44,7 +44,7 @@ def test_encode_epoch() -> None: # Test specific values from the Rust reference tests. test_epochs = [0, 42, 2**32 - 1] for epoch in test_epochs: - acc = (epoch << 8) | TWEAK_PREFIX_MESSAGE.value + acc = (epoch << 8) | TWEAK_PREFIX_MESSAGE expected = int_to_base_p(acc, config.TWEAK_LEN_FE) assert hasher.encode_epoch(Uint64(epoch)) == expected