Endpoint

Base Endpoint API

class lahja.base.EndpointAPI

Bases: abc.ABC

The Endpoint enables communication between different processes as well as within a single process via various event-driven APIs.

are_all_endpoints_subscribed_to(event_type: Type[lahja.common.BaseEvent]) → bool

Return True if every connected remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.

broadcast(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None

Broadcast an instance of BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcasted to. By default, events are broadcasted across all connected endpoints with their consuming call sites.

broadcast_nowait(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None

A sync compatible version of broadcast()

Warning

Heavy use of broadcast_nowait() in contiguous blocks of code without yielding to the async implementation should be expected to cause problems.

connect_to_endpoints(*endpoints) → None

Establish connections to the given endpoints.

get_connected_endpoints_and_subscriptions() → Tuple[Tuple[str, Set[Type[lahja.common.BaseEvent]]], ...]

Return 2-tuples for all all connected endpoints containing the name of the endpoint coupled with the set of messages the endpoint subscribes to

get_subscribed_events() → Set[Type[lahja.common.BaseEvent]]

Return the set of event types this endpoint subscribes to.

is_any_endpoint_subscribed_to(event_type: Type[lahja.common.BaseEvent]) → bool

Return True if at least one of the connected remote endpoints is subscribed to the specified event type from this endpoint. Otherwise return False.

is_connected_to(endpoint_name: str) → bool

Return whether this endpoint is connected to another endpoint with the given name.

is_endpoint_subscribed_to(remote_endpoint: str, event_type: Type[lahja.common.BaseEvent]) → bool

Return True if the specified remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.

request(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse

Broadcast an instance of BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcasted to. By default, requests are broadcasted across all connected endpoints with their consuming call sites.

run() → AsyncContextManager[lahja.base.EndpointAPI]

Context manager API for running endpoints.

async with endpoint.run() as endpoint:
    ... # endpoint running within context
... # endpoint stopped after
classmethod serve(config: lahja.common.ConnectionConfig) → AsyncContextManager[lahja.base.EndpointAPI]

Context manager API for running and endpoint server.

async with EndpointClass.serve(config):
    ... # server running within context
... # server stopped
stream(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]

Stream all events that match the specified event type. This returns an AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum amount of events was received.

subscribe(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription

Subscribe to receive updates for any event that matches the specified event type. A handler is passed as a second argument an Subscription is returned to unsubscribe from the event if needed.

wait_for(event_type: Type[TWaitForEvent]) → TWaitForEvent

Wait for a single instance of an event that matches the specified event type.

wait_until_all_endpoints_subscribed_to(event: Type[lahja.common.BaseEvent], *, include_self: bool = True) → None

Block until all currently connected remote endpoints are subscribed to the specified event type from this endpoint.

wait_until_any_endpoint_subscribed_to(event: Type[lahja.common.BaseEvent]) → None

Block until any other remote endpoint has subscribed to the specified event type from this endpoint.

wait_until_connected_to(endpoint_name: str) → None

Return once a connection exists to an endpoint with the given name.

wait_until_connections_change() → None

Block until the set of connected remote endpoints changes.

wait_until_endpoint_subscribed_to(remote_endpoint: str, event: Type[lahja.common.BaseEvent]) → None

Block until the specified remote endpoint has subscribed to the specified event type from this endpoint.

wait_until_endpoint_subscriptions_change() → None

Block until any subscription change occurs on any remote endpoint or the set of remote endpoints changes

is_running
is_serving
name
class lahja.base.BaseEndpoint(name: str)

Bases: lahja.base.EndpointAPI

Base class for endpoint implementations that implements shared/common logic

are_all_endpoints_subscribed_to(event_type: Type[lahja.common.BaseEvent], include_self: bool = True) → bool

Return True if every connected remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.

get_connected_endpoints_and_subscriptions() → Tuple[Tuple[str, Set[Type[lahja.common.BaseEvent]]], ...]

Return all connected endpoints and their event type subscriptions to this endpoint.

is_any_endpoint_subscribed_to(event_type: Type[lahja.common.BaseEvent]) → bool

Return True if at least one of the connected remote endpoints is subscribed to the specified event type from this endpoint. Otherwise return False.

is_connected_to(endpoint_name: str) → bool

Return whether this endpoint is connected to another endpoint with the given name.

is_endpoint_subscribed_to(remote_endpoint: str, event_type: Type[lahja.common.BaseEvent]) → bool

Return True if the specified remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.

maybe_raise_no_subscribers_exception(config: Optional[lahja.common.BroadcastConfig], event_type: Type[lahja.common.BaseEvent]) → None

Check the given config and event_type and raise a NoSubscriber if no subscribers exist for the event_type when they at least one subscriber is expected.

wait_for(event_type: Type[TWaitForEvent]) → TWaitForEvent

Wait for a single instance of an event that matches the specified event type.

wait_until_all_endpoints_subscribed_to(event: Type[lahja.common.BaseEvent], *, include_self: bool = True) → None

Block until all currently connected remote endpoints are subscribed to the specified event type from this endpoint.

wait_until_any_endpoint_subscribed_to(event: Type[lahja.common.BaseEvent]) → None

Block until any other remote endpoint has subscribed to the specified event type from this endpoint.

wait_until_connected_to(endpoint_name: str) → None

Return once a connection exists to an endpoint with the given name.

wait_until_connections_change() → None

Block until the set of connected remote endpoints changes.

wait_until_endpoint_subscribed_to(remote_endpoint: str, event: Type[lahja.common.BaseEvent]) → None

Block until the specified remote endpoint has subscribed to the specified event type from this endpoint.

wait_until_endpoint_subscriptions_change() → None

Block until any subscription change occurs on any remote endpoint or the set of remote endpoints changes

has_snappy_support = False
logger = <Logger lahja.endpoint.Endpoint (WARNING)>

AsyncioEndpoint

class lahja.asyncio.endpoint.AsyncioEndpoint(name: str)

Bases: lahja.base.BaseEndpoint

The AsyncioEndpoint enables communication between different processes as well as within a single process via various event-driven APIs.

broadcast(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None

Broadcast an instance of BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcasted to. By default, events are broadcasted across all connected endpoints with their consuming call sites.

broadcast_nowait(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None

A non-async broadcast() (see broadcast() for more)

Instead of blocking the calling coroutine this function schedules the broadcast and immediately returns.

CAUTION: You probably don’t want to use this. broadcast() doesn’t return until the write socket has finished draining, meaning that the OS has accepted the message. This prevents us from sending more data than the remote process can handle. broadcast_nowait has no such backpressure. Even after the remote process stops accepting new messages this function will continue to accept them, which in the worst case could lead to runaway memory usage.

check_event_loop() → TFunc

All Endpoint methods must be called from the same event loop.

connect_to_endpoints(*endpoints) → None

Connect to the given endpoints and await until all connections are established.

get_subscribed_events() → Set[Type[lahja.common.BaseEvent]]

Return the set of events this Endpoint is currently listening for

request(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse

Broadcast an instance of BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcasted to. By default, requests are broadcasted across all connected endpoints with their consuming call sites.

run() → AsyncIterator[lahja.base.EndpointAPI]

Context manager API for running endpoints.

async with endpoint.run() as endpoint:
    ... # endpoint running within context
... # endpoint stopped after
classmethod serve(config: lahja.common.ConnectionConfig) → AsyncIterator[AsyncioEndpoint]

Context manager API for running and endpoint server.

async with EndpointClass.serve(config):
    ... # server running within context
... # server stopped
stream(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]

Stream all events that match the specified event type. This returns an AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum amount of events was received.

subscribe(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription

Subscribe to receive updates for any event that matches the specified event type. A handler is passed as a second argument an Subscription is returned to unsubscribe from the event if needed.

event_loop
ipc_path
is_running
is_serving

TrioEndpoint

class lahja.trio.endpoint.TrioEndpoint(name: str)

Bases: lahja.base.BaseEndpoint

broadcast(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None

Broadcast an instance of BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcasted to. By default, events are broadcasted across all connected endpoints with their consuming call sites.

broadcast_nowait(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None

A sync compatible version of broadcast()

Warning

Heavy use of broadcast_nowait() in contiguous blocks of code without yielding to the async implementation should be expected to cause problems.

connect_to_endpoints(*endpoints) → None

Connect to the given endpoints and await until all connections are established.

get_subscribed_events() → Set[Type[lahja.common.BaseEvent]]

Return the set of events this Endpoint is currently listening for

request(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse

Broadcast an instance of BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcasted to. By default, requests are broadcasted across all connected endpoints with their consuming call sites.

run() → AsyncGenerator[lahja.base.EndpointAPI, None]

Context manager API for running endpoints.

async with endpoint.run() as endpoint:
    ... # endpoint running within context
... # endpoint stopped after
classmethod serve(config: lahja.common.ConnectionConfig) → AsyncIterator[TrioEndpoint]

Context manager API for running and endpoint server.

async with EndpointClass.serve(config):
    ... # server running within context
... # server stopped
stream(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]

Stream all events that match the specified event type. This returns an AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum amount of events was received.

subscribe(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription

Subscribe to receive updates for any event that matches the specified event type. A handler is passed as a second argument an Subscription is returned to unsubscribe from the event if needed.

wait_started() → None
wait_stopped() → None
TResponse = ~TResponse
TStreamEvent = ~TStreamEvent
TSubscribeEvent = ~TSubscribeEvent
is_running
is_server_stopped
is_serving
is_stopped
logger = <Logger lahja.trio.TrioEndpoint (WARNING)>