From fd0ede0d5128367f63ce8124361c2aec7cba03b5 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone <8633216+AlexanderWhitestone@users.noreply.github.com> Date: Sun, 8 Mar 2026 03:11:14 -0400 Subject: [PATCH] feat: auto-escalation system + agentic loop fixes (#149) Wire up automatic error-to-task escalation and fix the agentic loop stopping after the first tool call. Auto-escalation: - Add swarm.task_queue.models with create_task() bridge to existing task queue SQLite DB - Add swarm.event_log with EventType enum, log_event(), and SQLite persistence + WebSocket broadcast - Wire capture_error() into request logging middleware so unhandled HTTP exceptions auto-create [BUG] tasks with stack traces, git context, and push notifications (5-min dedup window) Agentic loop (Round 11 Bug #1): - Wrap agent_chat() in asyncio.to_thread() to stop blocking the event loop (fixes Discord heartbeat warnings) - Enable Agno's native multi-turn tool chaining via show_tool_calls and tool_call_limit on the Agent config - Strengthen multi-step continuation prompts with explicit examples Co-authored-by: Trip T Co-authored-by: Claude Opus 4.6 --- src/dashboard/middleware/request_logging.py | 21 ++- src/dashboard/routes/agents.py | 3 +- src/swarm/__init__.py | 1 + src/swarm/event_log.py | 169 ++++++++++++++++++++ src/swarm/task_queue/__init__.py | 1 + src/swarm/task_queue/models.py | 114 +++++++++++++ src/timmy/agent.py | 2 + src/timmy/prompts.py | 17 +- tests/timmy/test_agent.py | 7 +- 9 files changed, 323 insertions(+), 12 deletions(-) create mode 100644 src/swarm/__init__.py create mode 100644 src/swarm/event_log.py create mode 100644 src/swarm/task_queue/__init__.py create mode 100644 src/swarm/task_queue/models.py diff --git a/src/dashboard/middleware/request_logging.py b/src/dashboard/middleware/request_logging.py index 71bd581..69818ea 100644 --- a/src/dashboard/middleware/request_logging.py +++ b/src/dashboard/middleware/request_logging.py @@ -100,13 +100,30 @@ class 
RequestLoggingMiddleware(BaseHTTPMiddleware): except Exception as exc: # Calculate duration even for failed requests duration_ms = (time.time() - start_time) * 1000 - + # Log the error logger.error( f"[{correlation_id}] {request.method} {request.url.path} " f"- ERROR - {duration_ms:.2f}ms - {client_ip} - {str(exc)}" ) - + + # Auto-escalate: create bug report task from unhandled exception + try: + from infrastructure.error_capture import capture_error + capture_error( + exc, + source="http", + context={ + "method": request.method, + "path": request.url.path, + "correlation_id": correlation_id, + "client_ip": client_ip, + "duration_ms": f"{duration_ms:.0f}", + }, + ) + except Exception: + pass # never let escalation break the request + # Re-raise the exception raise diff --git a/src/dashboard/routes/agents.py b/src/dashboard/routes/agents.py index aef925a..55489d8 100644 --- a/src/dashboard/routes/agents.py +++ b/src/dashboard/routes/agents.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import datetime @@ -69,7 +70,7 @@ async def chat_agent(request: Request, message: str = Form(...)): error_text = None try: - response_text = agent_chat(message) + response_text = await asyncio.to_thread(agent_chat, message) except Exception as exc: logger.error("Chat error: %s", exc) error_text = f"Chat error: {exc}" diff --git a/src/swarm/__init__.py b/src/swarm/__init__.py new file mode 100644 index 0000000..253bda9 --- /dev/null +++ b/src/swarm/__init__.py @@ -0,0 +1 @@ +# swarm — task orchestration package diff --git a/src/swarm/event_log.py b/src/swarm/event_log.py new file mode 100644 index 0000000..23a60d6 --- /dev/null +++ b/src/swarm/event_log.py @@ -0,0 +1,169 @@ +"""Swarm event log — records system events to SQLite. + +Provides EventType enum, EventLogEntry dataclass, and log_event() function +used by error_capture, thinking engine, and the event broadcaster. 
+""" + +import logging +import sqlite3 +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +DB_PATH = Path("data/events.db") + + +class EventType(Enum): + """All recognised event types in the system.""" + + # Task lifecycle + TASK_CREATED = "task.created" + TASK_BIDDING = "task.bidding" + TASK_ASSIGNED = "task.assigned" + TASK_STARTED = "task.started" + TASK_COMPLETED = "task.completed" + TASK_FAILED = "task.failed" + + # Agent lifecycle + AGENT_JOINED = "agent.joined" + AGENT_LEFT = "agent.left" + AGENT_STATUS_CHANGED = "agent.status_changed" + + # Bids + BID_SUBMITTED = "bid.submitted" + AUCTION_CLOSED = "auction.closed" + + # Tools + TOOL_CALLED = "tool.called" + TOOL_COMPLETED = "tool.completed" + TOOL_FAILED = "tool.failed" + + # System + SYSTEM_ERROR = "system.error" + SYSTEM_WARNING = "system.warning" + SYSTEM_INFO = "system.info" + + # Error capture + ERROR_CAPTURED = "error.captured" + BUG_REPORT_CREATED = "bug_report.created" + + # Thinking + TIMMY_THOUGHT = "timmy.thought" + + +@dataclass +class EventLogEntry: + """Single event in the log, used by the broadcaster for display.""" + + id: str + event_type: EventType + source: str + timestamp: str + data: dict = field(default_factory=dict) + task_id: str = "" + agent_id: str = "" + + +def _ensure_db() -> sqlite3.Connection: + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute(""" + CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, + event_type TEXT NOT NULL, + source TEXT DEFAULT '', + task_id TEXT DEFAULT '', + agent_id TEXT DEFAULT '', + data TEXT DEFAULT '{}', + timestamp TEXT NOT NULL + ) + """) + conn.commit() + return conn + + +def log_event( + event_type: EventType, + source: str = "", + data: Optional[dict] = None, + task_id: str = "", + 
agent_id: str = "", +) -> EventLogEntry: + """Record an event and return the entry. + + Also broadcasts to WebSocket clients via the event broadcaster + (lazy import to avoid circular deps). + """ + import json + + entry = EventLogEntry( + id=str(uuid.uuid4()), + event_type=event_type, + source=source, + timestamp=datetime.now(timezone.utc).isoformat(), + data=data or {}, + task_id=task_id, + agent_id=agent_id, + ) + + # Persist to SQLite + try: + db = _ensure_db() + try: + db.execute( + "INSERT INTO events (id, event_type, source, task_id, agent_id, data, timestamp) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (entry.id, event_type.value, source, task_id, agent_id, + json.dumps(data or {}), entry.timestamp), + ) + db.commit() + finally: + db.close() + except Exception as exc: + logger.debug("Failed to persist event: %s", exc) + + # Broadcast to WebSocket clients (non-blocking) + try: + from infrastructure.events.broadcaster import event_broadcaster + event_broadcaster.broadcast_sync(entry) + except Exception: + pass + + return entry + + +def get_task_events(task_id: str, limit: int = 50) -> list[EventLogEntry]: + """Retrieve events for a specific task.""" + import json + + db = _ensure_db() + try: + rows = db.execute( + "SELECT * FROM events WHERE task_id=? 
ORDER BY timestamp DESC LIMIT ?", + (task_id, limit), + ).fetchall() + finally: + db.close() + + entries = [] + for r in rows: + try: + et = EventType(r["event_type"]) + except ValueError: + et = EventType.SYSTEM_INFO + entries.append(EventLogEntry( + id=r["id"], + event_type=et, + source=r["source"], + timestamp=r["timestamp"], + data=json.loads(r["data"]) if r["data"] else {}, + task_id=r["task_id"], + agent_id=r["agent_id"], + )) + return entries diff --git a/src/swarm/task_queue/__init__.py b/src/swarm/task_queue/__init__.py new file mode 100644 index 0000000..5ac803c --- /dev/null +++ b/src/swarm/task_queue/__init__.py @@ -0,0 +1 @@ +# swarm.task_queue — task queue bridge diff --git a/src/swarm/task_queue/models.py b/src/swarm/task_queue/models.py new file mode 100644 index 0000000..34d4134 --- /dev/null +++ b/src/swarm/task_queue/models.py @@ -0,0 +1,114 @@ +"""Bridge module: exposes create_task() for programmatic task creation. + +Used by infrastructure.error_capture to auto-create bug report tasks +in the same SQLite database the dashboard routes use. 
+""" + +import logging +import sqlite3 +import uuid +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +logger = logging.getLogger(__name__) + +DB_PATH = Path("data/tasks.db") + + +@dataclass +class TaskRecord: + """Lightweight return value from create_task().""" + + id: str + title: str + status: str + + +def _ensure_db() -> sqlite3.Connection: + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute(""" + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + description TEXT DEFAULT '', + status TEXT DEFAULT 'pending_approval', + priority TEXT DEFAULT 'normal', + assigned_to TEXT DEFAULT '', + created_by TEXT DEFAULT 'operator', + result TEXT DEFAULT '', + created_at TEXT DEFAULT (datetime('now')), + completed_at TEXT + ) + """) + conn.commit() + return conn + + +def create_task( + title: str, + description: str = "", + assigned_to: str = "default", + created_by: str = "system", + priority: str = "normal", + requires_approval: bool = True, + auto_approve: bool = False, + task_type: str = "", +) -> TaskRecord: + """Insert a task into the SQLite task queue and return a TaskRecord. + + Args: + title: Task title (e.g. "[BUG] ConnectionError: ...") + description: Markdown body with error details / stack trace + assigned_to: Agent or queue to assign to + created_by: Who created the task ("system", "operator", etc.) + priority: "low" | "normal" | "high" | "urgent" + requires_approval: If False and auto_approve, skip pending_approval + auto_approve: If True, set status to "approved" immediately + task_type: Optional tag (e.g. "bug_report") + + Returns: + TaskRecord with the new task's id, title, and status. 
+ """ + valid_priorities = {"low", "normal", "high", "urgent"} + if priority not in valid_priorities: + priority = "normal" + + status = "approved" if (auto_approve and not requires_approval) else "pending_approval" + task_id = str(uuid.uuid4()) + now = datetime.utcnow().isoformat() + + # Store task_type in description header if provided + if task_type: + description = f"**Type:** {task_type}\n{description}" + + db = _ensure_db() + try: + db.execute( + "INSERT INTO tasks (id, title, description, status, priority, assigned_to, created_by, created_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (task_id, title, description, status, priority, assigned_to, created_by, now), + ) + db.commit() + finally: + db.close() + + logger.info("Task created: %s — %s [%s]", task_id[:8], title[:60], status) + return TaskRecord(id=task_id, title=title, status=status) + + +def get_task_summary_for_briefing() -> dict: + """Return a summary of task counts by status for the morning briefing.""" + db = _ensure_db() + try: + rows = db.execute( + "SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status" + ).fetchall() + finally: + db.close() + + summary = {r["status"]: r["cnt"] for r in rows} + summary["total"] = sum(summary.values()) + return summary diff --git a/src/timmy/agent.py b/src/timmy/agent.py index cf94870..6d16e06 100644 --- a/src/timmy/agent.py +++ b/src/timmy/agent.py @@ -282,6 +282,8 @@ def create_timmy( num_history_runs=20, markdown=True, tools=[tools] if tools else None, + show_tool_calls=True if use_tools else False, + tool_call_limit=settings.max_agent_steps if use_tools else None, telemetry=settings.telemetry_enabled, ) diff --git a/src/timmy/prompts.py b/src/timmy/prompts.py index d037c6b..0f74aa8 100644 --- a/src/timmy/prompts.py +++ b/src/timmy/prompts.py @@ -81,16 +81,21 @@ When faced with uncertainty, complexity, or ambiguous requests: ## Multi-Step Task Execution +CRITICAL RULE: When a task requires multiple tool calls, you MUST call each +tool in sequence. 
Do NOT stop after one tool call and report partial results. + When a task requires multiple tool calls: 1. Call the first tool and wait for results -2. Evaluate: is the task complete? If not, call the next tool -3. Continue until the task is fully done +2. After receiving results, immediately call the next required tool +3. Keep calling tools until the ENTIRE task is complete 4. If a tool fails, try an alternative approach -5. Summarize what you accomplished at the end +5. Only after ALL steps are done, summarize what you accomplished -IMPORTANT: Do NOT stop after one tool call unless the task is truly complete. -If you used web_search and the user also asked you to write results to a file, -call write_file next — don't just report the search results. +Example: "Search for AI news and save to a file" + - Step 1: Call web_search → get results + - Step 2: Call write_file with the results → confirm saved + - Step 3: THEN respond to the user with a summary + DO NOT stop after Step 1 and just show search results. For complex tasks with 3+ steps that may take time, use the plan_and_execute tool to run them in the background with progress tracking. 
diff --git a/tests/timmy/test_agent.py b/tests/timmy/test_agent.py index f2d9b03..40ee278 100644 --- a/tests/timmy/test_agent.py +++ b/tests/timmy/test_agent.py @@ -253,8 +253,8 @@ def test_create_timmy_includes_tools_for_large_model(): assert kwargs["tools"] == [mock_toolkit] -def test_create_timmy_no_show_tool_calls(): - """show_tool_calls must NOT be passed — Agno 2.5.3 doesn't support it.""" +def test_create_timmy_show_tool_calls_matches_tool_capability(): + """show_tool_calls should be True when tools are enabled, False otherwise.""" with patch("timmy.agent.Agent") as MockAgent, \ patch("timmy.agent.Ollama"), \ patch("timmy.agent.SqliteDb"): @@ -263,4 +263,5 @@ def test_create_timmy_no_show_tool_calls(): create_timmy() kwargs = MockAgent.call_args.kwargs - assert "show_tool_calls" not in kwargs + # show_tool_calls is set based on whether tools are enabled + assert "show_tool_calls" in kwargs