# Source code for swarmauri.community.tools.concrete.ZapierHookTool

import json
import requests
from typing import Dict
from swarmauri.standard.tools.base.ToolBase import ToolBase
from swarmauri.standard.tools.concrete.Parameter import Parameter


class ZapierHookTool(ToolBase):
    """Tool that triggers a Zapier catch-hook webhook with a payload.

    The tool posts ``{"data": payload}`` as JSON to
    ``https://hooks.zapier.com/hooks/catch/{zap_id}/`` and returns the
    JSON body of the response serialized as a string.
    """

    def __init__(self, auth_token, zap_id, timeout=30):
        """Register the tool and remember the webhook credentials.

        Args:
            auth_token: Token sent as a ``Bearer`` credential in the
                ``Authorization`` header of every request.
            zap_id: Path fragment identifying the catch hook; it is
                interpolated into the webhook URL.
            timeout: Seconds to wait for Zapier before giving up. Passed
                straight to ``requests.post``; ``None`` disables the limit.
        """
        parameters = [
            Parameter(
                name="payload",
                type="string",
                description="A Payload to send when triggering the Zapier webhook",
                required=True,
            )
        ]

        super().__init__(
            name="ZapierTool",
            description="Tool to authenticate with Zapier and execute zaps.",
            parameters=parameters,
        )
        self._auth_token = auth_token
        self._zap_id = zap_id
        self._timeout = timeout

    def authenticate(self) -> None:
        """Set up the necessary headers for authentication."""
        self.headers = {
            "Authorization": f"Bearer {self._auth_token}",
            "Content-Type": "application/json",
        }

    def execute_zap(self, payload: str) -> str:
        """Trigger the configured Zap with the given payload.

        Args:
            payload: The data to send; it is wrapped as ``{"data": payload}``
                in the JSON request body.

        Returns:
            The JSON body of the Zapier response, serialized to a string.
            An empty response body is reported as ``"{}"``.

        Raises:
            requests.HTTPError: If Zapier answers with a 4xx/5xx status.
            requests.Timeout: If no response arrives within the timeout.
            ValueError: If a non-empty response body is not valid JSON.
        """
        # Headers are rebuilt on every call so they always reflect the
        # current token.
        self.authenticate()

        response = requests.post(
            f'https://hooks.zapier.com/hooks/catch/{self._zap_id}/',
            headers=self.headers,
            json={"data": payload},
            # Without a timeout a stalled connection would block forever.
            timeout=self._timeout,
        )

        # Raise for every error status first; any remaining response is a
        # success, so non-200 successes (201, 202, ...) no longer fall
        # through and return None silently.
        response.raise_for_status()

        # Bodiless successes (e.g. 204) have nothing to parse.
        if not response.content:
            return json.dumps({})
        return json.dumps(response.json())

    def __call__(self, payload: str) -> str:
        """Enable the tool to be called directly with a payload."""
        return self.execute_zap(payload)