Skip to content

Class swarmauri_base.transports.runnable_mixin.RunnableMixin

swarmauri_base.transports.runnable_mixin.RunnableMixin

Bases: IRunnable, BaseModel

Adds synchronous and asynchronous run helpers to a transport.

model_config class-attribute instance-attribute

model_config = ConfigDict(
    arbitrary_types_allowed=True, extra="ignore"
)

run

run(
    *,
    role,
    handler=None,
    forever=False,
    stop_event=None,
    handler_kwargs=None,
    **ctx_kwargs,
)
Source code in swarmauri_base/transports/runnable_mixin.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def run(
    self,
    *,
    role: Role,
    handler: Handler | None = None,
    forever: bool = False,
    stop_event: Optional[asyncio.Event] = None,
    handler_kwargs: Optional[dict[str, Any]] = None,
    **ctx_kwargs: Any,
) -> None:
    asyncio.run(
        self._run_async(
            role=role,
            handler=handler,
            forever=forever,
            stop_event=stop_event,
            handler_kwargs=handler_kwargs,
            **ctx_kwargs,
        )
    )