Skip to content

Class peagen.plugins.PluginManager

peagen.plugins.PluginManager

PluginManager(cfg)

Centralised plugin loader for peagen plugins.

Source code in peagen/plugins/__init__.py
200
201
202
203
204
205
def __init__(self, cfg: Dict[str, Any]) -> None:
    plugins_cfg = cfg.get("plugins", {})
    mode = plugins_cfg.get("mode", "fan-out")
    switch_map = plugins_cfg.get("switch", {})
    discover_and_register_plugins(mode=mode, switch_map=switch_map)
    self.cfg = cfg

GROUP_CONFIG class-attribute instance-attribute

GROUP_CONFIG = {
    "storage_adapters": {
        "section": "storage",
        "items": "adapters",
        "default": "default_storage_adapter",
    },
    "git_filters": {
        "section": "storage",
        "items": "filters",
        "default": "default_filter",
    },
    "publishers": {
        "section": "publishers",
        "items": "adapters",
        "default": "default_publisher",
    },
    "queues": {
        "section": "queues",
        "items": "adapters",
        "default": "default_queue",
    },
    "evaluators": {
        "section": "evaluation",
        "items": "evaluators",
        "default": "default_evaluator",
    },
    "evaluator_pools": {
        "section": "evaluation",
        "single": "pool",
        "default": None,
    },
    "template_sets": {
        "section": "workspace",
        "items": "template_sets",
        "default": "template_set",
    },
    "mutators": {
        "section": "mutation",
        "items": "mutators",
        "default": "default_mutator",
    },
    "programs": {
        "section": "programs",
        "items": "plugins",
        "default": "default_program",
    },
    "vcs": {
        "section": "vcs",
        "items": "adapters",
        "default": "default_vcs",
    },
    "selectors": {
        "section": "selectors",
        "items": "plugins",
        "default": "default_selector",
    },
    "llms": {
        "section": "llm",
        "default": "default_provider",
    },
    "cryptos": {
        "section": "cryptos",
        "items": "adapters",
        "default": "default_crypto",
    },
    "key_providers": {
        "section": "key_providers",
        "items": "providers",
        "default": "default_provider",
    },
}

cfg instance-attribute

cfg = cfg

get

get(group, name=None)

Return a single plugin instance from group.

Source code in peagen/plugins/__init__.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def get(self, group: str, name: Optional[str] = None) -> Any:
    """Return a single plugin instance from ``group``."""

    layout = self._layout(group)
    cfg = self._group_cfg(group)

    if group == "llms":
        provider = name or cfg.get("default_provider")
        if not provider:
            raise KeyError("No plugin name provided for group 'llms'")
        params = cfg.get(provider, {})
        api_key = params.get("API_KEY") or params.get("api_key")
        model_name = cfg.get("default_model_name")
        temperature = cfg.get("default_temperature")
        max_tokens = cfg.get("default_max_tokens")
        from peagen.core._llm import GenericLLM

        llm_mgr = GenericLLM()
        kwargs: Dict[str, Any] = {}
        if temperature is not None:
            kwargs["temperature"] = temperature
        if max_tokens is not None:
            kwargs["max_tokens"] = max_tokens
        return llm_mgr.get_llm(
            provider=provider,
            api_key=api_key,
            model_name=model_name,
            **kwargs,
        )

    if "single" in layout:
        ref = name or cfg.get(layout["single"])
        if not ref:
            raise KeyError(f"No plugin configured for group '{group}'")
        params = cfg.get(f"{layout['single']}_params", {})
        return self._instantiate(self._resolve_spec(group, ref), params)

    items = cfg.get(layout.get("items", "plugins"), {})
    if name is None:
        default_key = layout.get("default")
        if default_key and default_key in cfg:
            name = cfg.get(default_key)
            if name is None:
                return None
        else:
            name = cfg.get(default_key) if default_key else None
    if not name:
        raise KeyError(f"No plugin name provided for group '{group}'")
    params = items.get(name, {})
    return self._instantiate(self._resolve_spec(group, name), params)

all

all(group)

Instantiate every configured plugin for group.

Source code in peagen/plugins/__init__.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def all(self, group: str) -> Dict[str, Any]:
    """Instantiate every configured plugin for ``group``."""

    layout = self._layout(group)
    cfg = self._group_cfg(group)

    if "single" in layout:
        ref = cfg.get(layout["single"])
        if not ref:
            return {}
        params = cfg.get(f"{layout['single']}_params", {})
        return {ref: self._instantiate(self._resolve_spec(group, ref), params)}

    items = cfg.get(layout.get("items", "plugins"), {})
    out: Dict[str, Any] = {}
    for name, params in items.items():
        out[name] = self._instantiate(self._resolve_spec(group, name), params)
    return out