WorkerBase(
    *,
    pool=None,
    gateway=None,
    host=None,
    port=None,
    worker_id=None,
    log_level=None,
    api_key=None,
    heartbeat_interval=5.0,
)
        Minimal worker that registers with an Tigrbl-powered gateway,
exposes /rpc for Work.create, executes the work locally, and
reports progress via Work.update.
                  
                    Source code in peagen/worker/base.py
                    |  51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 | def __init__(
    self,
    *,
    pool: str | None = None,
    gateway: str | None = None,
    host: str | None = None,
    port: int | None = None,
    worker_id: str | None = None,
    log_level: str | None = None,
    api_key: str | None = None,
    heartbeat_interval: float = 5.0,
) -> None:
    # ----- env / defaults --------------------------------------
    self.pool = pool or os.getenv("PEAGEN_POOL", DEFAULT_POOL_NAME)
    self.gateway = gateway or os.getenv("PEAGEN_GATEWAY", DEFAULT_GATEWAY)
    self.worker_id = worker_id or os.getenv("PEAGEN_WORKER_ID", None)
    self.port = port or int(os.getenv("PORT", 8001))
    self.host = host or os.getenv("PEAGEN_HOST") or _local_ip()
    self.listen_at = f"http://{self.host}:{self.port}/rpc"
    self._api_key = api_key or os.getenv("PEAGEN_API_KEY")
    lvl = (log_level or os.getenv("PEAGEN_LOG_LEVEL", "INFO")).upper()
    level = getattr(logging, lvl, logging.INFO)
    self.log = Logger(name="worker", default_level=level)
    # ----- runtime objects -------------------------------------
    self.app = FastAPI(title="Peagen Worker")
    self._handlers: Dict[str, Callable[[Dict], Awaitable[Dict]]] = {}
    headers = {"x-api-key": self._api_key} if self._api_key else None
    self._client = TigrblClient(self.gateway, headers=headers)
    self._http = httpx.AsyncClient(timeout=10.0)
    self._hb_task: asyncio.Task | None = None
    self._hb_every = heartbeat_interval
    self.ready = False
    # ----- JSON-RPC dispatcher --------------------------------
    self.rpc = RPCDispatcher()
    @self.rpc.method("Work.create")
    async def _on_work_start(payload: dict) -> dict:
        asyncio.create_task(self._run_work(payload))
        return {"accepted": True}
    # ----- FastAPI routes --------------------------------------
    @self.app.post("/rpc", response_model=dict)
    async def _rpc_ep(body: dict = Body(...)):
        return await self.rpc.dispatch(body)
    @self.app.get("/healthz")
    async def _health():
        if self.ready:
            return {"status": "ok"}
        raise HTTPException(503, "starting")
    @self.app.get("/well-known")
    async def _well_known():
        return {"handlers": list(self._handlers)}
    # lifecycle hooks
    @self.app.on_event("startup")
    async def _start():
        await self._startup()
    @self.app.on_event("shutdown")
    async def _stop():
        await self._shutdown()
 | 
 
  
instance-attribute
  
pool = pool or getenv('PEAGEN_POOL', DEFAULT_POOL_NAME)
    
 
instance-attribute
  
gateway = gateway or getenv(
    "PEAGEN_GATEWAY", DEFAULT_GATEWAY
)
    
 
instance-attribute
  
worker_id = worker_id or getenv('PEAGEN_WORKER_ID', None)
    
 
instance-attribute
  
port = port or int(getenv('PORT', 8001))
    
 
instance-attribute
  
host = host or getenv('PEAGEN_HOST') or _local_ip()
    
 
instance-attribute
  
listen_at = f'http://{host}:{port}/rpc'
    
 
instance-attribute
  
log = Logger(name='worker', default_level=level)
    
 
instance-attribute
  
app = FastAPI(title='Peagen Worker')
    
 
register_handler(name, func)
            
              Source code in peagen/worker/base.py
              | 119
120
121
122
123
124
125 | def register_handler(
    self, name: str, func: Callable[[dict], Awaitable[dict]]
) -> None:
    if not asyncio.iscoroutinefunction(func):
        raise TypeError("handler must be async")
    self._handlers[name] = func
    self.log.info("handler registered: %s", name)
 |