WorkerBase(
*,
pool=None,
gateway=None,
host=None,
port=None,
worker_id=None,
log_level=None,
api_key=None,
heartbeat_interval=5.0,
)
Minimal worker that registers with an Tigrbl-powered gateway,
exposes /rpc for Work.create
, executes the work locally, and
reports progress via Work.update
.
Source code in peagen/worker/base.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 | def __init__(
self,
*,
pool: str | None = None,
gateway: str | None = None,
host: str | None = None,
port: int | None = None,
worker_id: str | None = None,
log_level: str | None = None,
api_key: str | None = None,
heartbeat_interval: float = 5.0,
) -> None:
# ----- env / defaults --------------------------------------
self.pool = pool or os.getenv("PEAGEN_POOL", DEFAULT_POOL_NAME)
self.gateway = gateway or os.getenv("PEAGEN_GATEWAY", DEFAULT_GATEWAY)
self.worker_id = worker_id or os.getenv("PEAGEN_WORKER_ID", None)
self.port = port or int(os.getenv("PORT", 8001))
self.host = host or os.getenv("PEAGEN_HOST") or _local_ip()
self.listen_at = f"http://{self.host}:{self.port}/rpc"
self._api_key = api_key or os.getenv("PEAGEN_API_KEY")
lvl = (log_level or os.getenv("PEAGEN_LOG_LEVEL", "INFO")).upper()
level = getattr(logging, lvl, logging.INFO)
self.log = Logger(name="worker", default_level=level)
# ----- runtime objects -------------------------------------
self.app = FastAPI(title="Peagen Worker")
self._handlers: Dict[str, Callable[[Dict], Awaitable[Dict]]] = {}
headers = {"x-api-key": self._api_key} if self._api_key else None
self._client = TigrblClient(self.gateway, headers=headers)
self._http = httpx.AsyncClient(timeout=10.0)
self._hb_task: asyncio.Task | None = None
self._hb_every = heartbeat_interval
self.ready = False
# ----- JSON-RPC dispatcher --------------------------------
self.rpc = RPCDispatcher()
@self.rpc.method("Work.create")
async def _on_work_start(payload: dict) -> dict:
asyncio.create_task(self._run_work(payload))
return {"accepted": True}
# ----- FastAPI routes --------------------------------------
@self.app.post("/rpc", response_model=dict)
async def _rpc_ep(body: dict = Body(...)):
return await self.rpc.dispatch(body)
@self.app.get("/healthz")
async def _health():
if self.ready:
return {"status": "ok"}
raise HTTPException(503, "starting")
@self.app.get("/well-known")
async def _well_known():
return {"handlers": list(self._handlers)}
# lifecycle hooks
@self.app.on_event("startup")
async def _start():
await self._startup()
@self.app.on_event("shutdown")
async def _stop():
await self._shutdown()
|
pool
instance-attribute
pool = pool or getenv('PEAGEN_POOL', DEFAULT_POOL_NAME)
gateway
instance-attribute
gateway = gateway or getenv(
"PEAGEN_GATEWAY", DEFAULT_GATEWAY
)
worker_id
instance-attribute
worker_id = worker_id or getenv('PEAGEN_WORKER_ID', None)
port
instance-attribute
port = port or int(getenv('PORT', 8001))
host
instance-attribute
host = host or getenv('PEAGEN_HOST') or _local_ip()
listen_at
instance-attribute
listen_at = f'http://{host}:{port}/rpc'
log
instance-attribute
log = Logger(name='worker', default_level=level)
app
instance-attribute
app = FastAPI(title='Peagen Worker')
register_handler
register_handler(name, func)
Source code in peagen/worker/base.py
119
120
121
122
123
124
125 | def register_handler(
self, name: str, func: Callable[[dict], Awaitable[dict]]
) -> None:
if not asyncio.iscoroutinefunction(func):
raise TypeError("handler must be async")
self._handlers[name] = func
self.log.info("handler registered: %s", name)
|