Source code for swarmauri.community.tools.concrete.ZapierHookTool
import json
import requests
from typing import Dict
from swarmauri.standard.tools.base.ToolBase import ToolBase
from swarmauri.standard.tools.concrete.Parameter import Parameter
[docs]
class ZapierHookTool(ToolBase):
def __init__(self, auth_token, zap_id):
parameters = [
Parameter(
name="payload",
type="string",
description="A Payload to send when triggering the Zapier webhook",
required=True
)
]
super().__init__(name="ZapierTool",
description="Tool to authenticate with Zapier and execute zaps.",
parameters=parameters)
self._auth_token = auth_token
self._zap_id = zap_id
[docs]
def authenticate(self):
"""Set up the necessary headers for authentication."""
self.headers = {
"Authorization": f"Bearer {self._auth_token}",
"Content-Type": "application/json"
}
[docs]
def execute_zap(self, payload: str):
"""Execute a zap with given payload.
Args:
zap_id (str): The unique identifier for the Zap to trigger.
payload (dict): The data payload to send to the Zap.
Returns:
dict: The response from Zapier API.
"""
self.authenticate()
response = requests.post(f'https://hooks.zapier.com/hooks/catch/{self._zap_id}/',
headers=self.headers, json={"data":payload})
# Checking the HTTP response status for success or failure
if response.status_code == 200:
return json.dumps(response.json())
else:
response.raise_for_status() # This will raise an error for non-200 responses
def __call__(self, payload: str):
"""Enable the tool to be called with zap_id and payload directly."""
return self.execute_zap(payload)