Skip to content

Class peagen.tui.ws_client.TaskStreamClient

peagen.tui.ws_client.TaskStreamClient

TaskStreamClient(ws_url)

Consume gateway events from /ws/tasks.

Source code in peagen/tui/ws_client.py
14
15
16
17
18
19
20
21
def __init__(self, ws_url: str) -> None:
    self.ws_url = ws_url
    self.tasks: Dict[str, dict] = {}
    self.workers: Dict[str, dict] = {}
    self.queues: Dict[str, int] = {}
    self._callbacks: List[Callable[[dict], Awaitable[None]]] = []
    self._connection_callbacks: List[Callable[[bool, str], Awaitable[None]]] = []
    self.connected = False

ws_url instance-attribute

ws_url = ws_url

tasks instance-attribute

tasks = {}

workers instance-attribute

workers = {}

queues instance-attribute

queues = {}

connected instance-attribute

connected = False

on_event

on_event(cb)

Register cb to be awaited for every event.

Source code in peagen/tui/ws_client.py
23
24
25
def on_event(self, cb: Callable[[dict], Awaitable[None]]) -> None:
    """Register *cb* to be awaited for every event."""
    self._callbacks.append(cb)

on_connection_change

on_connection_change(cb)

Register callback for connection status changes.

PARAMETER DESCRIPTION
cb

Async callback that takes (is_connected, error_message)

TYPE: Callable[[bool, str], Awaitable[None]]

Source code in peagen/tui/ws_client.py
27
28
29
30
31
32
33
def on_connection_change(self, cb: Callable[[bool, str], Awaitable[None]]) -> None:
    """Register callback for connection status changes.

    Args:
        cb: Async callback that takes (is_connected, error_message)
    """
    self._connection_callbacks.append(cb)

listen async

listen()
Source code in peagen/tui/ws_client.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def listen(self) -> None:
    try:
        async with websockets.connect(self.ws_url) as ws:
            # Connection established
            await self._notify_connection_change(True)

            async for message in ws:
                try:
                    event = json.loads(message)
                except json.JSONDecodeError:
                    continue
                ev_type = event.get("type")
                data = event.get("data")
                if not isinstance(data, dict):
                    continue
                if ev_type == "task.update":
                    tid = data.get("id")
                    if tid:
                        if "time" in event:
                            data["time"] = event["time"]
                        self.tasks[tid] = data
                elif ev_type == "worker.update":
                    wid = data.get("id")
                    if wid:
                        self.workers[wid] = data
                elif ev_type == "queue.update":
                    pool = data.get("pool")
                    if pool:
                        self.queues[pool] = int(data.get("length", 0))
                for cb in self._callbacks:
                    await cb(event)
    except (
        OSError,
        websockets.exceptions.InvalidStatus,
        websockets.exceptions.ConnectionClosed,
    ) as e:
        # Report the disconnection with the error
        await self._notify_connection_change(False, str(e))