Skip to content

Class swarmauri_core.task_mgmt_strategies.ITaskMgmtStrategy.ITaskMgmtStrategy

swarmauri_core.task_mgmt_strategies.ITaskMgmtStrategy.ITaskMgmtStrategy

Bases: ABC

Abstract base class for TaskStrategy.

assign_task abstractmethod

assign_task(task, agent_factory, service_registry)

Abstract method to assign a task to a service.

Source code in swarmauri_core/task_mgmt_strategies/ITaskMgmtStrategy.py
 8
 9
10
11
12
13
14
15
@abstractmethod
def assign_task(
    self, task: Dict[str, Any], agent_factory: Callable, service_registry: Callable
) -> str:
    """
    Abstract method to assign a task to a service.
    """
    pass

add_task abstractmethod

add_task(task)

Abstract method to add a task to the task queue.

Source code in swarmauri_core/task_mgmt_strategies/ITaskMgmtStrategy.py
17
18
19
20
21
22
@abstractmethod
def add_task(self, task: Dict[str, Any]) -> None:
    """
    Abstract method to add a task to the task queue.
    """
    pass

remove_task abstractmethod

remove_task(task_id)

Abstract method to remove a task from the task queue.

Source code in swarmauri_core/task_mgmt_strategies/ITaskMgmtStrategy.py
24
25
26
27
28
29
@abstractmethod
def remove_task(self, task_id: str) -> None:
    """
    Abstract method to remove a task from the task queue.
    """
    pass

get_task abstractmethod

get_task(task_id)

Abstract method to get a task from the task queue.

Source code in swarmauri_core/task_mgmt_strategies/ITaskMgmtStrategy.py
31
32
33
34
35
36
@abstractmethod
def get_task(self, task_id: str) -> Dict[str, Any]:
    """
    Abstract method to get a task from the task queue.
    """
    pass

process_tasks abstractmethod

process_tasks(task)

Abstract method to process a task.

Source code in swarmauri_core/task_mgmt_strategies/ITaskMgmtStrategy.py
38
39
40
41
42
43
@abstractmethod
def process_tasks(self, task: Dict[str, Any]) -> None:
    """
    Abstract method to process a task.
    """
    pass