Skip to content

Class peagen.tui.app.RemoteBackend

peagen.tui.app.RemoteBackend

RemoteBackend(gateway_url)
Source code in peagen/tui/app.py
135
136
137
138
139
140
def __init__(self, gateway_url: str) -> None:
    """Prepare a backend that talks to the gateway at *gateway_url*.

    Args:
        gateway_url: Base URL of the gateway. Trailing slashes are removed
            before ``/rpc`` is appended to form the RPC endpoint.
    """
    base_url = gateway_url.rstrip("/")
    # Every JSON-RPC request made by this object is POSTed to this endpoint.
    self.rpc_url = f"{base_url}/rpc"
    self.http = httpx.AsyncClient(timeout=10.0)
    # Start with empty caches and no recorded error.
    self.tasks: List[dict] = []
    self.workers: Dict[str, dict] = {}
    self.last_error: str | None = None

rpc_url instance-attribute

rpc_url = gateway_url.rstrip('/') + '/rpc'

http instance-attribute

http = AsyncClient(timeout=10.0)

tasks instance-attribute

tasks = []

workers instance-attribute

workers = {}

last_error instance-attribute

last_error = None

refresh async

refresh(limit=None, offset=0)
Source code in peagen/tui/app.py
142
143
144
145
146
147
148
149
150
151
152
async def refresh(self, limit: int | None = None, offset: int = 0) -> bool:
    try:
        await asyncio.gather(
            self.fetch_tasks(limit=limit, offset=offset), self.fetch_workers()
        )
    except Exception as exc:
        self.last_error = str(exc)
        return False
    else:
        self.last_error = None
        return True

fetch_tasks async

fetch_tasks(limit=None, offset=0)
Source code in peagen/tui/app.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
async def fetch_tasks(self, limit: int | None = None, offset: int = 0) -> None:
    params: dict[str, int | str] = {"poolName": "default"}
    if limit is not None:
        params["limit"] = int(limit)
    if offset:
        params["offset"] = int(offset)
    payload = {
        "jsonrpc": "2.0",
        "id": "1",
        "method": "Pool.listTasks",
        "params": params,
    }
    resp = await self.http.post(self.rpc_url, json=payload)
    resp.raise_for_status()
    self.tasks = resp.json().get("result", [])

fetch_workers async

fetch_workers()
Source code in peagen/tui/app.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
async def fetch_workers(self) -> None:
    """Fetch the worker list and store it in ``self.workers`` keyed by id.

    Sends a JSON-RPC 2.0 ``Worker.list`` request to ``self.rpc_url`` and
    normalises each returned entry:

    * ``last_seen`` is read as integer epoch seconds and replaced by a
      naive UTC ``datetime``, or by ``None`` when it is missing, falsy,
      unparseable, or out of range.
    * String values that start with ``{`` or ``[`` are decoded as JSON when
      they parse; otherwise they are kept as strings.

    Raises:
        RuntimeError: If the response body carries a JSON-RPC ``error``
            member. ``self.workers`` is left untouched in that case.
        KeyError: If a worker entry has no ``"id"`` key.

    Errors raised by ``self.http.post`` or ``raise_for_status`` propagate
    unchanged.
    """
    # Imported locally so this method does not depend on a module-level
    # ``timezone`` import being present.
    from datetime import timezone

    payload = {
        "jsonrpc": "2.0",
        "id": "2",
        "method": "Worker.list",
        "params": {},
    }
    resp = await self.http.post(self.rpc_url, json=payload)
    resp.raise_for_status()
    body = resp.json()
    # A JSON-RPC failure arrives as an "error" member instead of "result";
    # surface it rather than silently replacing the worker map with {}.
    error = body.get("error")
    if error:
        detail = error.get("message", error) if isinstance(error, dict) else error
        raise RuntimeError(f"Worker.list failed: {detail}")
    # "result" may be present but null; iterate over nothing in that case.
    raw_workers = body.get("result") or []
    workers: Dict[str, dict] = {}
    for entry in raw_workers:
        info = {**entry}
        ts_raw = info.get("last_seen")
        try:
            # Same naive-UTC value utcfromtimestamp() produced, without the
            # API deprecated since Python 3.12.
            ts = (
                datetime.fromtimestamp(int(ts_raw), tz=timezone.utc).replace(
                    tzinfo=None
                )
                if ts_raw
                else None
            )
        except (TypeError, ValueError, OverflowError, OSError):
            # Unparseable or out-of-range timestamps must not abort the
            # whole refresh; treat them as "never seen".
            ts = None
        info["last_seen"] = ts
        for k, v in list(info.items()):
            if isinstance(v, str) and v.startswith(("{", "[")):
                try:
                    info[k] = json.loads(v)
                except (ValueError, RecursionError):
                    # Best effort: the value only looked like JSON, so keep
                    # the original string.
                    pass
        workers[info["id"]] = info
    self.workers = workers