# Source code for celestia.node_api.header

from collections.abc import AsyncIterator
from functools import wraps
from typing import Callable

from celestia.types.header import ExtendedHeader, State
from celestia.node_api.rpc.abc import Wrapper


def _is_header_not_found(error):
    """Tell whether ``error`` carries a "header: not found" RPC message.

    The message is looked up at ``error.args[1].body['message']``. Any
    error that does not have that layout (missing second argument, no
    ``body`` attribute, no ``'message'`` key, non-string message) is
    reported as *not* matching instead of raising a secondary exception.

    Args:
        error (ConnectionError): The exception caught by the decorator.

    Returns:
        bool: True only when the message contains ``'header: not found'``
        (case-insensitive).
    """
    try:
        message = error.args[1].body['message']
    except (IndexError, AttributeError, KeyError, TypeError):
        return False
    return isinstance(message, str) and 'header: not found' in message.lower()


def handle_header_error(func):
    """Decorator to handle header-related errors.

    Wraps an async callable so that a ``ConnectionError`` whose message
    contains ``'header: not found'`` results in ``None`` being returned.
    Every other ``ConnectionError`` is re-raised unchanged.

    Args:
        func (Callable): The async callable to wrap.

    Returns:
        Callable: The wrapping coroutine function.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except ConnectionError as e:
            # Inspect the error defensively: a ConnectionError with an
            # unexpected shape must surface as itself, not be masked by an
            # IndexError/AttributeError raised while reading its details.
            if _is_header_not_found(e):
                return None
            raise

    return wrapper


class HeaderClient(Wrapper):
    """Client for interacting with Celestia's Header API."""

    @handle_header_error
    async def get_by_hash(
        self, header_hash: str, *, deserializer: Callable | None = None
    ) -> ExtendedHeader | None:
        """Returns the header of the given hash from the node's header store.

        Args:
            header_hash (str): The hash of the header to retrieve.
            deserializer (Callable | None): Custom deserializer. Defaults to
                :meth:`~celestia.types.header.ExtendedHeader.deserializer`.

        Returns:
            ExtendedHeader | None: The retrieved header if found, otherwise None.
        """
        if deserializer is None:
            deserializer = ExtendedHeader.deserializer
        return await self._rpc.call("header.GetByHash", (header_hash,), deserializer)

    async def get_by_height(
        self, height: int, *, deserializer: Callable | None = None
    ) -> ExtendedHeader:
        """Returns the ExtendedHeader at the given height if it is currently available.

        Args:
            height (int): The height of the header.
            deserializer (Callable | None): Custom deserializer. Defaults to
                :meth:`~celestia.types.header.ExtendedHeader.deserializer`.

        Returns:
            ExtendedHeader: The retrieved header.
        """
        if deserializer is None:
            deserializer = ExtendedHeader.deserializer
        return await self._rpc.call("header.GetByHeight", (int(height),), deserializer)

    async def get_range_by_height(
        self,
        range_from: ExtendedHeader,
        range_to: int,
        *,
        deserializer: Callable | None = None,
    ) -> list[ExtendedHeader]:
        """Returns the given range (from:to) of ExtendedHeaders from the node's
        header store and verifies that the returned headers are adjacent to
        each other.

        Args:
            range_from (ExtendedHeader): The starting header.
            range_to (int): The height of the last header in the range.
            deserializer (Callable | None): Custom deserializer. When omitted,
                every item of a non-None result is expanded as keyword
                arguments into :class:`~celestia.types.header.ExtendedHeader`;
                a None result is passed through as None.

        Returns:
            list[ExtendedHeader]: A list of retrieved headers.
        """

        def deserializer_(result):
            # A None result is passed through untouched.
            if result is not None:
                return [ExtendedHeader(**kwargs) for kwargs in result]
            return None

        if deserializer is None:
            deserializer = deserializer_
        return await self._rpc.call(
            "header.GetRangeByHeight", (range_from, int(range_to)), deserializer
        )

    async def local_head(self, *, deserializer: Callable | None = None) -> ExtendedHeader:
        """Returns the ExtendedHeader of the chain head.

        Args:
            deserializer (Callable | None): Custom deserializer. Defaults to
                :meth:`~celestia.types.header.ExtendedHeader.deserializer`.

        Returns:
            ExtendedHeader: The latest known header of the local node.
        """
        if deserializer is None:
            deserializer = ExtendedHeader.deserializer
        return await self._rpc.call("header.LocalHead", (), deserializer)

    async def network_head(self, *, deserializer: Callable | None = None) -> ExtendedHeader:
        """Provides the Syncer's view of the current network head.

        Args:
            deserializer (Callable | None): Custom deserializer. Defaults to
                :meth:`~celestia.types.header.ExtendedHeader.deserializer`.

        Returns:
            ExtendedHeader: The latest known header of the network.
        """
        if deserializer is None:
            deserializer = ExtendedHeader.deserializer
        return await self._rpc.call("header.NetworkHead", (), deserializer)

    async def subscribe(
        self, *, deserializer: Callable | None = None
    ) -> AsyncIterator[ExtendedHeader | None]:
        """Subscribes to recent ExtendedHeaders from the network.

        Items that deserialize to None are skipped and never yielded.

        Args:
            deserializer (Callable | None): Custom deserializer. Defaults to
                :meth:`~celestia.types.header.ExtendedHeader.deserializer`.

        Yields:
            ExtendedHeader | None: The latest headers as they become available.
        """
        if deserializer is None:
            deserializer = ExtendedHeader.deserializer
        async for subs_header_result in self._rpc.iter("header.Subscribe", (), deserializer):
            if subs_header_result is not None:
                yield subs_header_result

    async def sync_state(self, *, deserializer: Callable | None = None) -> State:
        """Returns the current state of the header Syncer.

        Args:
            deserializer (Callable | None): Custom deserializer. Defaults to
                :meth:`~celestia.types.header.State.deserializer`.

        Returns:
            State: The current synchronization state.
        """
        if deserializer is None:
            deserializer = State.deserializer
        return await self._rpc.call("header.SyncState", (), deserializer)

    async def sync_wait(self) -> None:
        """Blocks until the header Syncer is synced to network head.

        Returns:
            None
        """
        return await self._rpc.call("header.SyncWait")

    async def wait_for_height(
        self, height: int, *, deserializer: Callable | None = None
    ) -> ExtendedHeader:
        """Blocks until the header at the given height has been processed by
        the store or context deadline is exceeded.

        Args:
            height (int): The height of the header to wait for.
            deserializer (Callable | None): Custom deserializer. Defaults to
                :meth:`~celestia.types.header.ExtendedHeader.deserializer`.

        Returns:
            ExtendedHeader: The retrieved header once available.
        """
        if deserializer is None:
            deserializer = ExtendedHeader.deserializer
        # Coerce to int so the height is sent the same way as in
        # get_by_height / get_range_by_height.
        return await self._rpc.call("header.WaitForHeight", (int(height),), deserializer)