Ultra-light JSON-RPC 2.0 dispatcher.
Source code in peagen/transport/jsonrpc.py
def __init__(self) -> None:
    """Create a dispatcher whose method registry starts out empty."""
    # Registry of handlers, keyed by the name they are registered under.
    registry: Dict[str, Callable] = {}
    self._methods = registry
|
method
Source code in peagen/transport/jsonrpc.py
| def method(self, name: str | None = None):
def decorator(fn: Callable):
self._methods[name or fn.__name__] = fn
return fn
return decorator
|
dispatch
async
Source code in peagen/transport/jsonrpc.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 | async def dispatch(self, req: dict | list) -> dict | list:
if isinstance(req, list):
return [await self.dispatch(r) for r in req]
if req.get("jsonrpc") != "2.0" or "method" not in req:
return self._error(-32600, "Invalid Request", req.get("id"))
fn = self._methods.get(req["method"])
if fn is None:
return self._error(-32601, "Method not found", req["id"])
try:
params = req.get("params") or {}
if isinstance(params, dict):
result = fn(**params)
else:
result = fn(params)
if inspect.isawaitable(result):
result = await result
if isinstance(result, BaseModel):
result = result.model_dump()
return {"jsonrpc": "2.0", "result": result, "id": req["id"]}
except RPCException as exc:
return {"jsonrpc": "2.0", "error": exc.as_error(), "id": req["id"]}
except Exception as exc: # noqa: BLE001
return self._error(-32000, str(exc), req["id"])
|