Skip to content

Class peagen.worker.base.WorkerBase

peagen.worker.base.WorkerBase

WorkerBase(
    *,
    pool=None,
    gateway=None,
    host=None,
    port=None,
    worker_id=None,
    log_level=None,
    api_key=None,
    heartbeat_interval=5.0,
)

Minimal worker that registers with an Tigrbl-powered gateway, exposes /rpc for Work.create, executes the work locally, and reports progress via Work.update.

Source code in peagen/worker/base.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def __init__(
    self,
    *,
    pool: str | None = None,
    gateway: str | None = None,
    host: str | None = None,
    port: int | None = None,
    worker_id: str | None = None,
    log_level: str | None = None,
    api_key: str | None = None,
    heartbeat_interval: float = 5.0,
) -> None:
    # ----- env / defaults --------------------------------------
    self.pool = pool or os.getenv("PEAGEN_POOL", DEFAULT_POOL_NAME)
    self.gateway = gateway or os.getenv("PEAGEN_GATEWAY", DEFAULT_GATEWAY)
    self.worker_id = worker_id or os.getenv("PEAGEN_WORKER_ID", None)
    self.port = port or int(os.getenv("PORT", 8001))
    self.host = host or os.getenv("PEAGEN_HOST") or _local_ip()
    self.listen_at = f"http://{self.host}:{self.port}/rpc"
    self._api_key = api_key or os.getenv("PEAGEN_API_KEY")

    lvl = (log_level or os.getenv("PEAGEN_LOG_LEVEL", "INFO")).upper()
    level = getattr(logging, lvl, logging.INFO)
    self.log = Logger(name="worker", default_level=level)

    # ----- runtime objects -------------------------------------
    self.app = FastAPI(title="Peagen Worker")
    self._handlers: Dict[str, Callable[[Dict], Awaitable[Dict]]] = {}
    headers = {"x-api-key": self._api_key} if self._api_key else None
    self._client = TigrblClient(self.gateway, headers=headers)
    self._http = httpx.AsyncClient(timeout=10.0)
    self._hb_task: asyncio.Task | None = None
    self._hb_every = heartbeat_interval
    self.ready = False

    # ----- JSON-RPC dispatcher --------------------------------
    self.rpc = RPCDispatcher()

    @self.rpc.method("Work.create")
    async def _on_work_start(payload: dict) -> dict:
        asyncio.create_task(self._run_work(payload))
        return {"accepted": True}

    # ----- FastAPI routes --------------------------------------
    @self.app.post("/rpc", response_model=dict)
    async def _rpc_ep(body: dict = Body(...)):
        return await self.rpc.dispatch(body)

    @self.app.get("/healthz")
    async def _health():
        if self.ready:
            return {"status": "ok"}
        raise HTTPException(503, "starting")

    @self.app.get("/well-known")
    async def _well_known():
        return {"handlers": list(self._handlers)}

    # lifecycle hooks
    @self.app.on_event("startup")
    async def _start():
        await self._startup()

    @self.app.on_event("shutdown")
    async def _stop():
        await self._shutdown()

pool instance-attribute

pool = pool or getenv('PEAGEN_POOL', DEFAULT_POOL_NAME)

gateway instance-attribute

gateway = gateway or getenv(
    "PEAGEN_GATEWAY", DEFAULT_GATEWAY
)

worker_id instance-attribute

worker_id = worker_id or getenv('PEAGEN_WORKER_ID', None)

port instance-attribute

port = port or int(getenv('PORT', 8001))

host instance-attribute

host = host or getenv('PEAGEN_HOST') or _local_ip()

listen_at instance-attribute

listen_at = f'http://{host}:{port}/rpc'

log instance-attribute

log = Logger(name='worker', default_level=level)

app instance-attribute

app = FastAPI(title='Peagen Worker')

ready instance-attribute

ready = False

rpc instance-attribute

rpc = RPCDispatcher()

register_handler

register_handler(name, func)
Source code in peagen/worker/base.py
119
120
121
122
123
124
125
def register_handler(
    self, name: str, func: Callable[[dict], Awaitable[dict]]
) -> None:
    if not asyncio.iscoroutinefunction(func):
        raise TypeError("handler must be async")
    self._handlers[name] = func
    self.log.info("handler registered: %s", name)