Wire up automatic error-to-task escalation and fix the agentic loop stopping after the first tool call. Auto-escalation: - Add swarm.task_queue.models with create_task() bridge to existing task queue SQLite DB - Add swarm.event_log with EventType enum, log_event(), and SQLite persistence + WebSocket broadcast - Wire capture_error() into request logging middleware so unhandled HTTP exceptions auto-create [BUG] tasks with stack traces, git context, and push notifications (5-min dedup window) Agentic loop (Round 11 Bug #1): - Wrap agent_chat() in asyncio.to_thread() to stop blocking the event loop (fixes Discord heartbeat warnings) - Enable Agno's native multi-turn tool chaining via show_tool_calls and tool_call_limit on the Agent config - Strengthen multi-step continuation prompts with explicit examples Co-authored-by: Trip T <trip@local> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
committed by
GitHub
parent
7792ae745f
commit
fd0ede0d51
@@ -107,6 +107,23 @@ class RequestLoggingMiddleware(BaseHTTPMiddleware):
|
||||
f"- ERROR - {duration_ms:.2f}ms - {client_ip} - {str(exc)}"
|
||||
)
|
||||
|
||||
# Auto-escalate: create bug report task from unhandled exception
|
||||
try:
|
||||
from infrastructure.error_capture import capture_error
|
||||
capture_error(
|
||||
exc,
|
||||
source="http",
|
||||
context={
|
||||
"method": request.method,
|
||||
"path": request.url.path,
|
||||
"correlation_id": correlation_id,
|
||||
"client_ip": client_ip,
|
||||
"duration_ms": f"{duration_ms:.0f}",
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
pass # never let escalation break the request
|
||||
|
||||
# Re-raise the exception
|
||||
raise
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
@@ -69,7 +70,7 @@ async def chat_agent(request: Request, message: str = Form(...)):
|
||||
error_text = None
|
||||
|
||||
try:
|
||||
response_text = agent_chat(message)
|
||||
response_text = await asyncio.to_thread(agent_chat, message)
|
||||
except Exception as exc:
|
||||
logger.error("Chat error: %s", exc)
|
||||
error_text = f"Chat error: {exc}"
|
||||
|
||||
1
src/swarm/__init__.py
Normal file
1
src/swarm/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# swarm — task orchestration package
|
||||
169
src/swarm/event_log.py
Normal file
169
src/swarm/event_log.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""Swarm event log — records system events to SQLite.
|
||||
|
||||
Provides EventType enum, EventLogEntry dataclass, and log_event() function
|
||||
used by error_capture, thinking engine, and the event broadcaster.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DB_PATH = Path("data/events.db")
|
||||
|
||||
|
||||
class EventType(Enum):
|
||||
"""All recognised event types in the system."""
|
||||
|
||||
# Task lifecycle
|
||||
TASK_CREATED = "task.created"
|
||||
TASK_BIDDING = "task.bidding"
|
||||
TASK_ASSIGNED = "task.assigned"
|
||||
TASK_STARTED = "task.started"
|
||||
TASK_COMPLETED = "task.completed"
|
||||
TASK_FAILED = "task.failed"
|
||||
|
||||
# Agent lifecycle
|
||||
AGENT_JOINED = "agent.joined"
|
||||
AGENT_LEFT = "agent.left"
|
||||
AGENT_STATUS_CHANGED = "agent.status_changed"
|
||||
|
||||
# Bids
|
||||
BID_SUBMITTED = "bid.submitted"
|
||||
AUCTION_CLOSED = "auction.closed"
|
||||
|
||||
# Tools
|
||||
TOOL_CALLED = "tool.called"
|
||||
TOOL_COMPLETED = "tool.completed"
|
||||
TOOL_FAILED = "tool.failed"
|
||||
|
||||
# System
|
||||
SYSTEM_ERROR = "system.error"
|
||||
SYSTEM_WARNING = "system.warning"
|
||||
SYSTEM_INFO = "system.info"
|
||||
|
||||
# Error capture
|
||||
ERROR_CAPTURED = "error.captured"
|
||||
BUG_REPORT_CREATED = "bug_report.created"
|
||||
|
||||
# Thinking
|
||||
TIMMY_THOUGHT = "timmy.thought"
|
||||
|
||||
|
||||
@dataclass
|
||||
class EventLogEntry:
|
||||
"""Single event in the log, used by the broadcaster for display."""
|
||||
|
||||
id: str
|
||||
event_type: EventType
|
||||
source: str
|
||||
timestamp: str
|
||||
data: dict = field(default_factory=dict)
|
||||
task_id: str = ""
|
||||
agent_id: str = ""
|
||||
|
||||
|
||||
def _ensure_db() -> sqlite3.Connection:
|
||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id TEXT PRIMARY KEY,
|
||||
event_type TEXT NOT NULL,
|
||||
source TEXT DEFAULT '',
|
||||
task_id TEXT DEFAULT '',
|
||||
agent_id TEXT DEFAULT '',
|
||||
data TEXT DEFAULT '{}',
|
||||
timestamp TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
|
||||
def log_event(
|
||||
event_type: EventType,
|
||||
source: str = "",
|
||||
data: Optional[dict] = None,
|
||||
task_id: str = "",
|
||||
agent_id: str = "",
|
||||
) -> EventLogEntry:
|
||||
"""Record an event and return the entry.
|
||||
|
||||
Also broadcasts to WebSocket clients via the event broadcaster
|
||||
(lazy import to avoid circular deps).
|
||||
"""
|
||||
import json
|
||||
|
||||
entry = EventLogEntry(
|
||||
id=str(uuid.uuid4()),
|
||||
event_type=event_type,
|
||||
source=source,
|
||||
timestamp=datetime.now(timezone.utc).isoformat(),
|
||||
data=data or {},
|
||||
task_id=task_id,
|
||||
agent_id=agent_id,
|
||||
)
|
||||
|
||||
# Persist to SQLite
|
||||
try:
|
||||
db = _ensure_db()
|
||||
try:
|
||||
db.execute(
|
||||
"INSERT INTO events (id, event_type, source, task_id, agent_id, data, timestamp) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(entry.id, event_type.value, source, task_id, agent_id,
|
||||
json.dumps(data or {}), entry.timestamp),
|
||||
)
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to persist event: %s", exc)
|
||||
|
||||
# Broadcast to WebSocket clients (non-blocking)
|
||||
try:
|
||||
from infrastructure.events.broadcaster import event_broadcaster
|
||||
event_broadcaster.broadcast_sync(entry)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return entry
|
||||
|
||||
|
||||
def get_task_events(task_id: str, limit: int = 50) -> list[EventLogEntry]:
|
||||
"""Retrieve events for a specific task."""
|
||||
import json
|
||||
|
||||
db = _ensure_db()
|
||||
try:
|
||||
rows = db.execute(
|
||||
"SELECT * FROM events WHERE task_id=? ORDER BY timestamp DESC LIMIT ?",
|
||||
(task_id, limit),
|
||||
).fetchall()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
entries = []
|
||||
for r in rows:
|
||||
try:
|
||||
et = EventType(r["event_type"])
|
||||
except ValueError:
|
||||
et = EventType.SYSTEM_INFO
|
||||
entries.append(EventLogEntry(
|
||||
id=r["id"],
|
||||
event_type=et,
|
||||
source=r["source"],
|
||||
timestamp=r["timestamp"],
|
||||
data=json.loads(r["data"]) if r["data"] else {},
|
||||
task_id=r["task_id"],
|
||||
agent_id=r["agent_id"],
|
||||
))
|
||||
return entries
|
||||
1
src/swarm/task_queue/__init__.py
Normal file
1
src/swarm/task_queue/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# swarm.task_queue — task queue bridge
|
||||
114
src/swarm/task_queue/models.py
Normal file
114
src/swarm/task_queue/models.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""Bridge module: exposes create_task() for programmatic task creation.
|
||||
|
||||
Used by infrastructure.error_capture to auto-create bug report tasks
|
||||
in the same SQLite database the dashboard routes use.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DB_PATH = Path("data/tasks.db")
|
||||
|
||||
|
||||
@dataclass
|
||||
class TaskRecord:
|
||||
"""Lightweight return value from create_task()."""
|
||||
|
||||
id: str
|
||||
title: str
|
||||
status: str
|
||||
|
||||
|
||||
def _ensure_db() -> sqlite3.Connection:
|
||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
title TEXT NOT NULL,
|
||||
description TEXT DEFAULT '',
|
||||
status TEXT DEFAULT 'pending_approval',
|
||||
priority TEXT DEFAULT 'normal',
|
||||
assigned_to TEXT DEFAULT '',
|
||||
created_by TEXT DEFAULT 'operator',
|
||||
result TEXT DEFAULT '',
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
completed_at TEXT
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
|
||||
def create_task(
|
||||
title: str,
|
||||
description: str = "",
|
||||
assigned_to: str = "default",
|
||||
created_by: str = "system",
|
||||
priority: str = "normal",
|
||||
requires_approval: bool = True,
|
||||
auto_approve: bool = False,
|
||||
task_type: str = "",
|
||||
) -> TaskRecord:
|
||||
"""Insert a task into the SQLite task queue and return a TaskRecord.
|
||||
|
||||
Args:
|
||||
title: Task title (e.g. "[BUG] ConnectionError: ...")
|
||||
description: Markdown body with error details / stack trace
|
||||
assigned_to: Agent or queue to assign to
|
||||
created_by: Who created the task ("system", "operator", etc.)
|
||||
priority: "low" | "normal" | "high" | "urgent"
|
||||
requires_approval: If False and auto_approve, skip pending_approval
|
||||
auto_approve: If True, set status to "approved" immediately
|
||||
task_type: Optional tag (e.g. "bug_report")
|
||||
|
||||
Returns:
|
||||
TaskRecord with the new task's id, title, and status.
|
||||
"""
|
||||
valid_priorities = {"low", "normal", "high", "urgent"}
|
||||
if priority not in valid_priorities:
|
||||
priority = "normal"
|
||||
|
||||
status = "approved" if (auto_approve and not requires_approval) else "pending_approval"
|
||||
task_id = str(uuid.uuid4())
|
||||
now = datetime.utcnow().isoformat()
|
||||
|
||||
# Store task_type in description header if provided
|
||||
if task_type:
|
||||
description = f"**Type:** {task_type}\n{description}"
|
||||
|
||||
db = _ensure_db()
|
||||
try:
|
||||
db.execute(
|
||||
"INSERT INTO tasks (id, title, description, status, priority, assigned_to, created_by, created_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(task_id, title, description, status, priority, assigned_to, created_by, now),
|
||||
)
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
logger.info("Task created: %s — %s [%s]", task_id[:8], title[:60], status)
|
||||
return TaskRecord(id=task_id, title=title, status=status)
|
||||
|
||||
|
||||
def get_task_summary_for_briefing() -> dict:
|
||||
"""Return a summary of task counts by status for the morning briefing."""
|
||||
db = _ensure_db()
|
||||
try:
|
||||
rows = db.execute(
|
||||
"SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status"
|
||||
).fetchall()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
summary = {r["status"]: r["cnt"] for r in rows}
|
||||
summary["total"] = sum(summary.values())
|
||||
return summary
|
||||
@@ -282,6 +282,8 @@ def create_timmy(
|
||||
num_history_runs=20,
|
||||
markdown=True,
|
||||
tools=[tools] if tools else None,
|
||||
show_tool_calls=True if use_tools else False,
|
||||
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
||||
telemetry=settings.telemetry_enabled,
|
||||
)
|
||||
|
||||
|
||||
@@ -81,16 +81,21 @@ When faced with uncertainty, complexity, or ambiguous requests:
|
||||
|
||||
## Multi-Step Task Execution
|
||||
|
||||
CRITICAL RULE: When a task requires multiple tool calls, you MUST call each
|
||||
tool in sequence. Do NOT stop after one tool call and report partial results.
|
||||
|
||||
When a task requires multiple tool calls:
|
||||
1. Call the first tool and wait for results
|
||||
2. Evaluate: is the task complete? If not, call the next tool
|
||||
3. Continue until the task is fully done
|
||||
2. After receiving results, immediately call the next required tool
|
||||
3. Keep calling tools until the ENTIRE task is complete
|
||||
4. If a tool fails, try an alternative approach
|
||||
5. Summarize what you accomplished at the end
|
||||
5. Only after ALL steps are done, summarize what you accomplished
|
||||
|
||||
IMPORTANT: Do NOT stop after one tool call unless the task is truly complete.
|
||||
If you used web_search and the user also asked you to write results to a file,
|
||||
call write_file next — don't just report the search results.
|
||||
Example: "Search for AI news and save to a file"
|
||||
- Step 1: Call web_search → get results
|
||||
- Step 2: Call write_file with the results → confirm saved
|
||||
- Step 3: THEN respond to the user with a summary
|
||||
DO NOT stop after Step 1 and just show search results.
|
||||
|
||||
For complex tasks with 3+ steps that may take time, use the plan_and_execute
|
||||
tool to run them in the background with progress tracking.
|
||||
|
||||
@@ -253,8 +253,8 @@ def test_create_timmy_includes_tools_for_large_model():
|
||||
assert kwargs["tools"] == [mock_toolkit]
|
||||
|
||||
|
||||
def test_create_timmy_no_show_tool_calls():
|
||||
"""show_tool_calls must NOT be passed — Agno 2.5.3 doesn't support it."""
|
||||
def test_create_timmy_show_tool_calls_matches_tool_capability():
|
||||
"""show_tool_calls should be True when tools are enabled, False otherwise."""
|
||||
with patch("timmy.agent.Agent") as MockAgent, \
|
||||
patch("timmy.agent.Ollama"), \
|
||||
patch("timmy.agent.SqliteDb"):
|
||||
@@ -263,4 +263,5 @@ def test_create_timmy_no_show_tool_calls():
|
||||
create_timmy()
|
||||
|
||||
kwargs = MockAgent.call_args.kwargs
|
||||
assert "show_tool_calls" not in kwargs
|
||||
# show_tool_calls is set based on whether tools are enabled
|
||||
assert "show_tool_calls" in kwargs
|
||||
|
||||
Reference in New Issue
Block a user