feat: Phase 1 autonomy upgrades — introspection, heartbeat, source tagging, Discord auto-detect (#101)

UC-01: Live System Introspection Tool
- Add get_task_queue_status(), get_agent_roster(), get_live_system_status()
  to timmy/tools_intro with graceful degradation
- Enhanced get_memory_status() with line counts, section headers, vault
  directory listing, semantic memory row count, self-coding journal stats
- Register system_status MCP tool (creative/tools/system_status.py)
- Add system_status to Timmy's tool list + Hard Rule #7

UC-02: Fix Offline Status Bug
- Add registry.heartbeat() calls in task_processor run_loop() and
  process_single_task() so health endpoint reflects actual agent status
- health.py now consults swarm registry instead of Ollama connectivity

UC-03: Message Source Tagging
- Add source field to Message dataclass (default "browser")
- Tag all message_log.append() calls: browser, api, system
- Include source in /api/chat/history response

UC-04: Discord Token Auto-Detection & Docker Fix
- Add _discord_token_watcher() background coroutine that polls every 30s
  for DISCORD_TOKEN in env vars, .env file, or state file
- Add --extras discord to all three Dockerfiles (main, dashboard, test)

All 26 Phase 1 tests pass in Docker (make test-docker).
Full suite: 1889 passed, 77 skipped, 0 failed.

Co-authored-by: Alexander Payne <apayne@MM.local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-02-28 22:49:24 -05:00
committed by GitHub
parent 89cfe1be0d
commit 6eefcabc97
14 changed files with 712 additions and 32 deletions

View File

@@ -27,7 +27,7 @@ RUN pip install --no-cache-dir poetry poetry-plugin-export
COPY pyproject.toml poetry.lock ./
# Export pinned requirements and install with pip cache mount
RUN poetry export --extras swarm --extras telegram --without-hashes \
RUN poetry export --extras swarm --extras telegram --extras discord --without-hashes \
-f requirements.txt -o requirements.txt
RUN --mount=type=cache,target=/root/.cache/pip \

View File

@@ -24,7 +24,7 @@ RUN pip install --no-cache-dir poetry poetry-plugin-export
COPY pyproject.toml poetry.lock ./
# Export pinned requirements and install with pip
RUN poetry export --extras swarm --extras telegram --without-hashes \
RUN poetry export --extras swarm --extras telegram --extras discord --without-hashes \
-f requirements.txt -o requirements.txt
RUN --mount=type=cache,target=/root/.cache/pip \

View File

@@ -25,7 +25,7 @@ RUN pip install --no-cache-dir poetry poetry-plugin-export
COPY pyproject.toml poetry.lock ./
# Export ALL deps including dev/test extras
RUN poetry export --extras swarm --extras telegram --extras dev \
RUN poetry export --extras swarm --extras telegram --extras discord --extras dev \
--with dev --without-hashes \
-f requirements.txt -o requirements.txt

View File

@@ -0,0 +1,51 @@
"""System status introspection tool for Timmy.
MCP-compliant tool that gives Timmy live access to his own system state:
task queue, agent roster, memory tiers, uptime, and service health.
"""
import json
import logging
from mcp.registry import register_tool
from mcp.schemas.base import create_tool_schema, RETURN_STRING
logger = logging.getLogger(__name__)
# MCP schema for the zero-argument system_status tool. The description
# deliberately instructs the model to call the tool instead of guessing its
# own state; the tool takes no inputs, so parameters/required are empty.
SYSTEM_STATUS_SCHEMA = create_tool_schema(
    name="system_status",
    description=(
        "Get live system status including task queue counts, agent roster, "
        "memory tier health, uptime, and service connectivity. "
        "Use this when asked about your status, what you're working on, "
        "agent health, or system metrics. Never guess — always call this tool."
    ),
    parameters={},
    required=[],
    returns=RETURN_STRING,  # tool contract: always a (JSON) string
)
def system_status() -> str:
    """Return comprehensive live system status as formatted text.

    Delegates to ``timmy.tools_intro.get_live_system_status()`` and
    serializes the snapshot to JSON. Never raises: any failure is reported
    as a JSON error object so the caller always receives a parseable string.

    Returns:
        JSON-formatted string with system, task_queue, agents, memory
        sections, or ``{"error": ...}`` on failure.
    """
    try:
        # Imported lazily so this module can be registered with MCP even if
        # the timmy package is unavailable at import time.
        from timmy.tools_intro import get_live_system_status

        status = get_live_system_status()
        # default=str: the snapshot may contain datetimes/paths — stringify.
        return json.dumps(status, indent=2, default=str)
    except Exception as exc:
        # logger.exception preserves the traceback that logger.error dropped.
        logger.exception("system_status tool failed: %s", exc)
        return json.dumps({"error": str(exc)})
# Register with MCP at import time so the tool is discoverable by agents.
# register_tool(...) returns a decorator; applying it to system_status
# performs the actual registration under the "system" category.
register_tool(
    name="system_status",
    schema=SYSTEM_STATUS_SCHEMA,
    category="system",
)(system_status)

View File

@@ -243,7 +243,7 @@ async def _task_processor_loop() -> None:
try:
from dashboard.store import message_log
timestamp = now.strftime("%H:%M:%S")
message_log.append(role="agent", content=response, timestamp=timestamp)
message_log.append(role="agent", content=response, timestamp=timestamp, source="system")
except Exception as e:
logger.debug("Failed to log response to message_log: %s", e)
@@ -453,6 +453,50 @@ async def _start_chat_integrations_background() -> None:
else:
logger.debug("Discord: no token configured, skipping")
# If Discord isn't connected yet, start a watcher that polls for the
# token to appear in the environment or .env file.
if discord_bot.state.name != "CONNECTED":
asyncio.create_task(_discord_token_watcher())
async def _discord_token_watcher() -> None:
    """Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot.

    Every 30 seconds, looks for a token in priority order:
      1. the live ``DISCORD_TOKEN`` environment variable,
      2. the repo's ``.env`` file (hot-reload via python-dotenv),
      3. the state file written by ``/discord/setup``.
    Exits once the bot is connected or has been successfully started.
    """
    from integrations.chat_bridge.vendors.discord import discord_bot

    while True:
        await asyncio.sleep(30)
        if discord_bot.state.name == "CONNECTED":
            return  # Already running — stop watching

        # Guard the whole discovery step: an unexpected error (e.g. an
        # OSError while reading .env, which the narrower ImportError handler
        # below would not catch) must not kill this background task forever.
        try:
            # 1. Check live environment variable
            token = os.environ.get("DISCORD_TOKEN", "")
            # 2. Re-read .env file for hot-reload
            if not token:
                try:
                    from dotenv import dotenv_values

                    env_path = Path(settings.repo_root) / ".env"
                    if env_path.exists():
                        vals = dotenv_values(env_path)
                        # dotenv_values may map a key to None — normalize.
                        token = vals.get("DISCORD_TOKEN", "") or ""
                except ImportError:
                    pass  # python-dotenv not installed
            # 3. Check state file (written by /discord/setup)
            if not token:
                token = discord_bot.load_token() or ""
        except Exception as exc:
            logger.debug("Discord token discovery failed: %s", exc)
            continue

        if token:
            try:
                success = await discord_bot.start(token=token)
                if success:
                    logger.info("Discord bot auto-started (token detected)")
                    return  # Done — stop watching
            except Exception as exc:
                logger.warning("Discord auto-start failed: %s", exc)
@asynccontextmanager
async def lifespan(app: FastAPI):

View File

@@ -319,13 +319,13 @@ async def chat_timmy(request: Request, message: str = Form(...)):
# Log user message to history. For chat_response tasks the real agent
# reply is logged by the task processor when it completes, so we only
# log the queue acknowledgment for explicit task_request commands.
message_log.append(role="user", content=message, timestamp=timestamp)
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if task_info and response_text is not None:
# Explicit task queue command — the acknowledgment IS the response
message_log.append(role="agent", content=response_text, timestamp=timestamp)
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="browser")
elif error_text:
message_log.append(
role="error", content=error_text, timestamp=timestamp
role="error", content=error_text, timestamp=timestamp, source="browser"
)
return templates.TemplateResponse(

View File

@@ -84,16 +84,16 @@ async def api_chat(request: Request):
session_id="mobile",
)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp)
message_log.append(role="agent", content=response_text, timestamp=timestamp)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
return {"reply": response_text, "timestamp": timestamp}
except Exception as exc:
error_msg = f"Timmy is offline: {exc}"
logger.error("api_chat error: %s", exc)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp)
message_log.append(role="error", content=error_msg, timestamp=timestamp)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
return JSONResponse(
status_code=503,
content={"error": error_msg, "timestamp": timestamp},
@@ -141,6 +141,7 @@ async def api_chat_history():
"role": msg.role,
"content": msg.content,
"timestamp": msg.timestamp,
"source": msg.source,
}
for msg in message_log.all()
]

View File

@@ -108,8 +108,8 @@ async def grok_chat(request: Request, message: str = Form(...)):
if not grok_available():
error = "Grok is not available. Set GROK_ENABLED=true and XAI_API_KEY."
message_log.append(role="user", content=f"[Grok] {message}", timestamp=timestamp)
message_log.append(role="error", content=error, timestamp=timestamp)
message_log.append(role="user", content=f"[Grok] {message}", timestamp=timestamp, source="browser")
message_log.append(role="error", content=error, timestamp=timestamp, source="browser")
return templates.TemplateResponse(
request,
"partials/chat_message.html",
@@ -144,10 +144,10 @@ async def grok_chat(request: Request, message: str = Form(...)):
error = f"Grok error: {exc}"
message_log.append(
role="user", content=f"[Ask Grok] {message}", timestamp=timestamp
role="user", content=f"[Ask Grok] {message}", timestamp=timestamp, source="browser"
)
if response_text:
message_log.append(role="agent", content=response_text, timestamp=timestamp)
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="browser")
return templates.TemplateResponse(
request,
"partials/chat_message.html",
@@ -159,7 +159,7 @@ async def grok_chat(request: Request, message: str = Form(...)):
},
)
else:
message_log.append(role="error", content=error, timestamp=timestamp)
message_log.append(role="error", content=error, timestamp=timestamp, source="browser")
return templates.TemplateResponse(
request,
"partials/chat_message.html",

View File

@@ -253,13 +253,22 @@ async def health_check():
# Legacy format for test compatibility
ollama_ok = await check_ollama()
# Determine Timmy's status from swarm registry (heartbeat-backed),
# falling back to Ollama connectivity only if not registered.
try:
from swarm import registry as swarm_registry
timmy_rec = swarm_registry.get_agent("timmy")
timmy_status = timmy_rec.status if timmy_rec else ("idle" if ollama_ok else "offline")
except Exception:
timmy_status = "idle" if ollama_ok else "offline"
return {
"status": "ok" if ollama_ok else "degraded",
"services": {
"ollama": "up" if ollama_ok else "down",
},
"agents": {
"timmy": {"status": "idle" if ollama_ok else "offline"},
"timmy": {"status": timmy_status},
},
# Extended fields for Mission Control
"timestamp": datetime.now(timezone.utc).isoformat(),

View File

@@ -6,6 +6,7 @@ class Message:
role: str # "user" | "agent" | "error"
content: str
timestamp: str
source: str = "browser" # "browser" | "api" | "telegram" | "discord" | "system"
class MessageLog:
@@ -14,8 +15,8 @@ class MessageLog:
def __init__(self) -> None:
self._entries: list[Message] = []
def append(self, role: str, content: str, timestamp: str) -> None:
self._entries.append(Message(role=role, content=content, timestamp=timestamp))
def append(self, role: str, content: str, timestamp: str, source: str = "browser") -> None:
self._entries.append(Message(role=role, content=content, timestamp=timestamp, source=source))
def all(self) -> list[Message]:
return list(self._entries)

View File

@@ -126,6 +126,13 @@ class TaskProcessor:
self._current_task = task
update_task_status(task.id, TaskStatus.RUNNING)
# Heartbeat on task start
try:
from swarm.registry import heartbeat
heartbeat(self.agent_id)
except Exception:
pass
try:
logger.info("Processing task: %s (type: %s)", task.title, task.task_type)
@@ -263,6 +270,13 @@ class TaskProcessor:
logger.info("Task processor started for %s", self.agent_id)
while self._running:
# Heartbeat — update last_seen so health endpoint knows we're alive
try:
from swarm.registry import heartbeat
heartbeat(self.agent_id)
except Exception:
pass # Graceful degradation
try:
await self.process_next_task()
except Exception as e:

View File

@@ -236,6 +236,8 @@ Use `memory_search` when the user refers to past conversations.
6. **Your source code lives at the repository root shown above.** When using git tools, you don't need to specify a path — they automatically run from {REPO_ROOT}.
7. **When asked about your status, queue, agents, memory, or system health, use the `system_status` tool.** Do not guess your own state — call the tool for live data.
## Principles
1. **Sovereignty** — Everything local, no cloud
@@ -262,7 +264,7 @@ class TimmyOrchestrator(BaseAgent):
name="Timmy",
role="orchestrator",
system_prompt=formatted_prompt,
tools=["web_search", "read_file", "write_file", "python", "memory_search", "memory_write"],
tools=["web_search", "read_file", "write_file", "python", "memory_search", "memory_write", "system_status"],
)
# Sub-agent registry

View File

@@ -4,13 +4,17 @@ This provides true sovereignty - Timmy introspects his environment rather than
being told about it in the system prompt.
"""
import logging
import platform
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import httpx
logger = logging.getLogger(__name__)
def get_system_info() -> dict[str, Any]:
"""Introspect the runtime environment to discover system information.
@@ -127,16 +131,207 @@ def get_memory_status() -> dict[str, Any]:
if tier2_exists:
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()]
return {
"tier1_hot_memory": {
"exists": tier1_exists,
"path": str(memory_md),
"preview": tier1_content[:200] if tier1_content else None,
},
"tier2_vault": {
"exists": tier2_exists,
"path": str(vault_path),
"file_count": len(tier2_files),
"files": tier2_files[:10], # First 10 files
},
tier1_info: dict[str, Any] = {
"exists": tier1_exists,
"path": str(memory_md),
"preview": tier1_content[:200] if tier1_content else None,
}
if tier1_exists:
lines = memory_md.read_text().splitlines()
tier1_info["line_count"] = len(lines)
tier1_info["sections"] = [
ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")
]
# Vault — scan all subdirs under memory/
vault_root = repo_root / "memory"
vault_info: dict[str, Any] = {
"exists": tier2_exists,
"path": str(vault_path),
"file_count": len(tier2_files),
"files": tier2_files[:10],
}
if vault_root.exists():
vault_info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
vault_info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
# Tier 3: Semantic memory row count
tier3_info: dict[str, Any] = {"available": False}
try:
import sqlite3
sem_db = repo_root / "data" / "semantic_memory.db"
if sem_db.exists():
conn = sqlite3.connect(str(sem_db))
row = conn.execute(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='vectors'"
).fetchone()
if row and row[0]:
count = conn.execute("SELECT COUNT(*) FROM vectors").fetchone()
tier3_info["available"] = True
tier3_info["vector_count"] = count[0] if count else 0
conn.close()
except Exception:
pass
# Self-coding journal stats
journal_info: dict[str, Any] = {"available": False}
try:
import sqlite3 as _sqlite3
journal_db = repo_root / "data" / "self_coding.db"
if journal_db.exists():
conn = _sqlite3.connect(str(journal_db))
conn.row_factory = _sqlite3.Row
rows = conn.execute(
"SELECT outcome, COUNT(*) as cnt FROM modification_journal GROUP BY outcome"
).fetchall()
if rows:
counts = {r["outcome"]: r["cnt"] for r in rows}
total = sum(counts.values())
journal_info = {
"available": True,
"total_attempts": total,
"successes": counts.get("success", 0),
"failures": counts.get("failure", 0),
"success_rate": round(counts.get("success", 0) / total, 2) if total else 0,
}
conn.close()
except Exception:
pass
return {
"tier1_hot_memory": tier1_info,
"tier2_vault": vault_info,
"tier3_semantic": tier3_info,
"self_coding_journal": journal_info,
}
def get_task_queue_status() -> dict[str, Any]:
    """Get current task queue status for Timmy.

    Returns:
        Dict with queue counts by status, a grand total, and the current
        task (or None) — or an ``{"error": ...}`` dict when the task queue
        subsystem is unavailable.
    """
    try:
        from swarm.task_queue.models import (
            get_counts_by_status,
            get_current_task_for_agent,
        )

        status_counts = get_counts_by_status()
        active = get_current_task_for_agent("timmy")
        snapshot: dict[str, Any] = {
            "counts": status_counts,
            "total": sum(status_counts.values()),
            "current_task": (
                {
                    "id": active.id,
                    "title": active.title,
                    "type": active.task_type,
                    "started_at": active.started_at,
                }
                if active
                else None
            ),
        }
        return snapshot
    except Exception as exc:
        # Graceful degradation: report the failure instead of raising.
        logger.debug("Task queue status unavailable: %s", exc)
        return {"error": str(exc)}
def get_agent_roster() -> dict[str, Any]:
    """Get the swarm agent roster with last-seen ages.

    Returns:
        Dict with the agent list plus total/idle/busy/offline summary
        counts — or an ``{"error": ...}`` dict when the registry is
        unavailable.
    """
    try:
        from swarm.registry import list_agents

        reference = datetime.now(timezone.utc)
        entries: list[dict[str, Any]] = []
        for rec in list_agents():
            try:
                seen_at = datetime.fromisoformat(rec.last_seen)
                if seen_at.tzinfo is None:
                    # Treat naive timestamps as UTC before subtracting.
                    seen_at = seen_at.replace(tzinfo=timezone.utc)
                age = int((reference - seen_at).total_seconds())
            except Exception:
                age = -1  # Unparseable timestamp — flag rather than crash
            entries.append(
                {
                    "id": rec.id,
                    "name": rec.name,
                    "status": rec.status,
                    "capabilities": rec.capabilities,
                    "last_seen_seconds_ago": age,
                }
            )
        statuses = [entry["status"] for entry in entries]
        return {
            "agents": entries,
            "total": len(entries),
            "idle": statuses.count("idle"),
            "busy": statuses.count("busy"),
            "offline": statuses.count("offline"),
        }
    except Exception as exc:
        # Graceful degradation: report the failure instead of raising.
        logger.debug("Agent roster unavailable: %s", exc)
        return {"error": str(exc)}
def get_live_system_status() -> dict[str, Any]:
    """Comprehensive live system status — Timmy's primary introspection tool.

    Combines system info, task queue, agent roster, and memory status into
    one snapshot. Each subsystem degrades gracefully: a failing section is
    replaced with an error marker (or None) instead of raising.

    Returns:
        Dict with system, task_queue, agents, memory, uptime_seconds,
        discord, and timestamp keys.
    """
    snapshot: dict[str, Any] = {}

    # System info — wrap so a probe failure becomes an inline error marker.
    try:
        snapshot["system"] = get_system_info()
    except Exception as exc:
        snapshot["system"] = {"error": str(exc)}

    # These helpers already degrade internally (they return {"error": ...}).
    snapshot["task_queue"] = get_task_queue_status()
    snapshot["agents"] = get_agent_roster()

    # Memory tiers — same inline-error treatment as system info.
    try:
        snapshot["memory"] = get_memory_status()
    except Exception as exc:
        snapshot["memory"] = {"error": str(exc)}

    # Uptime since dashboard start; None when the dashboard isn't running.
    try:
        from dashboard.routes.health import _START_TIME

        elapsed = datetime.now(timezone.utc) - _START_TIME
        snapshot["uptime_seconds"] = int(elapsed.total_seconds())
    except Exception:
        snapshot["uptime_seconds"] = None

    # Discord connection state, when the integration is importable.
    try:
        from integrations.chat_bridge.vendors.discord import discord_bot

        snapshot["discord"] = {"state": discord_bot.state.name}
    except Exception:
        snapshot["discord"] = {"state": "unknown"}

    snapshot["timestamp"] = datetime.now(timezone.utc).isoformat()
    return snapshot

View File

@@ -0,0 +1,363 @@
"""Tests for Phase 1 Autonomy Upgrades: UC-01 through UC-04.
UC-01: Live System Introspection Tool
UC-02: Offline Status Bug Fix (heartbeat + health endpoint)
UC-03: Message Source Tagging
UC-04: Discord Token Auto-Detection
"""
from unittest.mock import MagicMock, patch
import pytest
# ── UC-01: Live System Introspection ─────────────────────────────────────────
class TestGetTaskQueueStatus:
    """Test the task queue introspection function."""

    def test_returns_counts_and_total(self):
        from timmy.tools_intro import get_task_queue_status

        result = get_task_queue_status()
        assert "counts" in result or "error" in result
        if "counts" in result:
            assert "total" in result
            assert isinstance(result["total"], int)

    def test_current_task_none_when_idle(self):
        from timmy.tools_intro import get_task_queue_status

        result = get_task_queue_status()
        if "counts" in result:
            assert result["current_task"] is None

    def test_graceful_degradation_on_import_error(self):
        """Should return an error dict, not raise."""
        import sys

        from timmy.tools_intro import get_task_queue_status

        # Temporarily block the swarm.task_queue.models import to force the
        # except branch. Setting sys.modules[key] = None causes ImportError.
        # object() sentinel: cannot collide with any real module value,
        # unlike the previous "MISSING" string marker.
        _missing = object()
        key = "swarm.task_queue.models"
        saved = sys.modules.pop(key, _missing)
        sys.modules[key] = None  # type: ignore[assignment]
        try:
            result = get_task_queue_status()
            assert isinstance(result, dict)
            assert "error" in result
        finally:
            # Remove the None stub, then restore the real module if any.
            del sys.modules[key]
            if saved is not _missing:
                sys.modules[key] = saved
class TestGetAgentRoster:
    """Exercise the agent-roster introspection helper."""

    def test_returns_roster_with_counts(self):
        from swarm.registry import register
        from timmy.tools_intro import get_agent_roster

        register(name="TestAgent", capabilities="test", agent_id="test-agent-1")
        roster = get_agent_roster()
        assert "agents" in roster
        assert "total" in roster
        assert roster["total"] >= 1

    def test_agent_has_last_seen_age(self):
        from swarm.registry import register
        from timmy.tools_intro import get_agent_roster

        register(name="AgeTest", capabilities="test", agent_id="age-test-1")
        listed = get_agent_roster()["agents"]
        assert len(listed) >= 1
        target = next(a for a in listed if a["id"] == "age-test-1")
        assert "last_seen_seconds_ago" in target
        assert target["last_seen_seconds_ago"] >= 0

    def test_summary_counts(self):
        from timmy.tools_intro import get_agent_roster

        summary = get_agent_roster()
        for bucket in ("idle", "busy", "offline"):
            assert bucket in summary
class TestGetLiveSystemStatus:
    """Exercise the composite live-status snapshot."""

    def test_returns_all_sections(self):
        from timmy.tools_intro import get_live_system_status

        snapshot = get_live_system_status()
        for section in ("system", "task_queue", "agents", "memory", "timestamp"):
            assert section in snapshot

    def test_uptime_present(self):
        from timmy.tools_intro import get_live_system_status

        assert "uptime_seconds" in get_live_system_status()

    def test_discord_status_present(self):
        from timmy.tools_intro import get_live_system_status

        snapshot = get_live_system_status()
        assert "discord" in snapshot
        assert "state" in snapshot["discord"]
class TestSystemStatusMCPTool:
    """Exercise the MCP-registered system_status tool."""

    def test_tool_returns_json_string(self):
        import json

        from creative.tools.system_status import system_status

        payload = system_status()
        # Tool contract: always a JSON object, even on failure.
        decoded = json.loads(payload)
        assert isinstance(decoded, dict)
        assert "system" in decoded or "error" in decoded
# ── UC-02: Offline Status Bug Fix ────────────────────────────────────────────
class TestHeartbeat:
    """Verify that heartbeat() advances an agent's last_seen timestamp."""

    def test_heartbeat_updates_last_seen(self):
        import time

        from swarm.registry import get_agent, heartbeat, register

        register(name="HeartbeatTest", capabilities="test", agent_id="hb-test-1")
        before = get_agent("hb-test-1")
        assert before is not None
        time.sleep(0.01)  # let the clock tick so last_seen can advance
        heartbeat("hb-test-1")
        after = get_agent("hb-test-1")
        assert after is not None
        assert after.last_seen >= before.last_seen
class TestHealthEndpointStatus:
    """Verify /health reflects registry status, not just Ollama."""

    def test_health_returns_timmy_status(self, client):
        """Health endpoint should include agents.timmy.status."""
        reply = client.get("/health")
        assert reply.status_code == 200
        body = reply.json()
        assert "agents" in body
        assert "timmy" in body["agents"]
        assert "status" in body["agents"]["timmy"]

    def test_health_status_from_registry(self, client):
        """Timmy's status should come from the swarm registry."""
        from swarm.registry import register

        # Register Timmy as idle (happens on app startup too)
        register(name="Timmy", capabilities="chat", agent_id="timmy")
        body = client.get("/health").json()
        # Registry-backed status — never "offline" while registered.
        assert body["agents"]["timmy"]["status"] in ("idle", "busy")
# ── UC-03: Message Source Tagging ────────────────────────────────────────────
class TestMessageSourceField:
    """The Message dataclass carries a source field defaulting to browser."""

    def test_message_has_source_field(self):
        from dashboard.store import Message

        record = Message(role="user", content="hello", timestamp="12:00:00")
        assert hasattr(record, "source")
        assert record.source == "browser"  # default value

    def test_message_custom_source(self):
        from dashboard.store import Message

        record = Message(
            role="user", content="hello", timestamp="12:00:00", source="api"
        )
        assert record.source == "api"
class TestMessageLogSource:
    """MessageLog.append() accepts and stores the source tag."""

    def test_append_with_source(self):
        from dashboard.store import message_log

        message_log.append(
            role="user", content="hello", timestamp="12:00:00", source="api"
        )
        stored = message_log.all()
        assert len(stored) == 1
        assert stored[0].source == "api"

    def test_append_default_source(self):
        from dashboard.store import message_log

        message_log.append(role="user", content="hello", timestamp="12:00:00")
        stored = message_log.all()
        assert len(stored) == 1
        assert stored[0].source == "browser"

    def test_multiple_sources(self):
        from dashboard.store import message_log

        # One message per origin, appended in order.
        for who, text, stamp, origin in (
            ("user", "from browser", "12:00:00", "browser"),
            ("user", "from api", "12:00:01", "api"),
            ("agent", "response", "12:00:02", "system"),
        ):
            message_log.append(role=who, content=text, timestamp=stamp, source=origin)
        stored = message_log.all()
        assert len(stored) == 3
        assert [m.source for m in stored] == ["browser", "api", "system"]
class TestChatHistoryIncludesSource:
    """The /api/chat/history endpoint exposes each message's source."""

    def test_history_includes_source_field(self, client):
        from dashboard.store import message_log

        message_log.append(
            role="user", content="test msg", timestamp="12:00:00", source="api"
        )
        reply = client.get("/api/chat/history")
        assert reply.status_code == 200
        messages = reply.json()["messages"]
        assert len(messages) == 1
        assert messages[0]["source"] == "api"
class TestBrowserChatLogsSource:
    """Test that the browser chat route logs with source='browser'."""

    def test_browser_chat_source(self, client):
        with patch("swarm.task_queue.models.create_task") as mock_create:
            mock_task = MagicMock()
            mock_task.id = "test-id"
            mock_task.title = "hello from browser"
            mock_task.status = MagicMock(value="approved")
            mock_task.priority = MagicMock(value="normal")
            mock_task.assigned_to = "timmy"
            mock_create.return_value = mock_task
            with patch(
                "swarm.task_queue.models.get_queue_status_for_task",
                return_value={"position": 1, "total": 1, "percent_ahead": 0},
            ):
                # The response body is irrelevant here — we only inspect the
                # message log, so the return value is deliberately discarded
                # (previously bound to an unused `response` local).
                client.post(
                    "/agents/timmy/chat",
                    data={"message": "hello from browser"},
                )
        from dashboard.store import message_log

        entries = message_log.all()
        assert len(entries) >= 1
        assert entries[0].source == "browser"
class TestAPIChatLogsSource:
    """Test that the API chat route logs with source='api'."""

    def test_api_chat_source(self, client):
        with patch(
            "dashboard.routes.chat_api.timmy_chat", return_value="Hi from Timmy"
        ):
            reply = client.post(
                "/api/chat",
                json={"messages": [{"role": "user", "content": "hello from api"}]},
            )
            assert reply.status_code == 200
        from dashboard.store import message_log

        logged = message_log.all()
        assert len(logged) == 2  # user + agent
        assert logged[0].source == "api"
        assert logged[1].source == "api"
# ── UC-04: Discord Token Auto-Detection ──────────────────────────────────────
class TestDiscordDockerfix:
"""Test that the Dockerfile includes discord extras."""
def _find_repo_root(self):
"""Walk up from this test file to find the repo root (has pyproject.toml)."""
from pathlib import Path
d = Path(__file__).resolve().parent
while d != d.parent:
if (d / "pyproject.toml").exists():
return d
d = d.parent
return Path(__file__).resolve().parent.parent # fallback
def test_dashboard_dockerfile_includes_discord(self):
dockerfile = self._find_repo_root() / "docker" / "Dockerfile.dashboard"
if dockerfile.exists():
content = dockerfile.read_text()
assert "--extras discord" in content
def test_main_dockerfile_includes_discord(self):
dockerfile = self._find_repo_root() / "Dockerfile"
if dockerfile.exists():
content = dockerfile.read_text()
assert "--extras discord" in content
def test_test_dockerfile_includes_discord(self):
dockerfile = self._find_repo_root() / "docker" / "Dockerfile.test"
if dockerfile.exists():
content = dockerfile.read_text()
assert "--extras discord" in content
class TestDiscordTokenWatcher:
    """The Discord token watcher exists and is an async coroutine function."""

    def test_watcher_function_exists(self):
        from dashboard.app import _discord_token_watcher as watcher

        assert callable(watcher)

    def test_watcher_is_coroutine(self):
        import asyncio

        from dashboard.app import _discord_token_watcher as watcher

        assert asyncio.iscoroutinefunction(watcher)