Endpoint¶
Base Endpoint API¶
-
class
lahja.base.EndpointAPI¶ Bases:
abc.ABC
The Endpoint enables communication between different processes as well as within a single process via various event-driven APIs.
-
are_all_endpoints_subscribed_to(event_type: Type[lahja.common.BaseEvent]) → bool¶ Return
True if every connected remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
-
broadcast(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ Broadcast an instance of
BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcast. By default, events are broadcast across all connected endpoints with their consuming call sites.
-
broadcast_nowait(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ A sync compatible version of
broadcast().
Warning
Heavy use of
broadcast_nowait() in contiguous blocks of code without yielding to the async implementation should be expected to cause problems.
-
connect_to_endpoints(*endpoints) → None¶ Establish connections to the given endpoints.
-
get_connected_endpoints_and_subscriptions() → Tuple[Tuple[str, Set[Type[lahja.common.BaseEvent]]], ...]¶ Return 2-tuples for all connected endpoints, each containing the name of the endpoint coupled with the set of messages the endpoint subscribes to.
-
get_subscribed_events() → Set[Type[lahja.common.BaseEvent]]¶ Return the set of event types this endpoint subscribes to.
-
is_any_endpoint_subscribed_to(event_type: Type[lahja.common.BaseEvent]) → bool¶ Return
True if at least one of the connected remote endpoints is subscribed to the specified event type from this endpoint. Otherwise return False.
-
is_connected_to(endpoint_name: str) → bool¶ Return whether this endpoint is connected to another endpoint with the given name.
-
is_endpoint_subscribed_to(remote_endpoint: str, event_type: Type[lahja.common.BaseEvent]) → bool¶ Return
True if the specified remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
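The three subscription checks can be read as simple queries over the tuple shape returned by get_connected_endpoints_and_subscriptions(). The following is a minimal standard-library sketch of that reading, not lahja's implementation: the BaseEvent stand-in, the Ping and Pong event types, and the free functions taking a subs argument are all assumptions made for illustration.

```python
from typing import Set, Tuple, Type


class BaseEvent:
    """Stand-in for lahja.common.BaseEvent (assumption, not the real class)."""


class Ping(BaseEvent):
    pass


class Pong(BaseEvent):
    pass


# Same shape as the documented return type:
# a tuple of (endpoint name, set of subscribed event types) pairs.
Subscriptions = Tuple[Tuple[str, Set[Type[BaseEvent]]], ...]


def is_endpoint_subscribed_to(
    subs: Subscriptions, remote_endpoint: str, event_type: Type[BaseEvent]
) -> bool:
    # True only if the named endpoint is present and lists the event type.
    return any(
        name == remote_endpoint and event_type in events for name, events in subs
    )


def is_any_endpoint_subscribed_to(
    subs: Subscriptions, event_type: Type[BaseEvent]
) -> bool:
    return any(event_type in events for _, events in subs)


def are_all_endpoints_subscribed_to(
    subs: Subscriptions, event_type: Type[BaseEvent]
) -> bool:
    # Note: all() over an empty tuple is True; the library may treat the
    # "no connected endpoints" case differently.
    return all(event_type in events for _, events in subs)


example: Subscriptions = (("a", {Ping}), ("b", {Ping, Pong}))
```

With the example data, Pong is subscribed to by endpoint "b" only, so the "any" query holds for Pong while the "all" query holds only for Ping.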
-
request(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse¶ Broadcast an instance of
BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcast. By default, requests are broadcast across all connected endpoints with their consuming call sites.
-
run() → AsyncContextManager[lahja.base.EndpointAPI]¶ Context manager API for running endpoints.
async with endpoint.run() as endpoint:
    ... # endpoint running within context
... # endpoint stopped after
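To make the lifecycle of the context manager concrete, here is a minimal sketch built on contextlib.asynccontextmanager. ToyEndpoint and its is_running flag are hypothetical stand-ins, and the start and stop steps are reduced to flipping that flag; the real endpoints presumably do considerably more work on entry and exit.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Tuple


class ToyEndpoint:
    """Hypothetical stand-in; not a lahja class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.is_running = False

    @asynccontextmanager
    async def run(self) -> AsyncIterator["ToyEndpoint"]:
        self.is_running = True  # "start" on entering the context
        try:
            yield self
        finally:
            self.is_running = False  # "stop" on exit, even after an error


async def demo() -> Tuple[bool, bool]:
    endpoint = ToyEndpoint("toy")
    async with endpoint.run() as running:
        inside = running.is_running
    return inside, endpoint.is_running


states = asyncio.run(demo())
```

A generator function decorated this way is annotated with an iterator return type even though callers use it as an async context manager, which may explain why the concrete endpoint classes list run() with an AsyncIterator or AsyncGenerator return type.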
-
classmethod
serve(config: lahja.common.ConnectionConfig) → AsyncContextManager[lahja.base.EndpointAPI]¶ Context manager API for running an endpoint server.
async with EndpointClass.serve(config):
    ... # server running within context
... # server stopped
-
stream(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]¶ Stream all events that match the specified event type. This returns an
AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum number of events has been received.
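The num_events cutoff can be illustrated with a plain async generator. This sketch assumes events arrive on an asyncio.Queue and uses strings as events; the free function stream below is a hypothetical stand-in for the method and says nothing about how lahja delivers events internally.

```python
import asyncio
from typing import AsyncGenerator, List, Optional


async def stream(
    queue: "asyncio.Queue[str]", num_events: Optional[int] = None
) -> AsyncGenerator[str, None]:
    # Yield events as they arrive; stop once num_events have been yielded.
    # With num_events=None the generator runs until the consumer stops iterating.
    received = 0
    while num_events is None or received < num_events:
        event = await queue.get()
        yield event
        received += 1


async def demo() -> List[str]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for label in ("a", "b", "c", "d"):
        queue.put_nowait(label)

    seen = []
    async for event in stream(queue, num_events=3):
        seen.append(event)
    return seen


seen = asyncio.run(demo())
```

Four events are queued but the loop ends after the third, leaving the fourth unconsumed.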
-
subscribe(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription¶ Subscribe to receive updates for any event that matches the specified event type. A handler is passed as the second argument and a
Subscription is returned, which can be used to unsubscribe from the event if needed.
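The handler type Union[Any, Awaitable[Any]] indicates that a handler may be either a plain function or a coroutine function. The sketch below shows one way a dispatcher could support both and hand back an object for unsubscribing. ToyBus, ToySubscription, and Tick are hypothetical names, and dispatch is in-process only; this is not lahja's implementation.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, List, Tuple, Union

Handler = Callable[[Any], Union[Any, Awaitable[Any]]]


class ToySubscription:
    """Hypothetical handle that removes one handler when unsubscribe() is called."""

    def __init__(self, unsubscribe_fn: Callable[[], None]) -> None:
        self._unsubscribe_fn = unsubscribe_fn

    def unsubscribe(self) -> None:
        self._unsubscribe_fn()


class ToyBus:
    """Hypothetical in-process dispatcher keyed by exact event type."""

    def __init__(self) -> None:
        self._handlers: Dict[type, List[Handler]] = {}

    def subscribe(self, event_type: type, handler: Handler) -> ToySubscription:
        handlers = self._handlers.setdefault(event_type, [])
        handlers.append(handler)
        return ToySubscription(lambda: handlers.remove(handler))

    async def broadcast(self, event: Any) -> None:
        # Copy the list so a handler may unsubscribe while we iterate.
        for handler in list(self._handlers.get(type(event), [])):
            result = handler(event)
            if inspect.isawaitable(result):
                await result  # coroutine handlers are awaited, plain ones are not


class Tick:
    def __init__(self, n: int) -> None:
        self.n = n


async def demo() -> List[Tuple[str, int]]:
    bus = ToyBus()
    log: List[Tuple[str, int]] = []

    def sync_handler(event: Tick) -> None:
        log.append(("sync", event.n))

    async def async_handler(event: Tick) -> None:
        await asyncio.sleep(0)
        log.append(("async", event.n))

    subscription = bus.subscribe(Tick, sync_handler)
    bus.subscribe(Tick, async_handler)

    await bus.broadcast(Tick(1))
    subscription.unsubscribe()  # only the async handler remains
    await bus.broadcast(Tick(2))
    return log


log = asyncio.run(demo())
```

After the first handler is unsubscribed, the second broadcast reaches only the coroutine handler.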
-
wait_for(event_type: Type[TWaitForEvent]) → TWaitForEvent¶ Wait for a single instance of an event that matches the specified event type.
-
wait_until_all_endpoints_subscribed_to(event: Type[lahja.common.BaseEvent], *, include_self: bool = True) → None¶ Block until all currently connected remote endpoints are subscribed to the specified event type from this endpoint.
-
wait_until_any_endpoint_subscribed_to(event: Type[lahja.common.BaseEvent]) → None¶ Block until any other remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_connected_to(endpoint_name: str) → None¶ Return once a connection exists to an endpoint with the given name.
-
wait_until_connections_change() → None¶ Block until the set of connected remote endpoints changes.
-
wait_until_endpoint_subscribed_to(remote_endpoint: str, event: Type[lahja.common.BaseEvent]) → None¶ Block until the specified remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_endpoint_subscriptions_change() → None¶ Block until any subscription change occurs on any remote endpoint or the set of remote endpoints changes.
-
is_running¶
-
is_serving¶
-
name¶
-
-
class
lahja.base.BaseEndpoint(name: str)¶ Bases:
lahja.base.EndpointAPI
Base class for endpoint implementations that implements shared/common logic.
-
are_all_endpoints_subscribed_to(event_type: Type[lahja.common.BaseEvent], include_self: bool = True) → bool¶ Return
True if every connected remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
-
get_connected_endpoints_and_subscriptions() → Tuple[Tuple[str, Set[Type[lahja.common.BaseEvent]]], ...]¶ Return all connected endpoints and their event type subscriptions to this endpoint.
-
is_any_endpoint_subscribed_to(event_type: Type[lahja.common.BaseEvent]) → bool¶ Return
True if at least one of the connected remote endpoints is subscribed to the specified event type from this endpoint. Otherwise return False.
-
is_connected_to(endpoint_name: str) → bool¶ Return whether this endpoint is connected to another endpoint with the given name.
-
is_endpoint_subscribed_to(remote_endpoint: str, event_type: Type[lahja.common.BaseEvent]) → bool¶ Return
True if the specified remote endpoint is subscribed to the specified event type from this endpoint. Otherwise return False.
-
maybe_raise_no_subscribers_exception(config: Optional[lahja.common.BroadcastConfig], event_type: Type[lahja.common.BaseEvent]) → None¶ Check the given
config and event_type and raise a NoSubscriber if no subscribers exist for the event_type when at least one subscriber is expected.
-
wait_for(event_type: Type[TWaitForEvent]) → TWaitForEvent¶ Wait for a single instance of an event that matches the specified event type.
-
wait_until_all_endpoints_subscribed_to(event: Type[lahja.common.BaseEvent], *, include_self: bool = True) → None¶ Block until all currently connected remote endpoints are subscribed to the specified event type from this endpoint.
-
wait_until_any_endpoint_subscribed_to(event: Type[lahja.common.BaseEvent]) → None¶ Block until any other remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_connected_to(endpoint_name: str) → None¶ Return once a connection exists to an endpoint with the given name.
-
wait_until_connections_change() → None¶ Block until the set of connected remote endpoints changes.
-
wait_until_endpoint_subscribed_to(remote_endpoint: str, event: Type[lahja.common.BaseEvent]) → None¶ Block until the specified remote endpoint has subscribed to the specified event type from this endpoint.
-
wait_until_endpoint_subscriptions_change() → None¶ Block until any subscription change occurs on any remote endpoint or the set of remote endpoints changes.
-
has_snappy_support= False¶
-
logger= <Logger lahja.endpoint.Endpoint (WARNING)>¶
-
AsyncioEndpoint¶
-
class
lahja.asyncio.endpoint.AsyncioEndpoint(name: str)¶ Bases:
lahja.base.BaseEndpoint
The AsyncioEndpoint enables communication between different processes as well as within a single process via various event-driven APIs.
-
broadcast(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ Broadcast an instance of
BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcast. By default, events are broadcast across all connected endpoints with their consuming call sites.
-
broadcast_nowait(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ A non-async
broadcast() (see broadcast() for more).
Instead of blocking the calling coroutine, this function schedules the broadcast and immediately returns.
CAUTION: You probably don’t want to use this. broadcast() doesn’t return until the write socket has finished draining, meaning that the OS has accepted the message. This prevents us from sending more data than the remote process can handle. broadcast_nowait has no such backpressure. Even after the remote process stops accepting new messages this function will continue to accept them, which in the worst case could lead to runaway memory usage.
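The difference described in the caution can be modelled with two queues. In this sketch a bounded asyncio.Queue stands in for a transport that applies backpressure and an unbounded one stands in for fire-and-forget scheduling. Both are illustrative assumptions; lahja's actual transport is a socket, not a queue.

```python
import asyncio
from typing import Tuple


async def demo() -> Tuple[int, int, bool]:
    bounded: "asyncio.Queue[int]" = asyncio.Queue(maxsize=2)  # applies backpressure
    unbounded: "asyncio.Queue[int]" = asyncio.Queue()  # accepts everything

    # No consumer is running, which models a remote that stopped reading.

    # Fire-and-forget: every message is accepted and the buffer just grows.
    for i in range(100):
        unbounded.put_nowait(i)

    # Awaited sends: the first two fit, the third waits for free space.
    await bounded.put(0)
    await bounded.put(1)
    try:
        await asyncio.wait_for(bounded.put(2), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True  # the sender was held back instead of buffering more

    return unbounded.qsize(), bounded.qsize(), blocked


result = asyncio.run(demo())
```

The unbounded queue ends up holding all 100 messages while the bounded one stays at its limit of 2 and stalls the sender, which is the trade-off the caution describes.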
-
check_event_loop() → TFunc¶ All Endpoint methods must be called from the same event loop.
-
connect_to_endpoints(*endpoints) → None¶ Connect to the given endpoints and await until all connections are established.
-
get_subscribed_events() → Set[Type[lahja.common.BaseEvent]]¶ Return the set of events this Endpoint is currently listening for.
-
request(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse¶ Broadcast an instance of
BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcast. By default, requests are broadcast across all connected endpoints with their consuming call sites.
-
run() → AsyncIterator[lahja.base.EndpointAPI]¶ Context manager API for running endpoints.
async with endpoint.run() as endpoint:
    ... # endpoint running within context
... # endpoint stopped after
-
classmethod
serve(config: lahja.common.ConnectionConfig) → AsyncIterator[AsyncioEndpoint]¶ Context manager API for running an endpoint server.
async with EndpointClass.serve(config):
    ... # server running within context
... # server stopped
-
stream(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]¶ Stream all events that match the specified event type. This returns an
AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum number of events has been received.
-
subscribe(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription¶ Subscribe to receive updates for any event that matches the specified event type. A handler is passed as the second argument and a
Subscription is returned, which can be used to unsubscribe from the event if needed.
-
event_loop¶
-
ipc_path¶
-
is_running¶
-
is_serving¶
-
TrioEndpoint¶
-
class
lahja.trio.endpoint.TrioEndpoint(name: str)¶ Bases:
lahja.base.BaseEndpoint
-
broadcast(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ Broadcast an instance of
BaseEvent on the event bus. Takes an optional second parameter of BroadcastConfig to decide where this event should be broadcast. By default, events are broadcast across all connected endpoints with their consuming call sites.
-
broadcast_nowait(item: lahja.common.BaseEvent, config: Optional[lahja.common.BroadcastConfig] = None) → None¶ A sync compatible version of
broadcast().
Warning
Heavy use of
broadcast_nowait() in contiguous blocks of code without yielding to the async implementation should be expected to cause problems.
-
connect_to_endpoints(*endpoints) → None¶ Connect to the given endpoints and await until all connections are established.
-
get_subscribed_events() → Set[Type[lahja.common.BaseEvent]]¶ Return the set of events this Endpoint is currently listening for.
-
request(item: lahja.common.BaseRequestResponseEvent[TResponse], config: Optional[lahja.common.BroadcastConfig] = None) → TResponse¶ Broadcast an instance of
BaseRequestResponseEvent on the event bus and immediately wait on an expected answer of type BaseEvent. Optionally pass a second parameter of BroadcastConfig to decide where the request should be broadcast. By default, requests are broadcast across all connected endpoints with their consuming call sites.
-
run() → AsyncGenerator[lahja.base.EndpointAPI, None]¶ Context manager API for running endpoints.
async with endpoint.run() as endpoint:
    ... # endpoint running within context
... # endpoint stopped after
-
classmethod
serve(config: lahja.common.ConnectionConfig) → AsyncIterator[TrioEndpoint]¶ Context manager API for running an endpoint server.
async with EndpointClass.serve(config):
    ... # server running within context
... # server stopped
-
stream(event_type: Type[TStreamEvent], num_events: Optional[int] = None) → AsyncGenerator[TStreamEvent, None]¶ Stream all events that match the specified event type. This returns an
AsyncIterable[BaseEvent] which can be consumed through an async for loop. An optional num_events parameter can be passed to stop streaming after a maximum number of events has been received.
-
subscribe(event_type: Type[TSubscribeEvent], handler: Callable[TSubscribeEvent, Union[Any, Awaitable[Any]]]) → lahja.common.Subscription¶ Subscribe to receive updates for any event that matches the specified event type. A handler is passed as the second argument and a
Subscription is returned, which can be used to unsubscribe from the event if needed.
-
wait_started() → None¶
-
wait_stopped() → None¶
-
TResponse= ~TResponse¶
-
TStreamEvent= ~TStreamEvent¶
-
TSubscribeEvent= ~TSubscribeEvent¶
-
is_running¶
-
is_server_stopped¶
-
is_serving¶
-
is_stopped¶
-
logger= <Logger lahja.trio.TrioEndpoint (WARNING)>¶
-