Skip to content

Class peagen.worker.PeagenWorker

peagen.worker.PeagenWorker

PeagenWorker()

Bases: WorkerBase

Source code in peagen/worker/__init__.py
(lines 24–38)
def __init__(self):
    """Initialise the base worker, then register every supported handler.

    The parent initialiser is invoked without arguments, after which each
    task name in the table below is bound to its handler function via
    ``register_handler``, in the order listed.
    """
    # No arguments are passed: WorkerBase picks up ENV or defaults for
    # pool/gateway/host/port on its own.
    super().__init__()

    # (task name, handler) pairs this worker supports, in registration order.
    handler_table = (
        ("doe", doe_handler),
        ("doe_process", doe_process_handler),
        ("eval", eval_handler),
        ("fetch", fetch_handler),
        ("process", process_handler),
        ("sort", sort_handler),
        ("mutate", mutate_handler),
        ("evolve", evolve_handler),
        # ("login", login_handler),  # currently not registered
        ("keys", keys_handler),
        ("secrets", secrets_handler),
    )
    for task_name, handler in handler_table:
        self.register_handler(task_name, handler)

pool instance-attribute

pool = pool or getenv('PEAGEN_POOL', DEFAULT_POOL_NAME)

gateway instance-attribute

gateway = gateway or getenv(
    "PEAGEN_GATEWAY", DEFAULT_GATEWAY
)

worker_id instance-attribute

worker_id = worker_id or getenv('PEAGEN_WORKER_ID', None)

port instance-attribute

port = port or int(getenv('PORT', 8001))

host instance-attribute

host = host or getenv('PEAGEN_HOST') or _local_ip()

listen_at instance-attribute

listen_at = f'http://{host}:{port}/rpc'

log instance-attribute

log = Logger(name='worker', default_level=level)

app instance-attribute

app = FastAPI(title='Peagen Worker')

ready instance-attribute

ready = False

rpc instance-attribute

rpc = RPCDispatcher()

register_handler

register_handler(name, func)
Source code in peagen/worker/base.py
(lines 119–125)
def register_handler(
    self, name: str, func: Callable[[dict], Awaitable[dict]]
) -> None:
    """Store ``func`` as the handler registered under ``name``.

    Args:
        name: Key under which the handler is kept in ``self._handlers``.
            An existing entry with the same key is overwritten.
        func: Coroutine function to register.

    Raises:
        TypeError: If ``func`` is not a coroutine function according to
            ``asyncio.iscoroutinefunction``.
    """
    accepted = asyncio.iscoroutinefunction(func)
    if accepted:
        self._handlers[name] = func
        self.log.info("handler registered: %s", name)
        return
    # Anything that is not a coroutine function is rejected before the
    # handler table or the log is touched.
    raise TypeError("handler must be async")