API Docs

Endpoint

class lahja.endpoint.Broadcast(event, config)

Bases: tuple

config

Alias for field number 1

event

Alias for field number 0

class lahja.endpoint.ConnectionConfig

Bases: tuple

Configuration class needed to establish Endpoint connections.

classmethod from_name(name: str, base_path: Optional[pathlib.Path] = None) → lahja.endpoint.ConnectionConfig
name

Alias for field number 0

path

Alias for field number 1
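
The "Alias for field number N" entries indicate a named tuple, so the fields can be read by name, by index, or by unpacking. The sketch below illustrates that behaviour with a stand-in class rather than the library's own. The class name SketchConnectionConfig and the rule used to derive path from name (base_path joined with "<name>.ipc", defaulting to the current directory) are assumptions made for illustration; the documentation above does not state how from_name builds the path.

```python
import pathlib
from typing import NamedTuple, Optional


class SketchConnectionConfig(NamedTuple):
    """Hypothetical stand-in for a (name, path) config tuple; not lahja's class."""

    name: str           # field number 0
    path: pathlib.Path  # field number 1

    @classmethod
    def from_name(
        cls, name: str, base_path: Optional[pathlib.Path] = None
    ) -> "SketchConnectionConfig":
        # ASSUMPTION: the path is "<base_path>/<name>.ipc" and base_path
        # defaults to the current directory. The real rule may differ.
        base = pathlib.Path(".") if base_path is None else base_path
        return cls(name=name, path=base / f"{name}.ipc")


config = SketchConnectionConfig.from_name("server", pathlib.Path("/tmp/demo"))
name, path = config  # tuple unpacking follows the field order
```

Because the value is a plain tuple, config[0] and config.name refer to the same field.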

class lahja.endpoint.Endpoint

Bases: object

The Endpoint enables communication between different processes as well as within a single process via various event-driven APIs.

TResponse = ~TResponse
TStreamEvent = ~TStreamEvent
TSubscribeEvent = ~TSubscribeEvent
TWaitForEvent = ~TWaitForEvent
broadcast(item: lahja.misc.BaseEvent, config: Optional[lahja.misc.BroadcastConfig] = None) → None

Broadcast an instance of BaseEvent on the event bus. Takes an optional second parameter of type BroadcastConfig that determines where the event is broadcast. By default, events are broadcast to all connected endpoints and their consuming call sites.

broadcast_nowait(item: lahja.misc.BaseEvent, config: Optional[lahja.misc.BroadcastConfig] = None) → None

A non-async version of broadcast(); see the documentation for broadcast() for details.

Instead of blocking the calling coroutine this function schedules the broadcast and immediately returns.

CAUTION: You probably don’t want to use this. broadcast() doesn’t return until the write socket has finished draining, meaning that the OS has accepted the message. This prevents us from sending more data than the remote process can handle. broadcast_nowait has no such backpressure. Even after the remote process stops accepting new messages this function will continue to accept them, which in the worst case could lead to runaway memory usage.
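
The difference between the two calls can be illustrated without the library. The following is a minimal sketch that uses a plain asyncio.Queue as a stand-in for the write socket; it is not how lahja is implemented. The function names drain, peak_with_backpressure, and peak_without_backpressure are hypothetical. An awaited put on a bounded queue plays the role of broadcast(), and put_nowait on an unbounded queue plays the role of broadcast_nowait().

```python
import asyncio


async def drain(queue: asyncio.Queue, count: int) -> None:
    """Consume `count` items, yielding to the event loop between items."""
    for _ in range(count):
        await queue.get()
        await asyncio.sleep(0)


async def peak_with_backpressure(count: int, capacity: int) -> int:
    """Producer awaits each put, so it is suspended while the queue is full."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
    consumer = asyncio.create_task(drain(queue, count))
    peak = 0
    for item in range(count):
        await queue.put(item)  # suspends here whenever the queue is full
        peak = max(peak, queue.qsize())
    await consumer
    return peak


async def peak_without_backpressure(count: int) -> int:
    """Producer never awaits, so every item is buffered before any is consumed."""
    queue: asyncio.Queue = asyncio.Queue()  # unbounded buffer
    consumer = asyncio.create_task(drain(queue, count))
    for item in range(count):
        queue.put_nowait(item)  # returns immediately; the consumer gets no turn
    peak = queue.qsize()
    await consumer
    return peak


bounded_peak = asyncio.run(peak_with_backpressure(100, capacity=2))
unbounded_peak = asyncio.run(peak_without_backpressure(100))
```

In the bounded case the buffer never exceeds its capacity. In the unbounded case the buffer grows to the full number of items, which is the memory growth the caution describes.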

check_event_loop() → TFunc

All Endpoint methods must be called from the same event loop.

connect_to_endpoint(config: lahja.endpoint.ConnectionConfig) → None
connect_to_endpoints(*endpoints) → None

Connect to the given endpoints and await until all connections are established.

connect_to_endpoints_nowait(*endpoints) → None

Connect to the given endpoints as soon as they become available but do not block.

event_loop
has_snappy_support
ipc_path
is_connected_to(endpoint_name: str) → bool
logger
name
request(item: lahja.misc.BaseRequestResponseEvent[TResponse], config: Optional[lahja.misc.BroadcastConfig] = None) → TResponse

Broadcast an instance of BaseRequestResponseEvent on the event bus and immediately wait for an expected answer of type BaseEvent. Optionally pass a second parameter of type BroadcastConfig to determine where the request is broadcast. By default, requests are broadcast to all connected endpoints and their consuming call sites.

start_serving(connection_config: lahja.endpoint.ConnectionConfig) → None

Start serving this Endpoint so that it can receive events. Await until the Endpoint is ready.

stop() → None

Stop the Endpoint from receiving further events.

stream(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]

Stream all events that match the specified event type. This returns an AsyncIterable[BaseEvent] that can be consumed with an async for loop. The optional num_events parameter stops the stream after that many events have been received.
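
The streaming contract described above (filter by type, optionally stop after num_events) can be sketched with an async generator over a plain asyncio.Queue. This is an illustration under stated assumptions, not the library's implementation: stream_sketch and collect are hypothetical names, the queue stands in for the endpoint's incoming events, and isinstance is assumed as the matching rule.

```python
import asyncio
from typing import Any, AsyncGenerator, List, Optional, Type


async def stream_sketch(
    queue: asyncio.Queue,
    event_type: Type[Any],
    num_events: Optional[int] = None,
) -> AsyncGenerator[Any, None]:
    """Yield queued events of `event_type`, stopping after `num_events` if given."""
    received = 0
    while num_events is None or received < num_events:
        event = await queue.get()
        if isinstance(event, event_type):  # ASSUMPTION: matching is by isinstance
            received += 1
            yield event


async def collect(limit: int) -> List[Any]:
    queue: asyncio.Queue = asyncio.Queue()
    for event in (1, "noise", 2, 3):
        queue.put_nowait(event)
    # Consume the stream with an async comprehension (an async for loop).
    return [event async for event in stream_sketch(queue, int, num_events=limit)]


first_two = asyncio.run(collect(2))
```

Without num_events the generator in this sketch never finishes on its own, so the consumer has to break out of the loop.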

subscribe(event_type: Type[TSubscribeEvent], handler: Callable[[TSubscribeEvent], None]) → lahja.misc.Subscription

Subscribe to receive updates for any event that matches the specified event type. The handler is passed as the second argument, and a Subscription is returned that can be used to unsubscribe from the event if needed.
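
The subscribe/unsubscribe pattern can be sketched as a handle that wraps a callback which removes the handler again. The classes below are hypothetical stand-ins (SketchSubscription, SketchBus) written only to show the pattern. They are synchronous and in-process, and matching on the exact event type is an assumption.

```python
from typing import Any, Callable, Dict, List


class SketchSubscription:
    """Hypothetical handle that undoes a subscription when asked."""

    def __init__(self, unsubscribe_fn: Callable[[], Any]) -> None:
        self._unsubscribe_fn = unsubscribe_fn

    def unsubscribe(self) -> None:
        self._unsubscribe_fn()


class SketchBus:
    """Hypothetical in-process bus; not the library's Endpoint."""

    def __init__(self) -> None:
        self._handlers: Dict[type, List[Callable[[Any], None]]] = {}

    def subscribe(
        self, event_type: type, handler: Callable[[Any], None]
    ) -> SketchSubscription:
        handlers = self._handlers.setdefault(event_type, [])
        handlers.append(handler)
        # The returned handle removes exactly this handler again.
        return SketchSubscription(lambda: handlers.remove(handler))

    def publish(self, event: Any) -> None:
        # ASSUMPTION: handlers are looked up by the exact type of the event.
        for handler in list(self._handlers.get(type(event), [])):
            handler(event)


seen: List[str] = []
bus = SketchBus()
subscription = bus.subscribe(str, seen.append)
bus.publish("first")       # delivered to the handler
subscription.unsubscribe()
bus.publish("second")      # no handler registered any more
```

Returning a handle keeps the bookkeeping with the bus, so the caller does not need to remember which handler it registered.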

wait_for(event_type: Type[TWaitForEvent]) → TWaitForEvent

Wait for a single instance of an event that matches the specified event type.

wait_until_serving() → None

Await until the Endpoint is ready to receive events.

class lahja.endpoint.RemoteEndpoint(reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)

Bases: object

static connect_to(path: str) → lahja.endpoint.RemoteEndpoint
read_message() → lahja.endpoint.Broadcast
send_message(message: lahja.endpoint.Broadcast) → None

Exceptions

exception lahja.exceptions.ConnectionAttemptRejected

Bases: Exception

Raised when an attempt was made to connect to an endpoint that is already connected.

exception lahja.exceptions.LahjaError

Bases: Exception

Base class for all lahja errors.

exception lahja.exceptions.NotServing

Bases: lahja.exceptions.LahjaError

Raised when an API call was made before start_serving() was called.

exception lahja.exceptions.ReaderAtEof

Bases: lahja.exceptions.LahjaError

Raised by RemoteEndpoint when the remote has closed the connection.

exception lahja.exceptions.UnexpectedResponse

Bases: lahja.exceptions.LahjaError

Raised when the type of a response did not match the expected_response_type.

BaseEvent

class lahja.misc.BaseEvent

Bases: object

broadcast_config(internal: bool = False) → lahja.misc.BroadcastConfig

BaseRequestResponseEvent

class lahja.misc.BaseRequestResponseEvent

Bases: abc.ABC, lahja.misc.BaseEvent, typing.Generic

static expected_response_type() → Type[TResponse]

Return the type that is expected to be sent back for this request. This ensures that, at runtime, only expected responses can be sent back to call sites that issued a BaseRequestResponseEvent.
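
A runtime check of this kind might look like the sketch below. All names here (Pong, PingRequest, SketchUnexpectedResponse, check_response) are hypothetical and defined only for this example. The sketch shows the general idea of validating a response against the type a request declares; it is not the library's code.

```python
from typing import Any, Type


class SketchUnexpectedResponse(Exception):
    """Hypothetical error for a response of the wrong type."""


class Pong:
    """Hypothetical response event."""


class PingRequest:
    """Hypothetical request that declares which response type it expects."""

    @staticmethod
    def expected_response_type() -> Type[Pong]:
        return Pong


def check_response(request: Any, response: Any) -> Any:
    """Return the response if it has the declared type, otherwise raise."""
    expected = request.expected_response_type()
    if not isinstance(response, expected):
        raise SketchUnexpectedResponse(
            f"expected {expected.__name__}, got {type(response).__name__}"
        )
    return response


accepted = check_response(PingRequest(), Pong())
```

Passing any other object as the response, for example a string, raises SketchUnexpectedResponse in this sketch.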

BroadcastConfig

class lahja.misc.BroadcastConfig(filter_endpoint: Optional[str] = None, filter_event_id: Optional[str] = None, internal: bool = False)

Bases: object

allowed_to_receive(endpoint: str) → bool

Subscription

class lahja.misc.Subscription(unsubscribe_fn: Callable[[], Any])

Bases: object

unsubscribe() → None