Quickstart

Install the library

pip install lahja

Import Endpoint and BaseEvent

import asyncio
import logging
import multiprocessing

from lahja import BaseEvent, AsyncioEndpoint, ConnectionConfig


Setup application specific events

class BaseExampleEvent(BaseEvent):
    def __init__(self, payload):
        super().__init__()
        self.payload = payload
class FirstThingHappened(BaseExampleEvent):
    pass
class SecondThingHappened(BaseExampleEvent):
    pass

Setup first process to receive and broadcast events

def run_proc1():
    setup_logging()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(proc1_worker())
async def proc1_worker():
    async with AsyncioEndpoint.serve(ConnectionConfig.from_name("e1")) as server:
        server.subscribe(
            SecondThingHappened,
            lambda event: logging.info(
                "Received via SUBSCRIBE API in proc1: %s", event.payload
            ),
        )
        await server.wait_until_any_endpoint_subscribed_to(FirstThingHappened)

        while True:
            logging.info("Hello from proc1")
            await server.broadcast(FirstThingHappened("Hit from proc1"))
            await asyncio.sleep(2)

Setup second process to receive and broadcast events

def run_proc2():
    setup_logging()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(proc2_worker())
async def proc2_worker():
    config = ConnectionConfig.from_name("e1")
    async with AsyncioEndpoint("e2").run() as client:
        await client.connect_to_endpoints(config)
        asyncio.ensure_future(display_proc1_events(client))
        client.subscribe(
            FirstThingHappened,
            lambda event: logging.info(
                "Received via SUBSCRIBE API in proc2: %s", event.payload
            ),
        )
        await client.wait_until_any_endpoint_subscribed_to(SecondThingHappened)

        while True:
            logging.info("Hello from proc2")
            await client.broadcast(SecondThingHappened("Hit from proc2 "))
            await asyncio.sleep(2)

Start both processes

    p1 = multiprocessing.Process(target=run_proc1)
    p1.start()

    p2 = multiprocessing.Process(target=run_proc2)
    p2.start()
    p1.join()
    p2.join()

Running the examples

Example: Chatter between two processes

python examples/inter_process_ping_pong.py

The output will look like this:

INFO  05-29 11:31:45  Hello from proc2
INFO  05-29 11:31:45  Hello from proc1
INFO  05-29 11:31:45  Received via SUBSCRIBE API in proc2: Hit from proc1
INFO  05-29 11:31:45  Received via STREAM API in proc2: Hit from proc1
INFO  05-29 11:31:46  Hello from proc2
INFO  05-29 11:31:46  Received via SUBSCRIBE API in proc1: Hit from proc2
INFO  05-29 11:31:46  Hello from proc1
INFO  05-29 11:31:47  Hello from proc2
INFO  05-29 11:31:47  Hello from proc1
INFO  05-29 11:31:48  Hello from proc2
INFO  05-29 11:31:48  Received via SUBSCRIBE API in proc1: Hit from proc2
INFO  05-29 11:31:48  Hello from proc1
INFO  05-29 11:31:49  Hello from proc2
INFO  05-29 11:31:49  Hello from proc1
INFO  05-29 11:31:50  Hello from proc2
INFO  05-29 11:31:50  Received via SUBSCRIBE API in proc1: Hit from proc2
INFO  05-29 11:31:50  Hello from proc1
INFO  05-29 11:31:50  Received via SUBSCRIBE API in proc2: Hit from proc1
INFO  05-29 11:31:50  Received via STREAM API in proc2: Hit from proc1
INFO  05-29 11:31:51  Hello from proc2
INFO  05-29 11:31:51  Hello from proc1

Example: Request API

python examples/request_api.py

The output will look like this:

Requesting
Got answer: Yay
Requesting
Got answer: Yay
Requesting
Got answer: Yay