Python SDK¶
The Moltler Python SDK provides a high-level Python interface for managing skills, connectors, and agents.
Installation¶
# From the sdk directory
cd elastic-script/sdk
pip install -e .
# Or install from pip (when published)
pip install moltler
Quick Start¶
from moltler import Moltler
# Connect to Moltler
client = Moltler(
host="localhost",
port=9200,
username="elastic-admin",
password="elastic-password"
)
# Test connection
client.connect()
print(f"Connected to {client.cluster_name}")
# List skills
for skill in client.skills.list():
print(f"{skill.name} v{skill.version}")
# Execute a skill
result = client.skills.execute("check_health", service="api-gateway")
print(result)
# Query a connector
issues = client.connectors.query(
"github_ops",
"issues",
where="state = 'open'",
limit=10
)
for issue in issues:
print(f"{issue['number']}: {issue['title']}")
Skills API¶
List Skills¶
skills = client.skills.list()
for skill in skills:
print(f"{skill.name} v{skill.version}: {skill.description}")
Get Skill Details¶
skill = client.skills.get("check_health")
print(f"Parameters: {skill.parameters}")
print(f"Returns: {skill.return_type}")
print(f"Tests: {len(skill.tests)} defined")
Call Skill¶
# Positional parameters
result = client.skills.call("check_health", "api-gateway")
# Named parameters
result = client.skills.call(
"analyze_logs",
index_pattern="logs-*",
time_range="24h",
min_occurrences=10
)
# Async call
import asyncio
async def check_all_services():
tasks = [
client.skills.call_async("check_health", service=s)
for s in ["api", "web", "worker"]
]
return await asyncio.gather(*tasks)
results = asyncio.run(check_all_services())
Create Skill¶
client.skills.create("""
CREATE SKILL my_python_skill
VERSION '1.0.0'
DESCRIPTION 'Created from Python'
PARAMETERS (name STRING)
RETURNS STRING
BEGIN
RETURN 'Hello, ' || name || '!';
END SKILL;
""")
Generate Skill with AI¶
skill_code = client.skills.generate(
description="Monitor disk usage and alert when above 80%",
parameters={
"threshold_percent": {"type": "number", "default": 80}
},
include_tests=True
)
print(skill_code)
client.skills.create(skill_code)
Connectors API¶
List Connectors¶
connectors = client.connectors.list()
for conn in connectors:
print(f"{conn.name} ({conn.type}): {conn.status}")
Query Connector¶
# Simple query
issues = client.connectors.query("github_org.issues")
# With filters
issues = client.connectors.query(
"github_org.issues",
where={
"state": "open",
"labels": {"contains": "bug"}
},
order_by="created_at DESC",
limit=50
)
# Iterate results
for issue in issues:
print(issue)
Execute Action¶
result = client.connectors.execute(
"github_org.issues.create",
{
"repo": "myrepo",
"title": "Issue from Python SDK",
"body": "Created programmatically",
"labels": ["automated"]
}
)
print(f"Created issue: {result['number']}")
Create Connector¶
client.connectors.create(
name="my_github",
type="github",
config={
"token": "{{secrets.github_token}}",
"org": "mycompany"
}
)
Agents API¶
List Agents¶
Start/Stop Agent¶
Trigger Agent¶
# Manual trigger
execution = client.agents.trigger("incident_responder", {
"alert_id": "alert-123",
"severity": "high"
})
print(f"Execution ID: {execution.id}")
Monitor Execution¶
execution = client.agents.get_execution("exec-abc123")
print(f"Status: {execution.status}")
print(f"Duration: {execution.duration}")
for step in execution.steps:
print(f" {step.skill}: {step.status}")
Handle Approvals¶
# Get pending approvals
approvals = client.agents.get_pending_approvals()
for approval in approvals:
print(f"Agent: {approval.agent}")
print(f"Action: {approval.action}")
print(f"Details: {approval.details}")
# Approve or reject
if input("Approve? (y/n): ").lower() == "y":
client.agents.approve(approval.id)
else:
client.agents.reject(approval.id, reason="Not appropriate")
Execute Raw elastic-script¶
# Execute any elastic-script
result = client.execute("""
DECLARE total = 0;
FOR i IN 1..10 LOOP
SET total = total + i;
END LOOP;
RETURN total;
""")
print(result) # 55
Context Manager¶
from moltler import Moltler
with Moltler(host="localhost:9200") as client:
result = client.skills.call("check_health")
# Connection automatically closed
Async Client¶
from moltler import AsyncMoltler
import asyncio
async def main():
async with AsyncMoltler(host="localhost:9200") as client:
# Parallel skill calls
results = await asyncio.gather(
client.skills.call("check_health", service="api"),
client.skills.call("check_health", service="web"),
client.skills.call("check_health", service="worker"),
)
for result in results:
print(result)
asyncio.run(main())
Events & Callbacks¶
# Subscribe to agent events
def on_approval_needed(event):
print(f"Approval needed: {event.action}")
# Auto-approve low-risk actions
if event.risk_level == "low":
event.approve()
client.events.subscribe("approval_needed", on_approval_needed)
# Subscribe to execution events
def on_execution_complete(event):
print(f"Execution {event.execution_id} completed: {event.status}")
if event.status == "failed":
# Handle failure
send_alert(event)
client.events.subscribe("execution_complete", on_execution_complete)
# Keep listening
client.events.listen()
Error Handling¶
from moltler import Moltler, MoltlerError, SkillNotFoundError, ExecutionError
try:
result = client.skills.call("nonexistent_skill")
except SkillNotFoundError as e:
print(f"Skill not found: {e.skill_name}")
except ExecutionError as e:
print(f"Execution failed: {e.message}")
print(f"Details: {e.details}")
except MoltlerError as e:
print(f"General error: {e}")
Configuration¶
Environment Variables¶
export MOLTLER_HOST=localhost:9200
export MOLTLER_USERNAME=elastic-admin
export MOLTLER_PASSWORD=elastic-password
Configuration File¶
# ~/.moltler/config.yaml
default:
host: localhost:9200
username: elastic-admin
password: ${MOLTLER_PASSWORD}
production:
host: es-prod.company.com:9200
api_key: ${MOLTLER_API_KEY}
Examples¶
Health Check Dashboard¶
from moltler import Moltler
client = Moltler()
def get_system_health():
# Get health for all services
services = ["api", "web", "worker", "db"]
results = {}
for service in services:
try:
health = client.skills.call("check_health", service=service)
results[service] = health
except Exception as e:
results[service] = {"status": "error", "error": str(e)}
return results
health = get_system_health()
for service, status in health.items():
emoji = "✅" if status.get("status") == "healthy" else "❌"
print(f"{emoji} {service}: {status.get('status')}")
Incident Response Bot¶
from moltler import Moltler
import slack_sdk
client = Moltler()
slack = slack_sdk.WebClient(token=os.environ["SLACK_TOKEN"])
def handle_incident(alert):
# Trigger incident responder agent
execution = client.agents.trigger("incident_responder", {
"alert": alert,
"auto_approve_low_risk": True
})
# Post to Slack
slack.chat_postMessage(
channel="#incidents",
text=f"🚨 Incident detected: {alert['title']}\n"
f"Agent execution started: {execution.id}"
)
# Wait for completion
while execution.status == "running":
time.sleep(5)
execution = client.agents.get_execution(execution.id)
# Report outcome
slack.chat_postMessage(
channel="#incidents",
text=f"Incident response complete: {execution.status}\n"
f"Actions taken: {len(execution.steps)}"
)
# Listen for alerts
while True:
alerts = client.connectors.query(
"datadog.monitors",
where={"status": "Alert"}
)
for alert in alerts:
if not already_handled(alert):
handle_incident(alert)
time.sleep(60)
What's Next?¶
-
VSCode Extension
IDE integration.
-
Jupyter Integration
Use Moltler in notebooks.