Quickstart

Install the library

pip install lahja

Import Endpoint, BaseEvent and ConnectionConfig

import asyncio
import multiprocessing
import time

from lahja import (
    BaseEvent,
    Endpoint,
    ConnectionConfig,
)


Set up application-specific events

class BaseExampleEvent(BaseEvent):
    """Shared base class for the example events; each one carries a payload."""

    def __init__(self, payload):
        """Initialise the ``BaseEvent`` parent, then store ``payload``."""
        super().__init__()
        self.payload = payload
class FirstThingHappened(BaseExampleEvent):
    """Event type that the first process broadcasts in this example."""
class SecondThingHappened(BaseExampleEvent):
    """Event type that the second process broadcasts in this example."""

Set up the first process to receive and broadcast events

def run_proc1():
    """Run ``proc1_worker`` on the asyncio event loop until it finishes.

    Used as the ``target`` of the first ``multiprocessing.Process``.
    """
    loop = asyncio.get_event_loop()
    # The worker builds its own endpoint; there is no endpoint in scope here
    # to hand over, so none is passed.
    loop.run_until_complete(proc1_worker())


async def proc1_worker(endpoint=None):
    """Serve endpoint ``e1``, connect to ``e2`` and exchange events forever.

    Subscribes to ``SecondThingHappened`` and to its own
    ``FirstThingHappened`` events, then loops once per second, broadcasting a
    ``FirstThingHappened`` whenever ``is_nth_second(5)`` is true.

    Args:
        endpoint: Optional endpoint to use. When omitted (the normal case) a
            new ``Endpoint`` is created.
    """
    if endpoint is None:
        endpoint = Endpoint()

    await endpoint.start_serving(ConnectionConfig.from_name('e1'))
    await endpoint.connect_to_endpoints(
        ConnectionConfig.from_name('e2')
    )
    endpoint.subscribe(
        SecondThingHappened,
        lambda event: print(
            "Received via SUBSCRIBE API in proc1: ", event.payload
        ),
    )
    endpoint.subscribe(
        FirstThingHappened,
        lambda event: print("Receiving own event: ", event.payload),
    )

    while True:
        print("Hello from proc1")
        # NOTE(review): is_nth_second is not defined in this snippet;
        # presumably a helper from the example script - confirm.
        if is_nth_second(5):
            await endpoint.broadcast(
                FirstThingHappened("Hit from proc1 ({})".format(time.time()))
            )
        await asyncio.sleep(1)

Set up the second process to receive and broadcast events

def run_proc2():
    """Drive ``proc2_worker`` to completion on the asyncio event loop."""
    event_loop = asyncio.get_event_loop()
    worker = proc2_worker()
    event_loop.run_until_complete(worker)
async def proc2_worker():
    """Serve endpoint ``e2``, connect to ``e1`` and exchange events forever.

    Schedules ``display_proc1_events`` in the background, subscribes to
    ``FirstThingHappened``, then loops once per second, broadcasting a
    ``SecondThingHappened`` whenever ``is_nth_second(2)`` is true.
    """
    def on_first_thing(event):
        # Callback for events received through the subscribe API.
        print("Received via SUBSCRIBE API in proc2:", event.payload)

    endpoint = Endpoint()

    own_config = ConnectionConfig.from_name('e2')
    await endpoint.start_serving(own_config)

    peer_config = ConnectionConfig.from_name('e1')
    await endpoint.connect_to_endpoints(peer_config)

    # NOTE(review): display_proc1_events is not defined in this snippet;
    # presumably it produces the "STREAM API" lines of the sample output -
    # confirm against the example script.
    asyncio.ensure_future(display_proc1_events(endpoint))
    endpoint.subscribe(FirstThingHappened, on_first_thing)

    while True:
        print("Hello from proc2")
        # NOTE(review): is_nth_second is not defined in this snippet either;
        # presumably a helper from the example script - confirm.
        if is_nth_second(2):
            message = "Hit from proc2 ({})".format(time.time())
            await endpoint.broadcast(SecondThingHappened(message))
        await asyncio.sleep(1)

Start both processes

    # Guard the entry point: with the "spawn" start method (the default on
    # Windows and macOS) child processes re-import the main module, so the
    # processes must only be started when the file is run as a script.
    if __name__ == "__main__":
        # First process: serves endpoint 'e1'.
        p1 = multiprocessing.Process(target=run_proc1)
        p1.start()

        # Second process: serves endpoint 'e2'.
        p2 = multiprocessing.Process(target=run_proc2)
        p2.start()

Running the examples

Example: Chatter between two processes

python examples/inter_process_ping_pong.py

The output will look like this:

Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1:  Hit from proc2 (1533887068.9261594)
Hello from proc1
Hello from proc2
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1:  Hit from proc2 (1533887070.9296985)
Receiving own event:  Hit from proc1 (1533887070.9288142)
Received via SUBSCRIBE API in proc2: Hit from proc1 (1533887070.9288142)
Received via STREAM API in proc2:  Hit from proc1 (1533887070.9288142)
Hello from proc1
Hello from proc2
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1:  Hit from proc2 (1533887072.9331954)
Hello from proc1
Hello from proc2
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1:  Hit from proc2 (1533887074.937018)
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc2: Hit from proc1 (1533887075.9378386)
Received via STREAM API in proc2:  Hit from proc1 (1533887075.9378386)
Receiving own event:  Hit from proc1 (1533887075.9378386)

Example: Request API

python examples/request_api.py

The output will look like this:

Requesting
Got answer: Yay
Requesting
Got answer: Yay
Requesting
Got answer: Yay