feat: auto-escalation system + agentic loop fixes (#149)

Wire up automatic error-to-task escalation and fix the agentic loop
stopping after the first tool call.

Auto-escalation:
- Add swarm.task_queue.models with create_task() bridge to existing
  task queue SQLite DB
- Add swarm.event_log with EventType enum, log_event(), and SQLite
  persistence + WebSocket broadcast
- Wire capture_error() into request logging middleware so unhandled
  HTTP exceptions auto-create [BUG] tasks with stack traces, git
  context, and push notifications (5-min dedup window)

Agentic loop (Round 11 Bug #1):
- Wrap agent_chat() in asyncio.to_thread() to stop blocking the
  event loop (fixes Discord heartbeat warnings)
- Enable Agno's native multi-turn tool chaining via show_tool_calls
  and tool_call_limit on the Agent config
- Strengthen multi-step continuation prompts with explicit examples

Co-authored-by: Trip T <trip@local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-08 03:11:14 -04:00
committed by GitHub
parent 7792ae745f
commit fd0ede0d51
9 changed files with 323 additions and 12 deletions

View File

@@ -107,6 +107,23 @@ class RequestLoggingMiddleware(BaseHTTPMiddleware):
f"- ERROR - {duration_ms:.2f}ms - {client_ip} - {str(exc)}"
)
# Auto-escalate: create bug report task from unhandled exception
try:
from infrastructure.error_capture import capture_error
capture_error(
exc,
source="http",
context={
"method": request.method,
"path": request.url.path,
"correlation_id": correlation_id,
"client_ip": client_ip,
"duration_ms": f"{duration_ms:.0f}",
},
)
except Exception:
pass # never let escalation break the request
# Re-raise the exception
raise

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
from datetime import datetime
@@ -69,7 +70,7 @@ async def chat_agent(request: Request, message: str = Form(...)):
error_text = None
try:
response_text = agent_chat(message)
response_text = await asyncio.to_thread(agent_chat, message)
except Exception as exc:
logger.error("Chat error: %s", exc)
error_text = f"Chat error: {exc}"

1
src/swarm/__init__.py Normal file
View File

@@ -0,0 +1 @@
# swarm — task orchestration package

169
src/swarm/event_log.py Normal file
View File

@@ -0,0 +1,169 @@
"""Swarm event log — records system events to SQLite.
Provides EventType enum, EventLogEntry dataclass, and log_event() function
used by error_capture, thinking engine, and the event broadcaster.
"""
import logging
import sqlite3
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
DB_PATH = Path("data/events.db")
class EventType(Enum):
    """All recognised event types in the system.

    Values are dotted "<domain>.<action>" strings; they are stored verbatim
    in the events table's event_type column by log_event().
    """

    # Task lifecycle
    TASK_CREATED = "task.created"
    TASK_BIDDING = "task.bidding"
    TASK_ASSIGNED = "task.assigned"
    TASK_STARTED = "task.started"
    TASK_COMPLETED = "task.completed"
    TASK_FAILED = "task.failed"
    # Agent lifecycle
    AGENT_JOINED = "agent.joined"
    AGENT_LEFT = "agent.left"
    AGENT_STATUS_CHANGED = "agent.status_changed"
    # Bids
    BID_SUBMITTED = "bid.submitted"
    AUCTION_CLOSED = "auction.closed"
    # Tools
    TOOL_CALLED = "tool.called"
    TOOL_COMPLETED = "tool.completed"
    TOOL_FAILED = "tool.failed"
    # System
    SYSTEM_ERROR = "system.error"
    SYSTEM_WARNING = "system.warning"
    SYSTEM_INFO = "system.info"  # also the fallback for unknown stored types (see get_task_events)
    # Error capture
    ERROR_CAPTURED = "error.captured"
    BUG_REPORT_CREATED = "bug_report.created"
    # Thinking
    TIMMY_THOUGHT = "timmy.thought"
@dataclass
class EventLogEntry:
    """Single event in the log, used by the broadcaster for display."""

    id: str                  # UUID4 string assigned by log_event()
    event_type: EventType    # one of the EventType members
    source: str              # originating subsystem ("" when unknown)
    timestamp: str           # ISO-8601 UTC timestamp string
    data: dict = field(default_factory=dict)  # JSON-serialisable payload
    task_id: str = ""        # related task id, if any
    agent_id: str = ""       # related agent id, if any
def _ensure_db() -> sqlite3.Connection:
    """Open the events database, creating the file and schema if missing.

    Returns a connection with a Row factory so callers can read columns
    by name; the caller is responsible for closing it.
    """
    schema = """
    CREATE TABLE IF NOT EXISTS events (
        id TEXT PRIMARY KEY,
        event_type TEXT NOT NULL,
        source TEXT DEFAULT '',
        task_id TEXT DEFAULT '',
        agent_id TEXT DEFAULT '',
        data TEXT DEFAULT '{}',
        timestamp TEXT NOT NULL
    )
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    connection.execute(schema)
    connection.commit()
    return connection
def log_event(
    event_type: EventType,
    source: str = "",
    data: Optional[dict] = None,
    task_id: str = "",
    agent_id: str = "",
) -> EventLogEntry:
    """Record an event and return the entry.

    Also broadcasts to WebSocket clients via the event broadcaster
    (lazy import to avoid circular deps).
    """
    import json

    payload = data or {}
    entry = EventLogEntry(
        id=str(uuid.uuid4()),
        event_type=event_type,
        source=source,
        timestamp=datetime.now(timezone.utc).isoformat(),
        data=payload,
        task_id=task_id,
        agent_id=agent_id,
    )

    # Best-effort persistence: a broken events DB must never take down the
    # caller, so failures are only logged at debug level.
    try:
        db = _ensure_db()
        try:
            db.execute(
                "INSERT INTO events (id, event_type, source, task_id, agent_id, data, timestamp) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (
                    entry.id,
                    event_type.value,
                    source,
                    task_id,
                    agent_id,
                    json.dumps(payload),
                    entry.timestamp,
                ),
            )
            db.commit()
        finally:
            db.close()
    except Exception as exc:
        logger.debug("Failed to persist event: %s", exc)

    # Fire-and-forget broadcast to connected WebSocket clients; lazy import
    # avoids a circular dependency with the broadcaster module.
    try:
        from infrastructure.events.broadcaster import event_broadcaster
        event_broadcaster.broadcast_sync(entry)
    except Exception:
        pass
    return entry
def get_task_events(task_id: str, limit: int = 50) -> list[EventLogEntry]:
    """Retrieve events for a specific task.

    Returns at most *limit* entries, newest first. Rows whose stored
    event_type is not a recognised EventType value are mapped to
    EventType.SYSTEM_INFO instead of raising.
    """
    import json

    db = _ensure_db()
    try:
        rows = db.execute(
            "SELECT * FROM events WHERE task_id=? ORDER BY timestamp DESC LIMIT ?",
            (task_id, limit),
        ).fetchall()
    finally:
        db.close()

    def _to_entry(row: sqlite3.Row) -> EventLogEntry:
        # Tolerate rows written by code with a different EventType set.
        try:
            event_type = EventType(row["event_type"])
        except ValueError:
            event_type = EventType.SYSTEM_INFO
        return EventLogEntry(
            id=row["id"],
            event_type=event_type,
            source=row["source"],
            timestamp=row["timestamp"],
            data=json.loads(row["data"]) if row["data"] else {},
            task_id=row["task_id"],
            agent_id=row["agent_id"],
        )

    return [_to_entry(row) for row in rows]

View File

@@ -0,0 +1 @@
# swarm.task_queue — task queue bridge

View File

@@ -0,0 +1,114 @@
"""Bridge module: exposes create_task() for programmatic task creation.
Used by infrastructure.error_capture to auto-create bug report tasks
in the same SQLite database the dashboard routes use.
"""
import logging
import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
logger = logging.getLogger(__name__)
DB_PATH = Path("data/tasks.db")
@dataclass
class TaskRecord:
    """Lightweight return value from create_task()."""

    id: str      # UUID4 string of the newly created task
    title: str   # title exactly as stored
    status: str  # "approved" or "pending_approval"
def _ensure_db() -> sqlite3.Connection:
    """Open the tasks database, creating the file and schema if missing.

    Returns a connection with a Row factory so callers can read columns
    by name; the caller is responsible for closing it.
    """
    schema = """
    CREATE TABLE IF NOT EXISTS tasks (
        id TEXT PRIMARY KEY,
        title TEXT NOT NULL,
        description TEXT DEFAULT '',
        status TEXT DEFAULT 'pending_approval',
        priority TEXT DEFAULT 'normal',
        assigned_to TEXT DEFAULT '',
        created_by TEXT DEFAULT 'operator',
        result TEXT DEFAULT '',
        created_at TEXT DEFAULT (datetime('now')),
        completed_at TEXT
    )
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    connection.execute(schema)
    connection.commit()
    return connection
def create_task(
    title: str,
    description: str = "",
    assigned_to: str = "default",
    created_by: str = "system",
    priority: str = "normal",
    requires_approval: bool = True,
    auto_approve: bool = False,
    task_type: str = "",
) -> TaskRecord:
    """Insert a task into the SQLite task queue and return a TaskRecord.

    Args:
        title: Task title (e.g. "[BUG] ConnectionError: ...")
        description: Markdown body with error details / stack trace
        assigned_to: Agent or queue to assign to
        created_by: Who created the task ("system", "operator", etc.)
        priority: "low" | "normal" | "high" | "urgent"; anything else is
            coerced to "normal"
        requires_approval: If False and auto_approve, skip pending_approval
        auto_approve: If True, set status to "approved" immediately
        task_type: Optional tag (e.g. "bug_report"), stored as a markdown
            header line prepended to the description

    Returns:
        TaskRecord with the new task's id, title, and status.
    """
    # Coerce rather than reject bad priorities: callers such as
    # error_capture must never fail on a malformed priority string.
    valid_priorities = {"low", "normal", "high", "urgent"}
    if priority not in valid_priorities:
        priority = "normal"

    # A task skips the approval queue only when the caller both allows it
    # (requires_approval=False) and requests it (auto_approve=True).
    status = "approved" if (auto_approve and not requires_approval) else "pending_approval"
    task_id = str(uuid.uuid4())
    # NOTE(review): naive UTC timestamp; event_log.py stores timezone-aware
    # timestamps — consider aligning the two formats.
    now = datetime.utcnow().isoformat()

    # Store task_type in the description header so no schema change is needed.
    if task_type:
        description = f"**Type:** {task_type}\n{description}"

    db = _ensure_db()
    try:
        db.execute(
            "INSERT INTO tasks (id, title, description, status, priority, assigned_to, created_by, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (task_id, title, description, status, priority, assigned_to, created_by, now),
        )
        db.commit()
    finally:
        db.close()

    # Fix: the original format string was "%s%s", which fused the short id
    # and the title into one unreadable token in the logs.
    logger.info("Task created: %s %s [%s]", task_id[:8], title[:60], status)
    return TaskRecord(id=task_id, title=title, status=status)
def get_task_summary_for_briefing() -> dict:
    """Return task counts keyed by status, plus a "total" key, for the
    morning briefing."""
    db = _ensure_db()
    try:
        counts = db.execute(
            "SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status"
        ).fetchall()
    finally:
        db.close()

    summary: dict = {}
    for row in counts:
        summary[row["status"]] = row["cnt"]
    summary["total"] = sum(summary.values())
    return summary

View File

@@ -282,6 +282,8 @@ def create_timmy(
num_history_runs=20,
markdown=True,
tools=[tools] if tools else None,
show_tool_calls=True if use_tools else False,
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)

View File

@@ -81,16 +81,21 @@ When faced with uncertainty, complexity, or ambiguous requests:
## Multi-Step Task Execution
CRITICAL RULE: When a task requires multiple tool calls, you MUST call each
tool in sequence. Do NOT stop after one tool call and report partial results.
When a task requires multiple tool calls:
1. Call the first tool and wait for results
2. Evaluate: is the task complete? If not, call the next tool
3. Continue until the task is fully done
2. After receiving results, immediately call the next required tool
3. Keep calling tools until the ENTIRE task is complete
4. If a tool fails, try an alternative approach
5. Summarize what you accomplished at the end
5. Only after ALL steps are done, summarize what you accomplished
IMPORTANT: Do NOT stop after one tool call unless the task is truly complete.
If you used web_search and the user also asked you to write results to a file,
call write_file next — don't just report the search results.
Example: "Search for AI news and save to a file"
- Step 1: Call web_search → get results
- Step 2: Call write_file with the results → confirm saved
- Step 3: THEN respond to the user with a summary
DO NOT stop after Step 1 and just show search results.
For complex tasks with 3+ steps that may take time, use the plan_and_execute
tool to run them in the background with progress tracking.

View File

@@ -253,8 +253,8 @@ def test_create_timmy_includes_tools_for_large_model():
assert kwargs["tools"] == [mock_toolkit]
def test_create_timmy_no_show_tool_calls():
"""show_tool_calls must NOT be passed — Agno 2.5.3 doesn't support it."""
def test_create_timmy_show_tool_calls_matches_tool_capability():
"""show_tool_calls should be True when tools are enabled, False otherwise."""
with patch("timmy.agent.Agent") as MockAgent, \
patch("timmy.agent.Ollama"), \
patch("timmy.agent.SqliteDb"):
@@ -263,4 +263,5 @@ def test_create_timmy_no_show_tool_calls():
create_timmy()
kwargs = MockAgent.call_args.kwargs
assert "show_tool_calls" not in kwargs
# show_tool_calls is set based on whether tools are enabled
assert "show_tool_calls" in kwargs