Endpoint¶
Base Endpoint API¶
-
class lahja.base.EndpointAPI¶
Bases: abc.ABC
The Endpoint enables communication between different processes as well as within a single process via various event-driven APIs.
-
are_all_endpoints_subscribed_to
(event_type: Type[lahja.common.BaseEvent]) → bool¶ Return True if every connected remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
-
broadcast
(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ Broadcast an instance of BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcast. By default, events are broadcast across all connected endpoints with their consuming call sites.
-
broadcast_nowait
(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ A sync-compatible version of broadcast().
Warning: Heavy use of broadcast_nowait() in contiguous blocks of code without yielding to the async implementation should be expected to cause problems.
-
connect_to_endpoints
(*endpoints) → None¶ Establish connections to the given endpoints.
-
get_connected_endpoints_and_subscriptions
() → Tuple[Tuple[str, Set[Type[lahja.common.BaseEvent]]], ...]¶ Return 2-tuples for all connected endpoints, each containing the name of the endpoint coupled with the set of messages the endpoint subscribes to.
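The return shape described above lends itself to simple membership checks. The following is a minimal sketch, assuming a hand-built snapshot in that shape; the stand-in BaseEvent class, the Ping and Pong events, the endpoint names, and the helper functions are all hypothetical and are not lahja's implementation.

```python
from typing import Set, Tuple, Type


class BaseEvent:
    """Hypothetical stand-in for lahja.common.BaseEvent."""


class Ping(BaseEvent):
    pass


class Pong(BaseEvent):
    pass


# Same shape as the documented return type: (endpoint name, subscribed event types).
Subscriptions = Tuple[Tuple[str, Set[Type[BaseEvent]]], ...]


def all_subscribed(subs: Subscriptions, event_type: Type[BaseEvent]) -> bool:
    # True only if every listed endpoint has the event type in its set.
    # Note: this sketch returns True for an empty snapshot.
    return all(event_type in events for _, events in subs)


def any_subscribed(subs: Subscriptions, event_type: Type[BaseEvent]) -> bool:
    # True if at least one listed endpoint has the event type in its set.
    return any(event_type in events for _, events in subs)


def endpoint_subscribed(
    subs: Subscriptions, name: str, event_type: Type[BaseEvent]
) -> bool:
    # True if the named endpoint is listed and subscribes to the event type.
    return any(n == name and event_type in events for n, events in subs)


snapshot: Subscriptions = (
    ("worker-a", {Ping, Pong}),
    ("worker-b", {Ping}),
)

print(all_subscribed(snapshot, Ping))
print(all_subscribed(snapshot, Pong))
print(endpoint_subscribed(snapshot, "worker-b", Pong))
```

How the library treats the case of no connected endpoints is not stated here, so the empty-snapshot behaviour of this sketch should not be read as the library's.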
-
get_subscribed_events
() → Set[Type[lahja.common.BaseEvent]]¶ Return the set of event types this endpoint subscribes to.
-
is_any_endpoint_subscribed_to
(event_type: Type[lahja.common.BaseEvent]) → bool¶ Return True if at least one of the connected remote endpoints is subscribed to the specified event type from this endpoint. Otherwise return False.
-
is_connected_to
(endpoint_name: str) → bool¶ Return whether this endpoint is connected to another endpoint with the given name.
-
is_endpoint_subscribed_to
(remote_endpoint: str, event_type: Type[lahja.common.BaseEvent]) → bool¶ Return True if the specified remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
-
request
(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse¶ Broadcast an instance of BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcast. By default, requests are broadcast across all connected endpoints with their consuming call sites.
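The request/response pattern described above can be pictured as pairing each outgoing request with a future that is resolved when the matching answer arrives. This is only a toy model under that assumption: ToyRequester, its outbox list, and deliver_response are hypothetical names and say nothing about how lahja correlates requests and responses internally.

```python
import asyncio
import itertools
from typing import Any, Dict, List, Tuple


class ToyRequester:
    """Hypothetical model of 'send a request, then wait for its answer'."""

    def __init__(self) -> None:
        self._ids = itertools.count()
        self._pending: Dict[int, "asyncio.Future[Any]"] = {}
        self.outbox: List[Tuple[int, Any]] = []

    async def request(self, payload: Any) -> Any:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        # "Broadcast" the request; here it is simply recorded in a list.
        self.outbox.append((request_id, payload))
        try:
            # Wait until a response with the same id is delivered.
            return await future
        finally:
            self._pending.pop(request_id, None)

    def deliver_response(self, request_id: int, response: Any) -> None:
        future = self._pending.get(request_id)
        if future is not None and not future.done():
            future.set_result(response)


async def demo() -> Any:
    requester = ToyRequester()
    task = asyncio.ensure_future(requester.request("2+2"))
    await asyncio.sleep(0)  # let the request register itself
    request_id, _payload = requester.outbox.pop()
    requester.deliver_response(request_id, "4")
    return await task


answer = asyncio.run(demo())
print(answer)
```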
-
run
() → AsyncContextManager[lahja.base.EndpointAPI]¶ Context manager API for running endpoints.
    async with endpoint.run() as endpoint:
        ... # endpoint running within context
    ... # endpoint stopped after
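To make the lifecycle of such a context manager concrete, here is a minimal sketch built on contextlib.asynccontextmanager. ToyEndpoint is a hypothetical stand-in with only a name and an is_running flag; it illustrates the start-on-enter, stop-on-exit pattern and is not the library's implementation.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Tuple


class ToyEndpoint:
    """Hypothetical stand-in; not lahja's endpoint."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.is_running = False

    @asynccontextmanager
    async def run(self) -> AsyncIterator["ToyEndpoint"]:
        self.is_running = True
        try:
            # The body of the `async with` block executes while suspended here.
            yield self
        finally:
            # Runs on normal exit and on exceptions raised inside the block.
            self.is_running = False


async def demo() -> Tuple[bool, bool]:
    endpoint = ToyEndpoint("demo")
    async with endpoint.run() as running:
        inside = running.is_running
    return inside, endpoint.is_running


result = asyncio.run(demo())
print(result)
```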
-
classmethod
serve
(config: lahja.common.ConnectionConfig) → AsyncContextManager[lahja.base.EndpointAPI]¶ Context manager API for running an endpoint server.
    async with EndpointClass.serve(config):
        ... # server running within context
    ... # server stopped
-
stream
(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]¶ Stream all events that match the specified event type. This returns an AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum number of events has been received.
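The effect of num_events can be illustrated with a plain async generator. The sketch below assumes events arrive on an asyncio.Queue, which is a simplification chosen for the example; the stream function here is a hypothetical toy and not the library's method.

```python
import asyncio
from typing import AsyncGenerator, List, Optional


async def stream(
    queue: "asyncio.Queue[int]", num_events: Optional[int] = None
) -> AsyncGenerator[int, None]:
    received = 0
    while True:
        item = await queue.get()
        yield item
        received += 1
        # Stop once the cap is reached; with None the loop never ends on its own.
        if num_events is not None and received >= num_events:
            return


async def demo() -> List[int]:
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    collected = []
    async for event in stream(queue, num_events=3):
        collected.append(event)
    return collected


collected = asyncio.run(demo())
print(collected)
```

Because the cap is checked right after each yielded item, the generator finishes without waiting for a further event once the limit is reached.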
-
subscribe
(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription¶ Subscribe to receive updates for any event that matches the specified event type. A handler is passed as the second argument, and a Subscription is returned that can be used to unsubscribe from the event if needed.
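The handler type in the signature allows both plain functions and coroutine functions. One way to support both, sketched here under the assumption of a single in-process registry, is to call the handler and await the result only if it is awaitable. ToyBus, Greeting, and this Subscription class are hypothetical illustrations, not lahja's classes.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, List, Union

Handler = Callable[[Any], Union[Any, Awaitable[Any]]]


class Subscription:
    """Hypothetical handle that removes a handler when asked."""

    def __init__(self, unsubscribe_fn: Callable[[], None]) -> None:
        self._unsubscribe_fn = unsubscribe_fn

    def unsubscribe(self) -> None:
        self._unsubscribe_fn()


class ToyBus:
    def __init__(self) -> None:
        self._handlers: Dict[type, List[Handler]] = {}

    def subscribe(self, event_type: type, handler: Handler) -> Subscription:
        self._handlers.setdefault(event_type, []).append(handler)
        return Subscription(lambda: self._handlers[event_type].remove(handler))

    async def broadcast(self, event: Any) -> None:
        # Iterate over a copy so handlers may unsubscribe during dispatch.
        for handler in list(self._handlers.get(type(event), [])):
            result = handler(event)
            if inspect.isawaitable(result):
                await result


class Greeting:
    def __init__(self, text: str) -> None:
        self.text = text


async def demo() -> List[str]:
    bus = ToyBus()
    seen: List[str] = []

    async def async_handler(event: Greeting) -> None:
        await asyncio.sleep(0)
        seen.append("async:" + event.text)

    sync_sub = bus.subscribe(Greeting, lambda e: seen.append("sync:" + e.text))
    bus.subscribe(Greeting, async_handler)
    await bus.broadcast(Greeting("hi"))
    sync_sub.unsubscribe()  # only the async handler remains
    await bus.broadcast(Greeting("bye"))
    return seen


seen = asyncio.run(demo())
print(seen)
```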
-
wait_for
(event_type: Type[TWaitForEvent]) → TWaitForEvent¶ Wait for a single instance of an event that matches the specified event type.
-
wait_until_all_endpoints_subscribed_to
(event: Type[lahja.common.BaseEvent], *, include_self: bool = True) → None¶ Block until all currently connected remote endpoints are subscribed to the specified event type from this endpoint.
-
wait_until_any_endpoint_subscribed_to
(event: Type[lahja.common.BaseEvent]) → None¶ Block until any other remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_connected_to
(endpoint_name: str) → None¶ Return once a connection exists to an endpoint with the given name.
-
wait_until_connections_change
() → None¶ Block until the set of connected remote endpoints changes.
-
wait_until_endpoint_subscribed_to
(remote_endpoint: str, event: Type[lahja.common.BaseEvent]) → None¶ Block until the specified remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_endpoint_subscriptions_change
() → None¶ Block until any subscription change occurs on any remote endpoint or the set of remote endpoints changes.
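The wait_until_* methods above all block until some condition on connections or subscriptions holds. A common way to build that kind of waiting, shown here purely as an assumption-laden sketch, is an asyncio.Condition that is notified on every change. ToySubscriptionTracker and set_subscriptions are hypothetical names, and event types are represented as strings to keep the example short.

```python
import asyncio
from typing import Dict, Iterable, Set, Tuple


class ToySubscriptionTracker:
    """Hypothetical tracker; create it inside a running event loop."""

    def __init__(self) -> None:
        self._subs: Dict[str, Set[str]] = {}
        self._changed = asyncio.Condition()

    async def set_subscriptions(self, endpoint: str, events: Iterable[str]) -> None:
        async with self._changed:
            self._subs[endpoint] = set(events)
            # Wake every waiter so each can re-check its own predicate.
            self._changed.notify_all()

    async def wait_until_endpoint_subscribed_to(self, endpoint: str, event: str) -> None:
        async with self._changed:
            await self._changed.wait_for(
                lambda: event in self._subs.get(endpoint, set())
            )


async def demo() -> Tuple[bool, bool]:
    tracker = ToySubscriptionTracker()
    waiter = asyncio.ensure_future(
        tracker.wait_until_endpoint_subscribed_to("worker", "Ping")
    )
    await asyncio.sleep(0)  # let the waiter start and block
    still_waiting = not waiter.done()
    await tracker.set_subscriptions("worker", {"Ping"})
    await asyncio.wait_for(waiter, timeout=1)
    return still_waiting, waiter.done()


outcome = asyncio.run(demo())
print(outcome)
```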
-
is_running
¶
-
is_serving
¶
-
name
¶
-
-
class lahja.base.BaseEndpoint(name: str)¶
Bases: lahja.base.EndpointAPI
Base class for endpoint implementations that implements shared/common logic.
-
are_all_endpoints_subscribed_to
(event_type: Type[lahja.common.BaseEvent], include_self: bool = True) → bool¶ Return True if every connected remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
-
get_connected_endpoints_and_subscriptions
() → Tuple[Tuple[str, Set[Type[lahja.common.BaseEvent]]], ...]¶ Return all connected endpoints and their event type subscriptions to this endpoint.
-
is_any_endpoint_subscribed_to
(event_type: Type[lahja.common.BaseEvent]) → bool¶ Return True if at least one of the connected remote endpoints is subscribed to the specified event type from this endpoint. Otherwise return False.
-
is_connected_to
(endpoint_name: str) → bool¶ Return whether this endpoint is connected to another endpoint with the given name.
-
is_endpoint_subscribed_to
(remote_endpoint: str, event_type: Type[lahja.common.BaseEvent]) → bool¶ Return True if the specified remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
-
maybe_raise_no_subscribers_exception
(config: Optional[lahja.common.BroadcastConfig], event_type: Type[lahja.common.BaseEvent]) → None¶ Check the given config and event_type and raise a NoSubscriber if no subscribers exist for the event_type when at least one subscriber is expected.
-
wait_for
(event_type: Type[TWaitForEvent]) → TWaitForEvent¶ Wait for a single instance of an event that matches the specified event type.
-
wait_until_all_endpoints_subscribed_to
(event: Type[lahja.common.BaseEvent], *, include_self: bool = True) → None¶ Block until all currently connected remote endpoints are subscribed to the specified event type from this endpoint.
-
wait_until_any_endpoint_subscribed_to
(event: Type[lahja.common.BaseEvent]) → None¶ Block until any other remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_connected_to
(endpoint_name: str) → None¶ Return once a connection exists to an endpoint with the given name.
-
wait_until_connections_change
() → None¶ Block until the set of connected remote endpoints changes.
-
wait_until_endpoint_subscribed_to
(remote_endpoint: str, event: Type[lahja.common.BaseEvent]) → None¶ Block until the specified remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_endpoint_subscriptions_change
() → None¶ Block until any subscription change occurs on any remote endpoint or the set of remote endpoints changes.
-
has_snappy_support
= False¶
-
logger
= <Logger lahja.endpoint.Endpoint (WARNING)>¶
-
AsyncioEndpoint¶
-
class lahja.asyncio.endpoint.AsyncioEndpoint(name: str)¶
Bases: lahja.base.BaseEndpoint
The AsyncioEndpoint enables communication between different processes as well as within a single process via various event-driven APIs.
-
broadcast
(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ Broadcast an instance of BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcast. By default, events are broadcast across all connected endpoints with their consuming call sites.
-
broadcast_nowait
(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ A non-async broadcast() (see broadcast() for more).
Instead of blocking the calling coroutine, this function schedules the broadcast and immediately returns.
CAUTION: You probably don’t want to use this. broadcast() doesn’t return until the write socket has finished draining, meaning that the OS has accepted the message. This prevents us from sending more data than the remote process can handle. broadcast_nowait has no such backpressure. Even after the remote process stops accepting new messages this function will continue to accept them, which in the worst case could lead to runaway memory usage.
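The difference in backpressure can be seen in a small toy model. The sketch below assumes a bounded asyncio.Queue standing in for the write socket and an unbounded deque standing in for scheduled broadcasts; ToyConnection and its attributes are hypothetical and do not reflect how the library buffers data.

```python
import asyncio
from collections import deque
from typing import Any, Deque, Tuple


class ToyConnection:
    """Hypothetical model; create it inside a running event loop."""

    def __init__(self, capacity: int) -> None:
        # Bounded: put() waits while the "socket" is full.
        self._socket: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=capacity)
        # Unbounded: grows for as long as the caller keeps appending.
        self.pending: Deque[Any] = deque()

    async def broadcast(self, item: Any) -> None:
        await self._socket.put(item)

    def broadcast_nowait(self, item: Any) -> None:
        self.pending.append(item)


async def demo() -> Tuple[bool, int]:
    conn = ToyConnection(capacity=2)
    await conn.broadcast("a")
    await conn.broadcast("b")
    try:
        # Nothing is draining the queue, so the third send cannot complete.
        await asyncio.wait_for(conn.broadcast("c"), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    for i in range(1000):
        conn.broadcast_nowait(i)  # never waits, so nothing slows the sender down
    return blocked, len(conn.pending)


blocked, backlog = asyncio.run(demo())
print(blocked, backlog)
```

In this model the awaiting sender is held back as soon as the receiver stops draining, while the non-waiting path keeps accumulating items in memory.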
-
check_event_loop
() → TFunc¶ All Endpoint methods must be called from the same event loop.
-
connect_to_endpoints
(*endpoints) → None¶ Connect to the given endpoints and await until all connections are established.
-
get_subscribed_events
() → Set[Type[lahja.common.BaseEvent]]¶ Return the set of events this Endpoint is currently listening for.
-
request
(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse¶ Broadcast an instance of BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcast. By default, requests are broadcast across all connected endpoints with their consuming call sites.
-
run
() → AsyncIterator[lahja.base.EndpointAPI]¶ Context manager API for running endpoints.
    async with endpoint.run() as endpoint:
        ... # endpoint running within context
    ... # endpoint stopped after
-
classmethod
serve
(config: lahja.common.ConnectionConfig) → AsyncIterator[AsyncioEndpoint]¶ Context manager API for running an endpoint server.
    async with EndpointClass.serve(config):
        ... # server running within context
    ... # server stopped
-
stream
(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]¶ Stream all events that match the specified event type. This returns an AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum number of events has been received.
-
subscribe
(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription¶ Subscribe to receive updates for any event that matches the specified event type. A handler is passed as the second argument, and a Subscription is returned that can be used to unsubscribe from the event if needed.
-
event_loop
¶
-
ipc_path
¶
-
is_running
¶
-
is_serving
¶
-
TrioEndpoint¶
-
class lahja.trio.endpoint.TrioEndpoint(name: str)¶
Bases: lahja.base.BaseEndpoint
-
broadcast
(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ Broadcast an instance of BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcast. By default, events are broadcast across all connected endpoints with their consuming call sites.
-
broadcast_nowait
(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ A sync-compatible version of broadcast().
Warning: Heavy use of broadcast_nowait() in contiguous blocks of code without yielding to the async implementation should be expected to cause problems.
-
connect_to_endpoints
(*endpoints) → None¶ Connect to the given endpoints and await until all connections are established.
-
get_subscribed_events
() → Set[Type[lahja.common.BaseEvent]]¶ Return the set of events this Endpoint is currently listening for.
-
request
(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse¶ Broadcast an instance of BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcast. By default, requests are broadcast across all connected endpoints with their consuming call sites.
-
run
() → AsyncGenerator[lahja.base.EndpointAPI, None]¶ Context manager API for running endpoints.
    async with endpoint.run() as endpoint:
        ... # endpoint running within context
    ... # endpoint stopped after
-
classmethod
serve
(config: lahja.common.ConnectionConfig) → AsyncIterator[TrioEndpoint]¶ Context manager API for running an endpoint server.
    async with EndpointClass.serve(config):
        ... # server running within context
    ... # server stopped
-
stream
(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]¶ Stream all events that match the specified event type. This returns an AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum number of events has been received.
-
subscribe
(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription¶ Subscribe to receive updates for any event that matches the specified event type. A handler is passed as the second argument, and a Subscription is returned that can be used to unsubscribe from the event if needed.
-
wait_started
() → None¶
-
wait_stopped
() → None¶
-
TResponse
= ~TResponse¶
-
TStreamEvent
= ~TStreamEvent¶
-
TSubscribeEvent
= ~TSubscribeEvent¶
-
is_running
¶
-
is_server_stopped
¶
-
is_serving
¶
-
is_stopped
¶
-
logger
= <Logger lahja.trio.TrioEndpoint (WARNING)>¶
-