Skip to content

Class peagen.transport.jsonrpc.RPCDispatcher

peagen.transport.jsonrpc.RPCDispatcher

RPCDispatcher()

Ultra-light JSON-RPC 2.0 dispatcher.

Source code in peagen/transport/jsonrpc.py
25
26
def __init__(self) -> None:
    """Create a dispatcher whose method registry starts out empty."""
    # Registry mapping an RPC method name to the callable that serves it.
    self._methods: Dict[str, Callable] = dict()

method

method(name=None)
Source code in peagen/transport/jsonrpc.py
28
29
30
31
32
33
def method(self, name: str | None = None):
    def decorator(fn: Callable):
        self._methods[name or fn.__name__] = fn
        return fn

    return decorator

dispatch async

dispatch(req)
Source code in peagen/transport/jsonrpc.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def dispatch(self, req: dict | list) -> dict | list:
    if isinstance(req, list):
        return [await self.dispatch(r) for r in req]

    if req.get("jsonrpc") != "2.0" or "method" not in req:
        return self._error(-32600, "Invalid Request", req.get("id"))

    fn = self._methods.get(req["method"])
    if fn is None:
        return self._error(-32601, "Method not found", req["id"])

    try:
        params = req.get("params") or {}
        if isinstance(params, dict):
            result = fn(**params)
        else:
            result = fn(params)
        if inspect.isawaitable(result):
            result = await result
        if isinstance(result, BaseModel):
            result = result.model_dump()
        return {"jsonrpc": "2.0", "result": result, "id": req["id"]}
    except RPCException as exc:
        return {"jsonrpc": "2.0", "error": exc.as_error(), "id": req["id"]}
    except Exception as exc:  # noqa: BLE001
        return self._error(-32000, str(exc), req["id"])