Source code for celestia.node_api.blob

from collections.abc import AsyncIterator
from functools import wraps
from typing import Callable

from celestia._celestia import types  # noqa

from celestia.types import Blob, Namespace, TxConfig, Commitment, Unpack
from celestia.types.blob import SubmitBlobResult, Proof, CommitmentProof, SubscriptionBlobResult
from celestia.node_api.rpc.abc import Wrapper


def handle_blob_error(func):
    """ Decorator to handle blob-related errors."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except ConnectionError as e:
            if 'blob: not found' in e.args[1].body['message'].lower():
                return None
            raise

    return wrapper


[docs] class BlobClient(Wrapper): """ Client for interacting with Celestia's Blob API."""
[docs] @handle_blob_error async def get(self, height: int, namespace: Namespace, commitment: Commitment, *, deserializer: Callable | None = None) -> Blob | None: """ Retrieves the blob by commitment under the given namespace and height. Args: height (int): The block height. namespace (Namespace): The namespace of the blob. commitment (Commitment): The commitment of the blob. deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.common_types.Blob.deserializer`. Returns: Blob | None: The retrieved blob, or None if not found. """ deserializer = deserializer if deserializer is not None else Blob.deserializer return await self._rpc.call("blob.Get", (height, Namespace(namespace), Commitment(commitment)), deserializer)
[docs] async def get_all(self, height: int, namespace: Namespace, *namespaces: Namespace, deserializer: Callable | None = None) -> list[Blob] | None: """ Returns all blobs under the given namespaces at the given height. If all blobs were found without any errors, the user will receive a list of blobs. If the BlobService couldn't find any blobs under the requested namespaces, the user will receive an empty list of blobs along with an empty error. If some of the requested namespaces were not found, the user will receive all the found blobs and an empty error. If there were internal errors during some of the requests, the user will receive all found blobs along with a combined error message. All blobs will preserve the order of the namespaces that were requested. Args: height (int): The block height. namespace (Namespace): The primary namespace of the blobs. namespaces (Namespace): Additional namespaces to query for blobs. deserializer (Callable | None): Custom deserializer. Defaults to None. Returns: list[Blob]: The list of blobs or [] if not found. """ def deserializer_(result) -> list['Blob']: if result is not None: return [Blob(**kwargs) for kwargs in result] else: return [] deserializer = deserializer if deserializer is not None else deserializer_ namespaces = tuple(Namespace(namespace) for namespace in (namespace, *namespaces)) return await self._rpc.call("blob.GetAll", (height, namespaces), deserializer)
[docs] async def submit(self, blob: Blob, *blobs: Blob, deserializer: Callable | None = None, **options: Unpack[TxConfig]) -> SubmitBlobResult: """ Sends Blobs and reports the height in which they were included. Allows sending multiple Blobs atomically synchronously. Uses default wallet registered on the Node. Args: blob (Blob): The main blob to submit. blobs (Blob): Additional blobs to submit. deserializer (Callable | None): Custom deserializer. Defaults to None. options (TxConfig): Additional configuration options. Returns: SubmitBlobResult: The result of the submission, including the height. """ def deserializer_(height): if height is not None: return SubmitBlobResult(height, tuple(blob.commitment for blob in blobs)) deserializer = deserializer if deserializer is not None else deserializer_ blobs = tuple(types.normalize_blob(blob) if blob.commitment is None else blob for blob in (blob, *blobs)) return await self._rpc.call("blob.Submit", (blobs, options), deserializer)
[docs] @handle_blob_error async def get_commitment_proof(self, height: int, namespace: Namespace, commitment: Commitment, *, deserializer: Callable | None = None) -> CommitmentProof | None: """ Generates a commitment proof for a share commitment. Args: height (int): The block height. namespace (Namespace): The namespace of the commitment. commitment (Commitment): The commitment to generate proof for. deserializer (Callable | None): Custom deserializer. Defaults to :meth:`~celestia.types.blob.CommitmentProof.deserializer`. Returns: CommitmentProof | None: The commitment proof, or None if not found. """ deserializer = deserializer if deserializer is not None else CommitmentProof.deserializer return await self._rpc.call("blob.GetCommitmentProof", (height, Namespace(namespace), Commitment(commitment)), deserializer)
[docs] async def get_proof(self, height: int, namespace: Namespace, commitment: Commitment, *, deserializer: Callable | None = None) -> list[Proof] | None: """ Retrieves proofs in the given namespaces at the given height by commitment. Args: height (int): The block height. namespace (Namespace): The namespace of the proof. commitment (Commitment): The commitment to generate the proof for. deserializer (Callable | None): Custom deserializer. Defaults to None. Returns: list[Proof]: A list of proofs or [] if not found. """ def deserializer_(result) -> list['Proof']: if result is not None: return [Proof(**kwargs) for kwargs in result] deserializer = deserializer if deserializer is not None else deserializer_ try: return await self._rpc.call("blob.GetProof", (height, Namespace(namespace), Commitment(commitment)), deserializer) except ConnectionError as e: if 'blob: not found' in e.args[1].body['message'].lower(): return [] raise
[docs] async def included(self, height: int, namespace: Namespace, proof: Proof, commitment: Commitment) -> bool: """ Checks whether a blob's given commitment(Merkle subtree root) is included at given height and under the namespace. Args: height (int): The block height. namespace (Namespace): The namespace of the blob. proof (Proof): The proof to check. commitment (Commitment): The commitment to check inclusion for. Returns: bool: True if included, False otherwise. """ return await self._rpc.call("blob.Included", (height, Namespace(namespace), proof, Commitment(commitment)))
[docs] async def subscribe(self, namespace: Namespace, *, deserializer: Callable | None = None) -> AsyncIterator[SubscriptionBlobResult | None]: """ Subscribe to published blobs from the given namespace as they are included. Args: namespace (Namespace): The namespace to subscribe to. deserializer (Callable | None): Custom deserializer. Defaults to None. Yields: SubscriptionBlobResult: A result with the blob information. """ def deserializer_(result): height = result['Height'] blobs = result.get('Blobs') if blobs is not None: return SubscriptionBlobResult(height, tuple(Blob(**kwargs) for kwargs in blobs)) deserializer = deserializer if deserializer is not None else deserializer_ async for subs_blob_result in self._rpc.iter("blob.Subscribe", (Namespace(namespace),), deserializer): if subs_blob_result is not None: yield subs_blob_result