Source code for celestia.node_api.header

from collections.abc import AsyncIterator
from functools import wraps
from typing import Callable

from _jsonrpc import Wrapper
from celestia.types.header import ExtendedHeader, State


def handle_header_error(func):
    """Decorator to handle blob-related errors."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except ConnectionError as e:
            if "header: not found" in e.args[1].body["message"].lower():
                return None
            raise

    return wrapper


[docs] class HeaderClient(Wrapper): """Client for interacting with Celestia's Header API."""
[docs] @handle_header_error async def get_by_hash( self, header_hash: str, *, deserializer: Callable | None = None ) -> ExtendedHeader | None: """Returns the header of the given hash from the node's header store. Args: header_hash (str): The hash of the header to retrieve. deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.header.ExtendedHeader.deserializer`. Returns: ExtendedHeader | None: The retrieved header if found, otherwise None. """ deserializer = deserializer if deserializer is not None else ExtendedHeader.deserializer return await self._rpc.call("header.GetByHash", (header_hash,), deserializer)
[docs] async def get_by_height( self, height: int, *, deserializer: Callable | None = None ) -> ExtendedHeader: """Returns the ExtendedHeader at the given height if it is currently available. Args: height (int): The height of the header. deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.header.ExtendedHeader.deserializer`. Returns: ExtendedHeader: The retrieved header. """ deserializer = deserializer if deserializer is not None else ExtendedHeader.deserializer return await self._rpc.call("header.GetByHeight", (int(height),), deserializer)
[docs] async def get_range_by_height( self, range_from: ExtendedHeader, range_to: int, *, deserializer: Callable | None = None ) -> list[ExtendedHeader]: """Returns the given range (from:to) of ExtendedHeaders from the node's header store and verifies that the returned headers are adjacent to each other. Args: range_from (ExtendedHeader): The starting header. range_to (int): The height of the last header in the range. deserializer (Callable | None): Custom deserializer. Defaults to None. Returns: list[ExtendedHeader]: A list of retrieved headers. """ def deserializer_(result): if result is not None: return [ExtendedHeader(**kwargs) for kwargs in result] deserializer = deserializer if deserializer is not None else deserializer_ return await self._rpc.call( "header.GetRangeByHeight", (range_from, int(range_to)), deserializer )
[docs] async def local_head(self, *, deserializer: Callable | None = None) -> ExtendedHeader: """Returns the ExtendedHeader of the chain head. Args: deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.header.ExtendedHeader.deserializer`. Returns: ExtendedHeader: The latest known header of the local node. """ deserializer = deserializer if deserializer is not None else ExtendedHeader.deserializer return await self._rpc.call("header.LocalHead", (), deserializer)
[docs] async def network_head(self, *, deserializer: Callable | None = None) -> ExtendedHeader: """Provides the Syncer's view of the current network head. Args: deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.header.ExtendedHeader.deserializer`. Returns: ExtendedHeader: The latest known header of the network. """ deserializer = deserializer if deserializer is not None else ExtendedHeader.deserializer return await self._rpc.call("header.NetworkHead", (), deserializer)
[docs] async def subscribe( self, *, deserializer: Callable | None = None ) -> AsyncIterator[ExtendedHeader | None]: """Subscribes to recent ExtendedHeaders from the network. Args: deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.header.ExtendedHeader.deserializer`. Yields: ExtendedHeader | None: The latest headers as they become available. """ deserializer = deserializer if deserializer is not None else ExtendedHeader.deserializer async for subs_header_result in self._rpc.iter("header.Subscribe", (), deserializer): if subs_header_result is not None: yield subs_header_result
[docs] async def sync_state(self, *, deserializer: Callable | None = None) -> State: """Returns the current state of the header Syncer. Args: deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.header.State.deserializer`. Returns: State: The current synchronization state. """ deserializer = deserializer if deserializer is not None else State.deserializer return await self._rpc.call("header.SyncState", (), deserializer)
[docs] async def sync_wait(self) -> None: """Blocks until the header Syncer is synced to network head. Returns: None """ return await self._rpc.call("header.SyncWait")
[docs] async def wait_for_height( self, height: int, *, deserializer: Callable | None = None ) -> ExtendedHeader: """Blocks until the header at the given height has been processed by the store or context deadline is exceeded. Args: height (int): The height of the header to wait for. deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.header.ExtendedHeader.deserializer`. Returns: ExtendedHeader: The retrieved header once available. """ deserializer = deserializer if deserializer is not None else ExtendedHeader.deserializer return await self._rpc.call("header.WaitForHeight", (height,), deserializer)