From 1ab26d30ad433016d10205c688c6be6dd363cfb8 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 24 Feb 2026 15:51:15 +0000 Subject: [PATCH] feat: integrate Spark Intelligence into Timmy swarm system MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a self-evolving cognitive layer inspired by vibeship-spark-intelligence, adapted for Timmy's agent architecture. Spark captures swarm events, runs EIDOS prediction-evaluation loops, consolidates memories, and generates advisory recommendations — all backed by SQLite consistent with existing patterns. New modules: - spark/memory.py — event capture with importance scoring + memory consolidation - spark/eidos.py — EIDOS cognitive loop (predict → observe → evaluate → learn) - spark/advisor.py — ranked advisory generation from accumulated intelligence - spark/engine.py — top-level API wiring all subsystems together Dashboard: - /spark/ui — full Spark Intelligence dashboard (3-column: status/advisories, predictions/memories, event timeline) with HTMX auto-refresh - /spark — JSON API for programmatic access - SPARK link added to navigation header Integration: - Coordinator hooks emit Spark events on task post, bid, assign, complete, fail - EIDOS predictions generated when tasks are posted, evaluated on completion - Memory consolidation triggers when agents accumulate enough outcomes - SPARK_ENABLED config toggle (default: true) Tests: 47 new tests covering all Spark subsystems + dashboard routes. Full suite: 538 tests passing. 
https://claude.ai/code/session_01KJm6jQkNi3aA3yoQJn636c --- pyproject.toml | 1 + src/config.py | 6 + src/dashboard/app.py | 7 + src/dashboard/routes/spark.py | 147 +++++ src/dashboard/templates/base.html | 1 + .../templates/partials/spark_insights.html | 32 + .../templates/partials/spark_timeline.html | 19 + src/dashboard/templates/spark.html | 556 ++++++++++++++++++ src/spark/__init__.py | 0 src/spark/advisor.py | 278 +++++++++ src/spark/eidos.py | 304 ++++++++++ src/spark/engine.py | 288 +++++++++ src/spark/memory.py | 301 ++++++++++ src/swarm/coordinator.py | 53 +- tests/test_spark.py | 431 ++++++++++++++ 15 files changed, 2420 insertions(+), 4 deletions(-) create mode 100644 src/dashboard/routes/spark.py create mode 100644 src/dashboard/templates/partials/spark_insights.html create mode 100644 src/dashboard/templates/partials/spark_timeline.html create mode 100644 src/dashboard/templates/spark.html create mode 100644 src/spark/__init__.py create mode 100644 src/spark/advisor.py create mode 100644 src/spark/eidos.py create mode 100644 src/spark/engine.py create mode 100644 src/spark/memory.py create mode 100644 tests/test_spark.py diff --git a/pyproject.toml b/pyproject.toml index 1364e6a..6b5344b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ include = [ "src/notifications", "src/shortcuts", "src/telegram_bot", + "src/spark", ] [tool.pytest.ini_options] diff --git a/src/config.py b/src/config.py index 506e643..4197bd1 100644 --- a/src/config.py +++ b/src/config.py @@ -28,6 +28,12 @@ class Settings(BaseSettings): # 8b ~16 GB | 70b ~140 GB | 405b ~810 GB airllm_model_size: Literal["8b", "70b", "405b"] = "70b" + # ── Spark Intelligence ──────────────────────────────────────────────── + # Enable/disable the Spark cognitive layer. + # When enabled, Spark captures swarm events, runs EIDOS predictions, + # consolidates memories, and generates advisory recommendations. 
+ spark_enabled: bool = True + model_config = SettingsConfigDict( env_file=".env", env_file_encoding="utf-8", diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 78e7be2..729c7cd 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -23,6 +23,7 @@ from dashboard.routes.briefing import router as briefing_router from dashboard.routes.telegram import router as telegram_router from dashboard.routes.swarm_internal import router as swarm_internal_router from dashboard.routes.tools import router as tools_router +from dashboard.routes.spark import router as spark_router logging.basicConfig( level=logging.INFO, @@ -97,6 +98,11 @@ async def lifespan(app: FastAPI): except Exception as exc: logger.error("Failed to spawn persona agents: %s", exc) + # Initialise Spark Intelligence engine + from spark.engine import spark_engine + if spark_engine.enabled: + logger.info("Spark Intelligence active — event capture enabled") + # Auto-start Telegram bot if a token is configured from telegram_bot.bot import telegram_bot await telegram_bot.start() @@ -136,6 +142,7 @@ app.include_router(briefing_router) app.include_router(telegram_router) app.include_router(swarm_internal_router) app.include_router(tools_router) +app.include_router(spark_router) @app.get("/", response_class=HTMLResponse) diff --git a/src/dashboard/routes/spark.py b/src/dashboard/routes/spark.py new file mode 100644 index 0000000..f998050 --- /dev/null +++ b/src/dashboard/routes/spark.py @@ -0,0 +1,147 @@ +"""Spark Intelligence dashboard routes. 
+ +GET /spark — JSON status (API) +GET /spark/ui — HTML Spark Intelligence dashboard +GET /spark/timeline — HTMX partial: recent event timeline +GET /spark/insights — HTMX partial: advisories and insights +GET /spark/predictions — HTMX partial: EIDOS predictions +""" + +import json +import logging +from pathlib import Path + +from fastapi import APIRouter, Request +from fastapi.responses import HTMLResponse +from fastapi.templating import Jinja2Templates + +from spark.engine import spark_engine + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/spark", tags=["spark"]) +templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates")) + + +@router.get("/ui", response_class=HTMLResponse) +async def spark_ui(request: Request): + """Render the Spark Intelligence dashboard page.""" + status = spark_engine.status() + advisories = spark_engine.get_advisories() + timeline = spark_engine.get_timeline(limit=20) + predictions = spark_engine.get_predictions(limit=10) + memories = spark_engine.get_memories(limit=10) + + # Parse event data JSON for template display + timeline_enriched = [] + for ev in timeline: + entry = { + "id": ev.id, + "event_type": ev.event_type, + "agent_id": ev.agent_id, + "task_id": ev.task_id, + "description": ev.description, + "importance": ev.importance, + "created_at": ev.created_at, + } + try: + entry["data"] = json.loads(ev.data) + except (json.JSONDecodeError, TypeError): + entry["data"] = {} + timeline_enriched.append(entry) + + # Enrich predictions for display + predictions_enriched = [] + for p in predictions: + entry = { + "id": p.id, + "task_id": p.task_id, + "prediction_type": p.prediction_type, + "accuracy": p.accuracy, + "created_at": p.created_at, + "evaluated_at": p.evaluated_at, + } + try: + entry["predicted"] = json.loads(p.predicted_value) + except (json.JSONDecodeError, TypeError): + entry["predicted"] = {} + try: + entry["actual"] = json.loads(p.actual_value) if p.actual_value else None + 
except (json.JSONDecodeError, TypeError): + entry["actual"] = None + predictions_enriched.append(entry) + + return templates.TemplateResponse( + request, + "spark.html", + { + "status": status, + "advisories": advisories, + "timeline": timeline_enriched, + "predictions": predictions_enriched, + "memories": memories, + }, + ) + + +@router.get("", response_class=HTMLResponse) +async def spark_status_json(): + """Return Spark Intelligence status as JSON.""" + from fastapi.responses import JSONResponse + status = spark_engine.status() + advisories = spark_engine.get_advisories() + return JSONResponse({ + "status": status, + "advisories": [ + { + "category": a.category, + "priority": a.priority, + "title": a.title, + "detail": a.detail, + "suggested_action": a.suggested_action, + "subject": a.subject, + "evidence_count": a.evidence_count, + } + for a in advisories + ], + }) + + +@router.get("/timeline", response_class=HTMLResponse) +async def spark_timeline(request: Request): + """HTMX partial: recent event timeline.""" + timeline = spark_engine.get_timeline(limit=20) + timeline_enriched = [] + for ev in timeline: + entry = { + "id": ev.id, + "event_type": ev.event_type, + "agent_id": ev.agent_id, + "task_id": ev.task_id, + "description": ev.description, + "importance": ev.importance, + "created_at": ev.created_at, + } + try: + entry["data"] = json.loads(ev.data) + except (json.JSONDecodeError, TypeError): + entry["data"] = {} + timeline_enriched.append(entry) + + return templates.TemplateResponse( + request, + "partials/spark_timeline.html", + {"timeline": timeline_enriched}, + ) + + +@router.get("/insights", response_class=HTMLResponse) +async def spark_insights(request: Request): + """HTMX partial: advisories and consolidated memories.""" + advisories = spark_engine.get_advisories() + memories = spark_engine.get_memories(limit=10) + return templates.TemplateResponse( + request, + "partials/spark_insights.html", + {"advisories": advisories, "memories": memories}, + ) 
diff --git a/src/dashboard/templates/base.html b/src/dashboard/templates/base.html index 4d92db3..1fc5c2a 100644 --- a/src/dashboard/templates/base.html +++ b/src/dashboard/templates/base.html @@ -23,6 +23,7 @@
BRIEFING SWARM + SPARK MARKET TOOLS MOBILE diff --git a/src/dashboard/templates/partials/spark_insights.html b/src/dashboard/templates/partials/spark_insights.html new file mode 100644 index 0000000..108b5d1 --- /dev/null +++ b/src/dashboard/templates/partials/spark_insights.html @@ -0,0 +1,32 @@ +{% if advisories %} + {% for adv in advisories %} +
+
+ {{ adv.category | replace("_", " ") | upper }} + {{ "%.0f"|format(adv.priority * 100) }}% +
+
{{ adv.title }}
+
{{ adv.detail }}
+
{{ adv.suggested_action }}
+
+ {% endfor %} +{% else %} +
No advisories yet. Run more tasks to build intelligence.
+{% endif %} + +{% if memories %} +
+
CONSOLIDATED MEMORIES
+ {% for mem in memories %} +
+
+ {{ mem.memory_type | upper }} + {{ "%.0f"|format(mem.confidence * 100) }}% conf +
+
{{ mem.content }}
+
+ {{ mem.source_events }} events • {{ mem.created_at[:10] }} +
+
+ {% endfor %} +{% endif %} diff --git a/src/dashboard/templates/partials/spark_timeline.html b/src/dashboard/templates/partials/spark_timeline.html new file mode 100644 index 0000000..ead0178 --- /dev/null +++ b/src/dashboard/templates/partials/spark_timeline.html @@ -0,0 +1,19 @@ +{% if timeline %} + {% for ev in timeline %} +
+
+ {{ ev.event_type | replace("_", " ") | upper }} + + {% if ev.importance >= 0.8 %}●●●{% elif ev.importance >= 0.5 %}●●{% else %}●{% endif %} + +
+
{{ ev.description }}
+ {% if ev.task_id %} +
task: {{ ev.task_id[:8] }}{% if ev.agent_id %} • agent: {{ ev.agent_id[:8] }}{% endif %}
+ {% endif %} +
{{ ev.created_at[:19] }}
+
+ {% endfor %} +{% else %} +
No events captured yet.
+{% endif %} diff --git a/src/dashboard/templates/spark.html b/src/dashboard/templates/spark.html new file mode 100644 index 0000000..d6464d5 --- /dev/null +++ b/src/dashboard/templates/spark.html @@ -0,0 +1,556 @@ +{% extends "base.html" %} + +{% block title %}Timmy Time — Spark Intelligence{% endblock %} + +{% block content %} +
+ + +
+
SPARK INTELLIGENCE
+
+ Self-evolving cognitive layer — + {{ status.events_captured }} events captured, + {{ status.memories_stored }} memories, + {{ status.predictions.evaluated }} predictions evaluated +
+
+ +
+ + +
+ + +
+
// EIDOS LOOP
+
+
+
+ PREDICTIONS + {{ status.predictions.total_predictions }} +
+
+ EVALUATED + {{ status.predictions.evaluated }} +
+
+ PENDING + {{ status.predictions.pending }} +
+
+ ACCURACY + + {{ "%.0f"|format(status.predictions.avg_accuracy * 100) }}% + +
+
+
+
+ + +
+
// EVENT PIPELINE
+
+ {% for event_type, count in status.event_types.items() %} +
+ {{ event_type | replace("_", " ") | upper }} + {{ count }} +
+ {% endfor %} +
+
+ + +
+
+ // ADVISORIES + {{ advisories | length }} +
+
+ {% if advisories %} + {% for adv in advisories %} +
+
+ {{ adv.category | replace("_", " ") | upper }} + {{ "%.0f"|format(adv.priority * 100) }}% +
+
{{ adv.title }}
+
{{ adv.detail }}
+
{{ adv.suggested_action }}
+
+ {% endfor %} + {% else %} +
No advisories yet. Run more tasks to build intelligence.
+ {% endif %} +
+
+
+ + +
+ + +
+
// EIDOS PREDICTIONS
+
+ {% if predictions %} + {% for pred in predictions %} +
+
+ {{ pred.task_id[:8] }}... + {% if pred.accuracy is not none %} + + {{ "%.0f"|format(pred.accuracy * 100) }}% + + {% else %} + PENDING + {% endif %} +
+
+ {% if pred.predicted %} +
+ Winner: + {{ (pred.predicted.likely_winner or "?")[:8] }} +
+
+ Success: + {{ "%.0f"|format((pred.predicted.success_probability or 0) * 100) }}% +
+
+ Bid range: + {{ pred.predicted.estimated_bid_range | join("–") }} sats +
+ {% endif %} + {% if pred.actual %} +
+ Actual: + {% if pred.actual.succeeded %}completed{% else %}failed{% endif %} + by {{ (pred.actual.winner or "?")[:8] }} + {% if pred.actual.winning_bid %} at {{ pred.actual.winning_bid }} sats{% endif %} +
+ {% endif %} +
+
{{ pred.created_at[:19] }}
+
+ {% endfor %} + {% else %} +
No predictions yet. Post tasks to activate the EIDOS loop.
+ {% endif %} +
+
+ + +
+
// MEMORIES
+
+ {% if memories %} + {% for mem in memories %} +
+
+ {{ mem.memory_type | upper }} + {{ "%.0f"|format(mem.confidence * 100) }}% conf +
+
{{ mem.content }}
+
+ {{ mem.source_events }} events • {{ mem.created_at[:10] }} +
+
+ {% endfor %} + {% else %} +
Memories will form as patterns emerge.
+ {% endif %} +
+
+
+ + +
+ +
+
+ // EVENT TIMELINE + {{ status.events_captured }} total +
+
+ {% if timeline %} + {% for ev in timeline %} +
+
+ {{ ev.event_type | replace("_", " ") | upper }} + + {% if ev.importance >= 0.8 %}●●●{% elif ev.importance >= 0.5 %}●●{% else %}●{% endif %} + +
+
{{ ev.description }}
+ {% if ev.task_id %} +
task: {{ ev.task_id[:8] }}{% if ev.agent_id %} • agent: {{ ev.agent_id[:8] }}{% endif %}
+ {% endif %} +
{{ ev.created_at[:19] }}
+
+ {% endfor %} + {% else %} +
No events captured yet.
+ {% endif %} +
+
+
+ +
+
+ + +{% endblock %} diff --git a/src/spark/__init__.py b/src/spark/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spark/advisor.py b/src/spark/advisor.py new file mode 100644 index 0000000..a0bc465 --- /dev/null +++ b/src/spark/advisor.py @@ -0,0 +1,278 @@ +"""Spark advisor — generates ranked recommendations from accumulated intelligence. + +The advisor examines Spark's event history, consolidated memories, and EIDOS +prediction accuracy to produce actionable recommendations for the swarm. + +Categories +---------- +- agent_performance — "Agent X excels at Y, consider routing more Y tasks" +- bid_optimization — "Bids on Z tasks are consistently high, room to save" +- failure_prevention — "Agent A has failed 3 recent tasks, investigate" +- system_health — "No events in 30 min, swarm may be idle" +""" + +import json +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Optional + +from spark import memory as spark_memory +from spark import eidos as spark_eidos + +logger = logging.getLogger(__name__) + +# Minimum events before the advisor starts generating recommendations +_MIN_EVENTS = 3 + + +@dataclass +class Advisory: + """A single ranked recommendation.""" + category: str # agent_performance, bid_optimization, etc. + priority: float # 0.0–1.0 (higher = more urgent) + title: str # Short headline + detail: str # Longer explanation + suggested_action: str # What to do about it + subject: Optional[str] = None # agent_id or None for system-level + evidence_count: int = 0 # Number of supporting events + + +def generate_advisories() -> list[Advisory]: + """Analyse Spark data and produce ranked recommendations. + + Returns advisories sorted by priority (highest first). 
+ """ + advisories: list[Advisory] = [] + + event_count = spark_memory.count_events() + if event_count < _MIN_EVENTS: + advisories.append(Advisory( + category="system_health", + priority=0.3, + title="Insufficient data", + detail=f"Only {event_count} events captured. " + f"Spark needs at least {_MIN_EVENTS} events to generate insights.", + suggested_action="Run more swarm tasks to build intelligence.", + evidence_count=event_count, + )) + return advisories + + advisories.extend(_check_failure_patterns()) + advisories.extend(_check_agent_performance()) + advisories.extend(_check_bid_patterns()) + advisories.extend(_check_prediction_accuracy()) + advisories.extend(_check_system_activity()) + + advisories.sort(key=lambda a: a.priority, reverse=True) + return advisories + + +def _check_failure_patterns() -> list[Advisory]: + """Detect agents with recent failure streaks.""" + results: list[Advisory] = [] + failures = spark_memory.get_events(event_type="task_failed", limit=50) + + # Group failures by agent + agent_failures: dict[str, int] = {} + for ev in failures: + aid = ev.agent_id + if aid: + agent_failures[aid] = agent_failures.get(aid, 0) + 1 + + for aid, count in agent_failures.items(): + if count >= 2: + results.append(Advisory( + category="failure_prevention", + priority=min(1.0, 0.5 + count * 0.15), + title=f"Agent {aid[:8]} has {count} failures", + detail=f"Agent {aid[:8]}... has failed {count} recent tasks. " + f"This pattern may indicate a capability mismatch or " + f"configuration issue.", + suggested_action=f"Review task types assigned to {aid[:8]}... 
" + f"and consider adjusting routing preferences.", + subject=aid, + evidence_count=count, + )) + + return results + + +def _check_agent_performance() -> list[Advisory]: + """Identify top-performing and underperforming agents.""" + results: list[Advisory] = [] + completions = spark_memory.get_events(event_type="task_completed", limit=100) + failures = spark_memory.get_events(event_type="task_failed", limit=100) + + # Build success/failure counts per agent + agent_success: dict[str, int] = {} + agent_fail: dict[str, int] = {} + + for ev in completions: + aid = ev.agent_id + if aid: + agent_success[aid] = agent_success.get(aid, 0) + 1 + + for ev in failures: + aid = ev.agent_id + if aid: + agent_fail[aid] = agent_fail.get(aid, 0) + 1 + + all_agents = set(agent_success) | set(agent_fail) + for aid in all_agents: + wins = agent_success.get(aid, 0) + fails = agent_fail.get(aid, 0) + total = wins + fails + if total < 2: + continue + + rate = wins / total + if rate >= 0.8 and total >= 3: + results.append(Advisory( + category="agent_performance", + priority=0.6, + title=f"Agent {aid[:8]} excels ({rate:.0%} success)", + detail=f"Agent {aid[:8]}... has completed {wins}/{total} tasks " + f"successfully. Consider routing more tasks to this agent.", + suggested_action="Increase task routing weight for this agent.", + subject=aid, + evidence_count=total, + )) + elif rate <= 0.3 and total >= 3: + results.append(Advisory( + category="agent_performance", + priority=0.75, + title=f"Agent {aid[:8]} struggling ({rate:.0%} success)", + detail=f"Agent {aid[:8]}... has only succeeded on {wins}/{total} tasks. 
" + f"May need different task types or capability updates.", + suggested_action="Review this agent's capabilities and assigned task types.", + subject=aid, + evidence_count=total, + )) + + return results + + +def _check_bid_patterns() -> list[Advisory]: + """Detect bid optimization opportunities.""" + results: list[Advisory] = [] + bids = spark_memory.get_events(event_type="bid_submitted", limit=100) + + if len(bids) < 5: + return results + + # Extract bid amounts + bid_amounts: list[int] = [] + for ev in bids: + try: + data = json.loads(ev.data) + sats = data.get("bid_sats", 0) + if sats > 0: + bid_amounts.append(sats) + except (json.JSONDecodeError, TypeError): + continue + + if not bid_amounts: + return results + + avg_bid = sum(bid_amounts) / len(bid_amounts) + max_bid = max(bid_amounts) + min_bid = min(bid_amounts) + spread = max_bid - min_bid + + if spread > avg_bid * 1.5: + results.append(Advisory( + category="bid_optimization", + priority=0.5, + title=f"Wide bid spread ({min_bid}–{max_bid} sats)", + detail=f"Bids range from {min_bid} to {max_bid} sats " + f"(avg {avg_bid:.0f}). Large spread may indicate " + f"inefficient auction dynamics.", + suggested_action="Review agent bid strategies for consistency.", + evidence_count=len(bid_amounts), + )) + + if avg_bid > 70: + results.append(Advisory( + category="bid_optimization", + priority=0.45, + title=f"High average bid ({avg_bid:.0f} sats)", + detail=f"The swarm average bid is {avg_bid:.0f} sats across " + f"{len(bid_amounts)} bids. 
This may be above optimal.", + suggested_action="Consider adjusting base bid rates for persona agents.", + evidence_count=len(bid_amounts), + )) + + return results + + +def _check_prediction_accuracy() -> list[Advisory]: + """Report on EIDOS prediction accuracy.""" + results: list[Advisory] = [] + stats = spark_eidos.get_accuracy_stats() + + if stats["evaluated"] < 3: + return results + + avg = stats["avg_accuracy"] + if avg < 0.4: + results.append(Advisory( + category="system_health", + priority=0.65, + title=f"Low prediction accuracy ({avg:.0%})", + detail=f"EIDOS predictions have averaged {avg:.0%} accuracy " + f"over {stats['evaluated']} evaluations. The learning " + f"model needs more data or the swarm behaviour is changing.", + suggested_action="Continue running tasks; accuracy should improve " + "as the model accumulates more training data.", + evidence_count=stats["evaluated"], + )) + elif avg >= 0.75: + results.append(Advisory( + category="system_health", + priority=0.3, + title=f"Strong prediction accuracy ({avg:.0%})", + detail=f"EIDOS predictions are performing well at {avg:.0%} " + f"average accuracy over {stats['evaluated']} evaluations.", + suggested_action="No action needed. Spark intelligence is learning effectively.", + evidence_count=stats["evaluated"], + )) + + return results + + +def _check_system_activity() -> list[Advisory]: + """Check for system idle patterns.""" + results: list[Advisory] = [] + recent = spark_memory.get_events(limit=5) + + if not recent: + results.append(Advisory( + category="system_health", + priority=0.4, + title="No swarm activity detected", + detail="Spark has not captured any events. 
" + "The swarm may be idle or Spark event capture is not active.", + suggested_action="Post a task to the swarm to activate the pipeline.", + )) + return results + + # Check event type distribution + types = [e.event_type for e in spark_memory.get_events(limit=100)] + type_counts = {} + for t in types: + type_counts[t] = type_counts.get(t, 0) + 1 + + if "task_completed" not in type_counts and "task_failed" not in type_counts: + if type_counts.get("task_posted", 0) > 3: + results.append(Advisory( + category="system_health", + priority=0.6, + title="Tasks posted but none completing", + detail=f"{type_counts.get('task_posted', 0)} tasks posted " + f"but no completions or failures recorded.", + suggested_action="Check agent availability and auction configuration.", + evidence_count=type_counts.get("task_posted", 0), + )) + + return results diff --git a/src/spark/eidos.py b/src/spark/eidos.py new file mode 100644 index 0000000..0377d40 --- /dev/null +++ b/src/spark/eidos.py @@ -0,0 +1,304 @@ +"""EIDOS cognitive loop — prediction, evaluation, and learning. + +Implements the core Spark learning cycle: +1. PREDICT — Before a task is assigned, predict the outcome +2. OBSERVE — Watch what actually happens +3. EVALUATE — Compare prediction vs reality +4. LEARN — Update internal models based on accuracy + +All predictions and evaluations are stored in SQLite for +transparency and audit. The loop runs passively, recording +predictions when tasks are posted and evaluating them when +tasks complete. 
+""" + +import json +import logging +import sqlite3 +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +DB_PATH = Path("data/spark.db") + + +@dataclass +class Prediction: + """A prediction made by the EIDOS loop.""" + id: str + task_id: str + prediction_type: str # outcome, best_agent, bid_range + predicted_value: str # JSON-encoded prediction + actual_value: Optional[str] # JSON-encoded actual (filled on evaluation) + accuracy: Optional[float] # 0.0–1.0 (filled on evaluation) + created_at: str + evaluated_at: Optional[str] + + +def _get_conn() -> sqlite3.Connection: + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute( + """ + CREATE TABLE IF NOT EXISTS spark_predictions ( + id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + prediction_type TEXT NOT NULL, + predicted_value TEXT NOT NULL, + actual_value TEXT, + accuracy REAL, + created_at TEXT NOT NULL, + evaluated_at TEXT + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_pred_task ON spark_predictions(task_id)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_pred_type ON spark_predictions(prediction_type)" + ) + conn.commit() + return conn + + +# ── Prediction phase ──────────────────────────────────────────────────────── + +def predict_task_outcome( + task_id: str, + task_description: str, + candidate_agents: list[str], + agent_history: Optional[dict] = None, +) -> dict: + """Predict the outcome of a task before it's assigned. 
+ + Returns a prediction dict with: + - likely_winner: agent_id most likely to win the auction + - success_probability: 0.0–1.0 chance the task succeeds + - estimated_bid_range: (low, high) sats range + """ + # Default prediction when no history exists + prediction = { + "likely_winner": candidate_agents[0] if candidate_agents else None, + "success_probability": 0.7, + "estimated_bid_range": [20, 80], + "reasoning": "baseline prediction (no history)", + } + + if agent_history: + # Adjust based on historical success rates + best_agent = None + best_rate = 0.0 + for aid, metrics in agent_history.items(): + if aid not in candidate_agents: + continue + rate = metrics.get("success_rate", 0.0) + if rate > best_rate: + best_rate = rate + best_agent = aid + + if best_agent: + prediction["likely_winner"] = best_agent + prediction["success_probability"] = round( + min(1.0, 0.5 + best_rate * 0.4), 2 + ) + prediction["reasoning"] = ( + f"agent {best_agent[:8]} has {best_rate:.0%} success rate" + ) + + # Adjust bid range from history + all_bids = [] + for metrics in agent_history.values(): + avg = metrics.get("avg_winning_bid", 0) + if avg > 0: + all_bids.append(avg) + if all_bids: + prediction["estimated_bid_range"] = [ + max(1, int(min(all_bids) * 0.8)), + int(max(all_bids) * 1.2), + ] + + # Store prediction + pred_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).isoformat() + conn = _get_conn() + conn.execute( + """ + INSERT INTO spark_predictions + (id, task_id, prediction_type, predicted_value, created_at) + VALUES (?, ?, ?, ?, ?) 
+ """, + (pred_id, task_id, "outcome", json.dumps(prediction), now), + ) + conn.commit() + conn.close() + + prediction["prediction_id"] = pred_id + return prediction + + +# ── Evaluation phase ──────────────────────────────────────────────────────── + +def evaluate_prediction( + task_id: str, + actual_winner: Optional[str], + task_succeeded: bool, + winning_bid: Optional[int] = None, +) -> Optional[dict]: + """Evaluate a stored prediction against actual outcomes. + + Returns the evaluation result or None if no prediction exists. + """ + conn = _get_conn() + row = conn.execute( + """ + SELECT * FROM spark_predictions + WHERE task_id = ? AND prediction_type = 'outcome' AND evaluated_at IS NULL + ORDER BY created_at DESC LIMIT 1 + """, + (task_id,), + ).fetchone() + + if not row: + conn.close() + return None + + predicted = json.loads(row["predicted_value"]) + actual = { + "winner": actual_winner, + "succeeded": task_succeeded, + "winning_bid": winning_bid, + } + + # Calculate accuracy + accuracy = _compute_accuracy(predicted, actual) + now = datetime.now(timezone.utc).isoformat() + + conn.execute( + """ + UPDATE spark_predictions + SET actual_value = ?, accuracy = ?, evaluated_at = ? + WHERE id = ? + """, + (json.dumps(actual), accuracy, now, row["id"]), + ) + conn.commit() + conn.close() + + return { + "prediction_id": row["id"], + "predicted": predicted, + "actual": actual, + "accuracy": accuracy, + } + + +def _compute_accuracy(predicted: dict, actual: dict) -> float: + """Score prediction accuracy from 0.0–1.0. 
+ + Components: + - Winner prediction: 0.4 weight (correct = 1.0, wrong = 0.0) + - Success prediction: 0.4 weight (how close) + - Bid range: 0.2 weight (was actual bid in predicted range) + """ + score = 0.0 + weights = 0.0 + + # Winner accuracy + pred_winner = predicted.get("likely_winner") + actual_winner = actual.get("winner") + if pred_winner and actual_winner: + score += 0.4 * (1.0 if pred_winner == actual_winner else 0.0) + weights += 0.4 + + # Success probability accuracy + pred_success = predicted.get("success_probability", 0.5) + actual_success = 1.0 if actual.get("succeeded") else 0.0 + success_error = abs(pred_success - actual_success) + score += 0.4 * (1.0 - success_error) + weights += 0.4 + + # Bid range accuracy + bid_range = predicted.get("estimated_bid_range", [20, 80]) + actual_bid = actual.get("winning_bid") + if actual_bid is not None and len(bid_range) == 2: + low, high = bid_range + if low <= actual_bid <= high: + score += 0.2 + else: + # Partial credit: how far outside the range + distance = min(abs(actual_bid - low), abs(actual_bid - high)) + range_size = max(1, high - low) + score += 0.2 * max(0, 1.0 - distance / range_size) + weights += 0.2 + + return round(score / max(weights, 0.01), 2) + + +# ── Query helpers ────────────────────────────────────────────────────────── + +def get_predictions( + task_id: Optional[str] = None, + evaluated_only: bool = False, + limit: int = 50, +) -> list[Prediction]: + """Query stored predictions.""" + conn = _get_conn() + query = "SELECT * FROM spark_predictions WHERE 1=1" + params: list = [] + + if task_id: + query += " AND task_id = ?" + params.append(task_id) + if evaluated_only: + query += " AND evaluated_at IS NOT NULL" + + query += " ORDER BY created_at DESC LIMIT ?" 
+ params.append(limit) + + rows = conn.execute(query, params).fetchall() + conn.close() + return [ + Prediction( + id=r["id"], + task_id=r["task_id"], + prediction_type=r["prediction_type"], + predicted_value=r["predicted_value"], + actual_value=r["actual_value"], + accuracy=r["accuracy"], + created_at=r["created_at"], + evaluated_at=r["evaluated_at"], + ) + for r in rows + ] + + +def get_accuracy_stats() -> dict: + """Return aggregate accuracy statistics for the EIDOS loop.""" + conn = _get_conn() + row = conn.execute( + """ + SELECT + COUNT(*) AS total_predictions, + COUNT(evaluated_at) AS evaluated, + AVG(CASE WHEN accuracy IS NOT NULL THEN accuracy END) AS avg_accuracy, + MIN(CASE WHEN accuracy IS NOT NULL THEN accuracy END) AS min_accuracy, + MAX(CASE WHEN accuracy IS NOT NULL THEN accuracy END) AS max_accuracy + FROM spark_predictions + """ + ).fetchone() + conn.close() + + return { + "total_predictions": row["total_predictions"] or 0, + "evaluated": row["evaluated"] or 0, + "pending": (row["total_predictions"] or 0) - (row["evaluated"] or 0), + "avg_accuracy": round(row["avg_accuracy"] or 0.0, 2), + "min_accuracy": round(row["min_accuracy"] or 0.0, 2), + "max_accuracy": round(row["max_accuracy"] or 0.0, 2), + } diff --git a/src/spark/engine.py b/src/spark/engine.py new file mode 100644 index 0000000..15bd5b3 --- /dev/null +++ b/src/spark/engine.py @@ -0,0 +1,288 @@ +"""Spark Intelligence engine — the top-level API for Spark integration. + +The engine is the single entry point used by the swarm coordinator and +dashboard routes. It wires together memory capture, EIDOS predictions, +memory consolidation, and the advisory system. 
Usage
-----
    from spark.engine import spark_engine

    # Capture a swarm event
    spark_engine.on_task_posted(task_id, description)
    spark_engine.on_bid_submitted(task_id, agent_id, bid_sats)
    spark_engine.on_task_completed(task_id, agent_id, result)
    spark_engine.on_task_failed(task_id, agent_id, reason)

    # Query Spark intelligence
    spark_engine.status()
    spark_engine.get_advisories()
    spark_engine.get_timeline()
"""

import json
import logging
from typing import Optional

from spark import advisor as spark_advisor
from spark import eidos as spark_eidos
from spark import memory as spark_memory
from spark.advisor import Advisory
from spark.memory import SparkEvent, SparkMemory

logger = logging.getLogger(__name__)


class SparkEngine:
    """Top-level Spark Intelligence controller.

    Thin facade over the three Spark subsystems: ``spark.memory`` (event
    capture + consolidated memories), ``spark.eidos`` (predict/evaluate
    loop) and ``spark.advisor`` (ranked recommendations).  All ``on_*``
    hooks are no-ops returning ``None`` when the engine is disabled.
    """

    def __init__(self, enabled: bool = True) -> None:
        # When disabled, every capture hook short-circuits to None.
        self._enabled = enabled
        if enabled:
            logger.info("Spark Intelligence engine initialised")

    @property
    def enabled(self) -> bool:
        """Whether event capture and advisory generation are active."""
        return self._enabled

    # ── Event capture (called by coordinator) ──────────────────────────────

    def on_task_posted(
        self,
        task_id: str,
        description: str,
        candidate_agents: Optional[list[str]] = None,
    ) -> Optional[str]:
        """Capture a task-posted event and generate a prediction.

        Returns the stored event id, or ``None`` when disabled.  A
        prediction is only generated when *candidate_agents* is non-empty.
        """
        if not self._enabled:
            return None

        event_id = spark_memory.record_event(
            event_type="task_posted",
            description=description,
            task_id=task_id,
            data=json.dumps({"candidates": candidate_agents or []}),
        )

        # Generate EIDOS prediction
        if candidate_agents:
            spark_eidos.predict_task_outcome(
                task_id=task_id,
                task_description=description,
                candidate_agents=candidate_agents,
            )

        logger.debug("Spark: captured task_posted %s", task_id[:8])
        return event_id

    def on_bid_submitted(
        self, task_id: str, agent_id: str, bid_sats: int,
    ) -> Optional[str]:
        """Capture a bid event.  Returns the event id, or ``None`` when disabled."""
        if not self._enabled:
            return None

        event_id = spark_memory.record_event(
            event_type="bid_submitted",
            description=f"Agent {agent_id[:8]} bid {bid_sats} sats",
            agent_id=agent_id,
            task_id=task_id,
            data=json.dumps({"bid_sats": bid_sats}),
        )

        logger.debug("Spark: captured bid %s→%s (%d sats)",
                     agent_id[:8], task_id[:8], bid_sats)
        return event_id

    def on_task_assigned(
        self, task_id: str, agent_id: str,
    ) -> Optional[str]:
        """Capture a task-assigned event.  Returns the event id, or ``None``."""
        if not self._enabled:
            return None

        event_id = spark_memory.record_event(
            event_type="task_assigned",
            description=f"Task assigned to {agent_id[:8]}",
            agent_id=agent_id,
            task_id=task_id,
        )

        logger.debug("Spark: captured assignment %s→%s",
                     task_id[:8], agent_id[:8])
        return event_id

    def on_task_completed(
        self,
        task_id: str,
        agent_id: str,
        result: str,
        winning_bid: Optional[int] = None,
    ) -> Optional[str]:
        """Capture a task-completed event and evaluate EIDOS prediction.

        Only the *length* of the result is captured, not its content.
        Also records a ``prediction_result`` event when a pending
        prediction existed for the task, and may trigger memory
        consolidation for the completing agent.
        """
        if not self._enabled:
            return None

        event_id = spark_memory.record_event(
            event_type="task_completed",
            description=f"Task completed by {agent_id[:8]}",
            agent_id=agent_id,
            task_id=task_id,
            data=json.dumps({
                "result_length": len(result),
                "winning_bid": winning_bid,
            }),
        )

        # Evaluate EIDOS prediction (task_succeeded is True by definition here)
        evaluation = spark_eidos.evaluate_prediction(
            task_id=task_id,
            actual_winner=agent_id,
            task_succeeded=True,
            winning_bid=winning_bid,
        )
        if evaluation:
            accuracy = evaluation["accuracy"]
            spark_memory.record_event(
                event_type="prediction_result",
                description=f"Prediction accuracy: {accuracy:.0%}",
                task_id=task_id,
                data=json.dumps(evaluation, default=str),
                importance=0.7,
            )

        # Consolidate memory if enough events for this agent
        self._maybe_consolidate(agent_id)

        logger.debug("Spark: captured completion %s by %s",
                     task_id[:8], agent_id[:8])
        return event_id

    def on_task_failed(
        self,
        task_id: str,
        agent_id: str,
        reason: str,
    ) -> Optional[str]:
        """Capture a task-failed event and evaluate EIDOS prediction.

        The failure reason is truncated to 80 chars in the description
        but stored in full in the JSON payload.
        """
        if not self._enabled:
            return None

        event_id = spark_memory.record_event(
            event_type="task_failed",
            description=f"Task failed by {agent_id[:8]}: {reason[:80]}",
            agent_id=agent_id,
            task_id=task_id,
            data=json.dumps({"reason": reason}),
        )

        # Evaluate EIDOS prediction
        spark_eidos.evaluate_prediction(
            task_id=task_id,
            actual_winner=agent_id,
            task_succeeded=False,
        )

        # Failures always worth consolidating
        # (NOTE(review): _maybe_consolidate still applies its own minimum
        # thresholds, so a lone failure may not actually consolidate.)
        self._maybe_consolidate(agent_id)

        logger.debug("Spark: captured failure %s by %s",
                     task_id[:8], agent_id[:8])
        return event_id

    def on_agent_joined(self, agent_id: str, name: str) -> Optional[str]:
        """Capture an agent-joined event.  Returns the event id, or ``None``."""
        if not self._enabled:
            return None

        return spark_memory.record_event(
            event_type="agent_joined",
            description=f"Agent {name} ({agent_id[:8]}) joined the swarm",
            agent_id=agent_id,
        )

    # ── Memory consolidation ──────────────────────────────────────────────

    def _maybe_consolidate(self, agent_id: str) -> None:
        """Consolidate events into memories when enough data exists.

        Requires at least 5 recent events for the agent and at least 3
        completed/failed outcomes among them.  Stores a "pattern" memory
        for a success rate >= 0.8 and an "anomaly" memory for <= 0.3;
        confidence grows with outcome count, capped at 0.95.

        NOTE(review): each qualifying call INSERTs a fresh memory row —
        there is no dedup, so repeated completions keep appending similar
        memories; confirm this accumulation is intended.
        """
        agent_events = spark_memory.get_events(agent_id=agent_id, limit=50)
        if len(agent_events) < 5:
            return

        completions = [e for e in agent_events if e.event_type == "task_completed"]
        failures = [e for e in agent_events if e.event_type == "task_failed"]
        total = len(completions) + len(failures)

        if total < 3:
            return

        success_rate = len(completions) / total if total else 0

        if success_rate >= 0.8:
            spark_memory.store_memory(
                memory_type="pattern",
                subject=agent_id,
                content=f"Agent {agent_id[:8]} has a strong track record: "
                        f"{len(completions)}/{total} tasks completed successfully.",
                confidence=min(0.95, 0.6 + total * 0.05),
                source_events=total,
            )
        elif success_rate <= 0.3:
            spark_memory.store_memory(
                memory_type="anomaly",
                subject=agent_id,
                content=f"Agent {agent_id[:8]} is struggling: only "
                        f"{len(completions)}/{total} tasks completed.",
                confidence=min(0.95, 0.6 + total * 0.05),
                source_events=total,
            )

    # ── Query API ─────────────────────────────────────────────────────────

    def status(self) -> dict:
        """Return a summary of Spark Intelligence state.

        Includes total event/memory counts, EIDOS accuracy stats, and a
        per-type event breakdown.  Note: runs seven COUNT queries, each
        opening its own connection.
        """
        eidos_stats = spark_eidos.get_accuracy_stats()
        return {
            "enabled": self._enabled,
            "events_captured": spark_memory.count_events(),
            "memories_stored": spark_memory.count_memories(),
            "predictions": eidos_stats,
            "event_types": {
                "task_posted": spark_memory.count_events("task_posted"),
                "bid_submitted": spark_memory.count_events("bid_submitted"),
                "task_assigned": spark_memory.count_events("task_assigned"),
                "task_completed": spark_memory.count_events("task_completed"),
                "task_failed": spark_memory.count_events("task_failed"),
                "agent_joined": spark_memory.count_events("agent_joined"),
            },
        }

    def get_advisories(self) -> list[Advisory]:
        """Generate current advisories based on accumulated intelligence."""
        if not self._enabled:
            return []
        return spark_advisor.generate_advisories()

    def get_timeline(self, limit: int = 50) -> list[SparkEvent]:
        """Return recent events as a timeline.

        NOTE(review): unlike get_advisories, this does not check
        self._enabled — a disabled engine still returns stored events.
        """
        return spark_memory.get_events(limit=limit)

    def get_memories(self, limit: int = 50) -> list[SparkMemory]:
        """Return consolidated memories (also not gated on ``enabled``)."""
        return spark_memory.get_memories(limit=limit)

    def get_predictions(self, limit: int = 20) -> list:
        """Return recent EIDOS predictions."""
        return spark_eidos.get_predictions(limit=limit)


# Module-level singleton — respects SPARK_ENABLED config
def _create_engine() -> SparkEngine:
    # NOTE(review): the broad except means ANY failure importing/reading
    # config silently falls back to an ENABLED engine — confirm that
    # fail-open is the intended default.
    try:
        from config import settings
        return SparkEngine(enabled=settings.spark_enabled)
    except Exception:
        return SparkEngine(enabled=True)


# Imported by the coordinator and dashboard routes at call time.
spark_engine = _create_engine()
"""Spark memory — SQLite-backed event capture and memory consolidation.

Captures swarm events (tasks posted, bids, assignments, completions,
failures) and distills them into higher-level memories with importance
scoring. This is the persistence layer for Spark Intelligence.

Tables
------
spark_events    — raw event log (every swarm event)
spark_memories  — consolidated insights extracted from event patterns
"""

import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Relative to the process CWD, consistent with the project's other stores.
DB_PATH = Path("data/spark.db")

# Importance thresholds
IMPORTANCE_LOW = 0.3
IMPORTANCE_MEDIUM = 0.6
IMPORTANCE_HIGH = 0.8


@dataclass
class SparkEvent:
    """A single captured swarm event."""
    id: str
    event_type: str        # task_posted, bid, assignment, completion, failure
    agent_id: Optional[str]
    task_id: Optional[str]
    description: str
    data: str              # JSON payload
    importance: float      # 0.0–1.0
    created_at: str


@dataclass
class SparkMemory:
    """A consolidated memory distilled from event patterns."""
    id: str
    memory_type: str       # pattern, insight, anomaly
    subject: str           # agent_id or "system"
    content: str           # Human-readable insight
    confidence: float      # 0.0–1.0
    source_events: int     # How many events contributed
    created_at: str
    expires_at: Optional[str]


def _get_conn() -> sqlite3.Connection:
    """Open the Spark database, creating schema and indexes if absent.

    The DDL is idempotent and re-executed on every open, which keeps the
    per-test monkeypatched DB_PATH pattern working without a module-level
    "initialised" flag.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    for statement in (
        """
        CREATE TABLE IF NOT EXISTS spark_events (
            id TEXT PRIMARY KEY,
            event_type TEXT NOT NULL,
            agent_id TEXT,
            task_id TEXT,
            description TEXT NOT NULL DEFAULT '',
            data TEXT NOT NULL DEFAULT '{}',
            importance REAL NOT NULL DEFAULT 0.5,
            created_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS spark_memories (
            id TEXT PRIMARY KEY,
            memory_type TEXT NOT NULL,
            subject TEXT NOT NULL DEFAULT 'system',
            content TEXT NOT NULL,
            confidence REAL NOT NULL DEFAULT 0.5,
            source_events INTEGER NOT NULL DEFAULT 0,
            created_at TEXT NOT NULL,
            expires_at TEXT
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_events_type ON spark_events(event_type)",
        "CREATE INDEX IF NOT EXISTS idx_events_agent ON spark_events(agent_id)",
        "CREATE INDEX IF NOT EXISTS idx_events_task ON spark_events(task_id)",
        "CREATE INDEX IF NOT EXISTS idx_memories_subject ON spark_memories(subject)",
    ):
        conn.execute(statement)
    conn.commit()
    return conn


# ── Importance scoring ──────────────────────────────────────────────────────

def score_importance(event_type: str, data: dict) -> float:
    """Compute importance score for an event (0.0–1.0).

    High-importance events: failures, large bids, first-time patterns.
    Low-importance events: routine bids, repeated successful completions.
    """
    weight_by_type = {
        "task_posted": 0.4,
        "bid_submitted": 0.2,
        "task_assigned": 0.5,
        "task_completed": 0.6,
        "task_failed": 0.9,
        "agent_joined": 0.5,
        "prediction_result": 0.7,
    }
    weight = weight_by_type.get(event_type, 0.5)

    # Failures are always worth learning from — push them to the ceiling.
    if event_type == "task_failed":
        weight = min(1.0, weight + 0.1)

    # High-value bids get a boost (guard keeps None/0 payloads inert).
    bid = data.get("bid_sats", 0)
    if bid and bid > 80:
        weight = min(1.0, weight + 0.15)

    return round(weight, 2)
def record_event(
    event_type: str,
    description: str,
    agent_id: Optional[str] = None,
    task_id: Optional[str] = None,
    data: str = "{}",
    importance: Optional[float] = None,
) -> str:
    """Record a swarm event. Returns the event id.

    When *importance* is omitted it is derived from the event type and
    the parsed JSON payload; an unparseable payload scores as empty.
    """
    import json  # local import: the module deliberately has no top-level json

    new_id = str(uuid.uuid4())
    stamp = datetime.now(timezone.utc).isoformat()

    if importance is None:
        if isinstance(data, str):
            try:
                payload = json.loads(data)
            except (json.JSONDecodeError, TypeError):
                payload = {}
        else:
            payload = data
        importance = score_importance(event_type, payload)

    db = _get_conn()
    db.execute(
        """
        INSERT INTO spark_events
        (id, event_type, agent_id, task_id, description, data, importance, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (new_id, event_type, agent_id, task_id, description, data, importance, stamp),
    )
    db.commit()
    db.close()
    return new_id


def get_events(
    event_type: Optional[str] = None,
    agent_id: Optional[str] = None,
    task_id: Optional[str] = None,
    limit: int = 100,
    min_importance: float = 0.0,
) -> list[SparkEvent]:
    """Query events with optional filters, newest first."""
    sql = "SELECT * FROM spark_events WHERE importance >= ?"
    args: list = [min_importance]

    # Truthiness (not `is None`) mirrors the module's filter convention.
    for clause, value in (
        (" AND event_type = ?", event_type),
        (" AND agent_id = ?", agent_id),
        (" AND task_id = ?", task_id),
    ):
        if value:
            sql += clause
            args.append(value)

    sql += " ORDER BY created_at DESC LIMIT ?"
    args.append(limit)

    db = _get_conn()
    rows = db.execute(sql, args).fetchall()
    db.close()
    # Column names match the dataclass fields, so rows expand directly.
    return [SparkEvent(**dict(r)) for r in rows]


def count_events(event_type: Optional[str] = None) -> int:
    """Count events, optionally filtered by type."""
    db = _get_conn()
    if event_type:
        cursor = db.execute(
            "SELECT COUNT(*) FROM spark_events WHERE event_type = ?",
            (event_type,),
        )
    else:
        cursor = db.execute("SELECT COUNT(*) FROM spark_events")
    total = cursor.fetchone()[0]
    db.close()
    return total


# ── Memory consolidation ───────────────────────────────────────────────────

def store_memory(
    memory_type: str,
    subject: str,
    content: str,
    confidence: float = 0.5,
    source_events: int = 0,
    expires_at: Optional[str] = None,
) -> str:
    """Store a consolidated memory. Returns the memory id."""
    new_id = str(uuid.uuid4())
    stamp = datetime.now(timezone.utc).isoformat()
    db = _get_conn()
    db.execute(
        """
        INSERT INTO spark_memories
        (id, memory_type, subject, content, confidence, source_events, created_at, expires_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (new_id, memory_type, subject, content, confidence, source_events, stamp, expires_at),
    )
    db.commit()
    db.close()
    return new_id


def get_memories(
    memory_type: Optional[str] = None,
    subject: Optional[str] = None,
    min_confidence: float = 0.0,
    limit: int = 50,
) -> list[SparkMemory]:
    """Query memories with optional filters, newest first."""
    sql = "SELECT * FROM spark_memories WHERE confidence >= ?"
    args: list = [min_confidence]

    for clause, value in (
        (" AND memory_type = ?", memory_type),
        (" AND subject = ?", subject),
    ):
        if value:
            sql += clause
            args.append(value)

    sql += " ORDER BY created_at DESC LIMIT ?"
    args.append(limit)

    db = _get_conn()
    rows = db.execute(sql, args).fetchall()
    db.close()
    return [SparkMemory(**dict(r)) for r in rows]


def count_memories(memory_type: Optional[str] = None) -> int:
    """Count memories, optionally filtered by type."""
    db = _get_conn()
    if memory_type:
        cursor = db.execute(
            "SELECT COUNT(*) FROM spark_memories WHERE memory_type = ?",
            (memory_type,),
        )
    else:
        cursor = db.execute("SELECT COUNT(*) FROM spark_memories")
    total = cursor.fetchone()[0]
    db.close()
    return total
spark = _get_spark() + if spark: + spark.on_agent_joined(aid, node.name) return { "agent_id": aid, @@ -193,6 +211,11 @@ class SwarmCoordinator: logger.info("Task posted: %s (%s)", task.id, description[:50]) # Broadcast task posted via WebSocket self._broadcast(self._broadcast_task_posted, task.id, description) + # Spark: capture task-posted event with candidate agents + spark = _get_spark() + if spark: + candidates = [a.id for a in registry.list_agents()] + spark.on_task_posted(task.id, description, candidates) return task async def run_auction_and_assign(self, task_id: str) -> Optional[Bid]: @@ -259,6 +282,10 @@ class SwarmCoordinator: ) # Broadcast task assigned via WebSocket self._broadcast(self._broadcast_task_assigned, task_id, winner.agent_id) + # Spark: capture assignment + spark = _get_spark() + if spark: + spark.on_task_assigned(task_id, winner.agent_id) else: update_task(task_id, status=TaskStatus.FAILED) logger.warning("Task %s: no bids received, marked as failed", task_id) @@ -286,6 +313,10 @@ class SwarmCoordinator: self._broadcast_task_completed, task_id, task.assigned_agent, result ) + # Spark: capture completion + spark = _get_spark() + if spark: + spark.on_task_completed(task_id, task.assigned_agent, result) return updated def fail_task(self, task_id: str, reason: str = "") -> Optional[Task]: @@ -304,6 +335,10 @@ class SwarmCoordinator: registry.update_status(task.assigned_agent, "idle") # Record failure in learner swarm_learner.record_task_result(task_id, task.assigned_agent, succeeded=False) + # Spark: capture failure + spark = _get_spark() + if spark: + spark.on_task_failed(task_id, task.assigned_agent, reason) return updated def get_task(self, task_id: str) -> Optional[Task]: @@ -377,7 +412,7 @@ class SwarmCoordinator: """Return a summary of the swarm state.""" agents = registry.list_agents() tasks = list_tasks() - return { + status = { "agents": len(agents), "agents_idle": sum(1 for a in agents if a.status == "idle"), "agents_busy": sum(1 for 
"""Tests for the Spark Intelligence integration.

Covers:
- spark.memory: event capture, memory consolidation, importance scoring
- spark.eidos: predictions, evaluations, accuracy stats
- spark.advisor: advisory generation from patterns
- spark.engine: top-level engine wiring all subsystems
- dashboard.routes.spark: HTTP endpoints
"""

import json
from pathlib import Path

import pytest


# ── Fixtures ──────────────────────────────────────────────────────────────

@pytest.fixture(autouse=True)
def tmp_spark_db(tmp_path, monkeypatch):
    """Redirect all Spark SQLite writes to a temp directory.

    Both spark.memory and spark.eidos are pointed at the SAME file; they
    create distinct tables, matching production where they share one DB.
    autouse guarantees isolation for every test in this module.
    """
    db_path = tmp_path / "spark.db"
    monkeypatch.setattr("spark.memory.DB_PATH", db_path)
    monkeypatch.setattr("spark.eidos.DB_PATH", db_path)
    yield db_path


# ── spark.memory ──────────────────────────────────────────────────────────


class TestImportanceScoring:
    """score_importance: base weights plus failure/high-bid boosts."""

    def test_failure_scores_high(self):
        from spark.memory import score_importance
        score = score_importance("task_failed", {})
        assert score >= 0.9

    def test_bid_scores_low(self):
        from spark.memory import score_importance
        score = score_importance("bid_submitted", {})
        assert score <= 0.3

    def test_high_bid_boosts_score(self):
        from spark.memory import score_importance
        low = score_importance("bid_submitted", {"bid_sats": 10})
        high = score_importance("bid_submitted", {"bid_sats": 100})
        assert high > low

    def test_unknown_event_default(self):
        from spark.memory import score_importance
        score = score_importance("unknown_type", {})
        assert score == 0.5


class TestEventRecording:
    """record_event / get_events round-trips and filter combinations."""

    def test_record_and_query(self):
        from spark.memory import record_event, get_events
        eid = record_event("task_posted", "Test task", task_id="t1")
        assert eid
        events = get_events(task_id="t1")
        assert len(events) == 1
        assert events[0].event_type == "task_posted"
        assert events[0].description == "Test task"

    def test_record_with_agent(self):
        from spark.memory import record_event, get_events
        record_event("bid_submitted", "Agent bid", agent_id="a1", task_id="t2",
                     data='{"bid_sats": 50}')
        events = get_events(agent_id="a1")
        assert len(events) == 1
        assert events[0].agent_id == "a1"

    def test_filter_by_event_type(self):
        from spark.memory import record_event, get_events
        record_event("task_posted", "posted", task_id="t3")
        record_event("task_completed", "completed", task_id="t3")
        posted = get_events(event_type="task_posted")
        assert len(posted) == 1

    def test_filter_by_min_importance(self):
        from spark.memory import record_event, get_events
        record_event("bid_submitted", "low", importance=0.1)
        record_event("task_failed", "high", importance=0.9)
        high_events = get_events(min_importance=0.5)
        assert len(high_events) == 1
        assert high_events[0].event_type == "task_failed"

    def test_count_events(self):
        from spark.memory import record_event, count_events
        record_event("task_posted", "a")
        record_event("task_posted", "b")
        record_event("task_completed", "c")
        assert count_events() == 3
        assert count_events("task_posted") == 2

    def test_limit_results(self):
        from spark.memory import record_event, get_events
        for i in range(10):
            record_event("bid_submitted", f"bid {i}")
        events = get_events(limit=3)
        assert len(events) == 3


class TestMemoryConsolidation:
    """store_memory / get_memories filters by subject, type and confidence."""

    def test_store_and_query_memory(self):
        from spark.memory import store_memory, get_memories
        mid = store_memory("pattern", "agent-x", "Strong performer", confidence=0.8)
        assert mid
        memories = get_memories(subject="agent-x")
        assert len(memories) == 1
        assert memories[0].content == "Strong performer"

    def test_filter_by_type(self):
        from spark.memory import store_memory, get_memories
        store_memory("pattern", "system", "Good pattern")
        store_memory("anomaly", "system", "Bad anomaly")
        patterns = get_memories(memory_type="pattern")
        assert len(patterns) == 1
        assert patterns[0].memory_type == "pattern"

    def test_filter_by_confidence(self):
        from spark.memory import store_memory, get_memories
        store_memory("pattern", "a", "Low conf", confidence=0.2)
        store_memory("pattern", "b", "High conf", confidence=0.9)
        high = get_memories(min_confidence=0.5)
        assert len(high) == 1
        assert high[0].content == "High conf"

    def test_count_memories(self):
        from spark.memory import store_memory, count_memories
        store_memory("pattern", "a", "X")
        store_memory("anomaly", "b", "Y")
        assert count_memories() == 2
        assert count_memories("pattern") == 1


# ── spark.eidos ───────────────────────────────────────────────────────────


class TestPredictions:
    """predict_task_outcome: storage, history weighting, empty candidates."""

    def test_predict_stores_prediction(self):
        from spark.eidos import predict_task_outcome, get_predictions
        # With no history, the first candidate is the default pick.
        result = predict_task_outcome("t1", "Fix the bug", ["agent-a", "agent-b"])
        assert "prediction_id" in result
        assert result["likely_winner"] == "agent-a"
        preds = get_predictions(task_id="t1")
        assert len(preds) == 1

    def test_predict_with_history(self):
        from spark.eidos import predict_task_outcome
        history = {
            "agent-a": {"success_rate": 0.3, "avg_winning_bid": 40},
            "agent-b": {"success_rate": 0.9, "avg_winning_bid": 30},
        }
        result = predict_task_outcome(
            "t2", "Research topic", ["agent-a", "agent-b"],
            agent_history=history,
        )
        assert result["likely_winner"] == "agent-b"
        assert result["success_probability"] > 0.5

    def test_predict_empty_candidates(self):
        from spark.eidos import predict_task_outcome
        result = predict_task_outcome("t3", "No agents", [])
        assert result["likely_winner"] is None


class TestEvaluation:
    """evaluate_prediction: accuracy scoring and one-shot evaluation."""

    def test_evaluate_correct_prediction(self):
        from spark.eidos import predict_task_outcome, evaluate_prediction
        predict_task_outcome("t4", "Task", ["agent-a"])
        result = evaluate_prediction("t4", "agent-a", task_succeeded=True, winning_bid=30)
        assert result is not None
        assert result["accuracy"] > 0.0

    def test_evaluate_wrong_prediction(self):
        from spark.eidos import predict_task_outcome, evaluate_prediction
        predict_task_outcome("t5", "Task", ["agent-a"])
        result = evaluate_prediction("t5", "agent-b", task_succeeded=False)
        assert result is not None
        # Wrong winner + failed = lower accuracy
        assert result["accuracy"] < 1.0

    def test_evaluate_no_prediction_returns_none(self):
        from spark.eidos import evaluate_prediction
        result = evaluate_prediction("no-task", "agent-a", task_succeeded=True)
        assert result is None

    def test_double_evaluation_returns_none(self):
        from spark.eidos import predict_task_outcome, evaluate_prediction
        predict_task_outcome("t6", "Task", ["agent-a"])
        evaluate_prediction("t6", "agent-a", task_succeeded=True)
        # Second evaluation should return None (already evaluated)
        result = evaluate_prediction("t6", "agent-a", task_succeeded=True)
        assert result is None


class TestAccuracyStats:
    """get_accuracy_stats: empty DB zeros and post-evaluation aggregates."""

    def test_empty_stats(self):
        from spark.eidos import get_accuracy_stats
        stats = get_accuracy_stats()
        assert stats["total_predictions"] == 0
        assert stats["evaluated"] == 0
        assert stats["avg_accuracy"] == 0.0

    def test_stats_after_evaluations(self):
        from spark.eidos import predict_task_outcome, evaluate_prediction, get_accuracy_stats
        for i in range(3):
            predict_task_outcome(f"task-{i}", "Description", ["agent-a"])
            evaluate_prediction(f"task-{i}", "agent-a", task_succeeded=True, winning_bid=30)
        stats = get_accuracy_stats()
        assert stats["total_predictions"] == 3
        assert stats["evaluated"] == 3
        assert stats["pending"] == 0
        assert stats["avg_accuracy"] > 0.0


class TestComputeAccuracy:
    """_compute_accuracy: winner, success and bid-range components."""

    def test_perfect_prediction(self):
        from spark.eidos import _compute_accuracy
        predicted = {
            "likely_winner": "agent-a",
            "success_probability": 1.0,
            "estimated_bid_range": [20, 40],
        }
        actual = {"winner": "agent-a", "succeeded": True, "winning_bid": 30}
        acc = _compute_accuracy(predicted, actual)
        assert acc == pytest.approx(1.0, abs=0.01)

    def test_all_wrong(self):
        from spark.eidos import _compute_accuracy
        predicted = {
            "likely_winner": "agent-a",
            "success_probability": 1.0,
            "estimated_bid_range": [10, 20],
        }
        actual = {"winner": "agent-b", "succeeded": False, "winning_bid": 100}
        acc = _compute_accuracy(predicted, actual)
        assert acc < 0.5

    def test_partial_credit(self):
        from spark.eidos import _compute_accuracy
        predicted = {
            "likely_winner": "agent-a",
            "success_probability": 0.5,
            "estimated_bid_range": [20, 40],
        }
        actual = {"winner": "agent-b", "succeeded": True, "winning_bid": 30}
        acc = _compute_accuracy(predicted, actual)
        # Wrong winner but right success and in bid range → partial
        assert 0.2 < acc < 0.8


# ── spark.advisor ─────────────────────────────────────────────────────────


class TestAdvisor:
    """generate_advisories: data thresholds, failure detection, ordering."""

    def test_insufficient_data(self):
        from spark.advisor import generate_advisories
        advisories = generate_advisories()
        assert len(advisories) >= 1
        assert advisories[0].category == "system_health"
        assert "Insufficient" in advisories[0].title

    def test_failure_detection(self):
        from spark.memory import record_event
        from spark.advisor import generate_advisories
        # Record enough events to pass the minimum threshold
        for i in range(5):
            record_event("task_failed", f"Failed task {i}",
                         agent_id="agent-bad", task_id=f"t-{i}")
        advisories = generate_advisories()
        failure_advisories = [a for a in advisories if a.category == "failure_prevention"]
        assert len(failure_advisories) >= 1
        # "agent-ba" is "agent-bad" truncated to 8 chars by the advisor.
        assert "agent-ba" in failure_advisories[0].title

    def test_advisories_sorted_by_priority(self):
        from spark.memory import record_event
        from spark.advisor import generate_advisories
        for i in range(4):
            record_event("task_posted", f"posted {i}", task_id=f"p-{i}")
            record_event("task_completed", f"done {i}",
                         agent_id="agent-good", task_id=f"p-{i}")
        advisories = generate_advisories()
        if len(advisories) >= 2:
            assert advisories[0].priority >= advisories[-1].priority

    def test_no_activity_advisory(self):
        from spark.advisor import _check_system_activity
        advisories = _check_system_activity()
        assert len(advisories) >= 1
        assert "No swarm activity" in advisories[0].title


# ── spark.engine ──────────────────────────────────────────────────────────


class TestSparkEngine:
    """SparkEngine: enable/disable gating, capture hooks, consolidation."""

    def test_engine_enabled(self):
        from spark.engine import SparkEngine
        engine = SparkEngine(enabled=True)
        assert engine.enabled

    def test_engine_disabled(self):
        from spark.engine import SparkEngine
        engine = SparkEngine(enabled=False)
        result = engine.on_task_posted("t1", "Ignored task")
        assert result is None

    def test_on_task_posted(self):
        from spark.engine import SparkEngine
        from spark.memory import get_events
        engine = SparkEngine(enabled=True)
        eid = engine.on_task_posted("t1", "Test task", ["agent-a"])
        assert eid is not None
        events = get_events(task_id="t1")
        assert len(events) == 1

    def test_on_bid_submitted(self):
        from spark.engine import SparkEngine
        from spark.memory import get_events
        engine = SparkEngine(enabled=True)
        eid = engine.on_bid_submitted("t1", "agent-a", 50)
        assert eid is not None
        events = get_events(event_type="bid_submitted")
        assert len(events) == 1

    def test_on_task_assigned(self):
        from spark.engine import SparkEngine
        from spark.memory import get_events
        engine = SparkEngine(enabled=True)
        eid = engine.on_task_assigned("t1", "agent-a")
        assert eid is not None
        events = get_events(event_type="task_assigned")
        assert len(events) == 1

    def test_on_task_completed_evaluates_prediction(self):
        from spark.engine import SparkEngine
        from spark.eidos import get_predictions
        engine = SparkEngine(enabled=True)
        engine.on_task_posted("t1", "Fix bug", ["agent-a"])
        eid = engine.on_task_completed("t1", "agent-a", "Fixed it")
        assert eid is not None
        preds = get_predictions(task_id="t1")
        # Should have prediction(s) evaluated
        assert len(preds) >= 1

    def test_on_task_failed(self):
        from spark.engine import SparkEngine
        from spark.memory import get_events
        engine = SparkEngine(enabled=True)
        engine.on_task_posted("t1", "Deploy server", ["agent-a"])
        eid = engine.on_task_failed("t1", "agent-a", "Connection timeout")
        assert eid is not None
        events = get_events(event_type="task_failed")
        assert len(events) == 1

    def test_on_agent_joined(self):
        from spark.engine import SparkEngine
        from spark.memory import get_events
        engine = SparkEngine(enabled=True)
        eid = engine.on_agent_joined("agent-a", "Echo")
        assert eid is not None
        events = get_events(event_type="agent_joined")
        assert len(events) == 1

    def test_status(self):
        from spark.engine import SparkEngine
        engine = SparkEngine(enabled=True)
        engine.on_task_posted("t1", "Test", ["agent-a"])
        engine.on_bid_submitted("t1", "agent-a", 30)
        status = engine.status()
        assert status["enabled"] is True
        assert status["events_captured"] >= 2
        assert "predictions" in status
        assert "event_types" in status

    def test_get_advisories(self):
        from spark.engine import SparkEngine
        engine = SparkEngine(enabled=True)
        advisories = engine.get_advisories()
        assert isinstance(advisories, list)

    def test_get_advisories_disabled(self):
        from spark.engine import SparkEngine
        engine = SparkEngine(enabled=False)
        advisories = engine.get_advisories()
        assert advisories == []

    def test_get_timeline(self):
        from spark.engine import SparkEngine
        engine = SparkEngine(enabled=True)
        engine.on_task_posted("t1", "Task 1")
        engine.on_task_posted("t2", "Task 2")
        timeline = engine.get_timeline(limit=10)
        assert len(timeline) == 2

    def test_memory_consolidation(self):
        from spark.engine import SparkEngine
        from spark.memory import get_memories
        engine = SparkEngine(enabled=True)
        # Generate enough completions to trigger consolidation (>=5 events, >=3 outcomes)
        for i in range(6):
            engine.on_task_completed(f"t-{i}", "agent-star", f"Result {i}")
        memories = get_memories(subject="agent-star")
        # Should have at least one consolidated memory about strong performance
        assert len(memories) >= 1


# ── Dashboard routes ──────────────────────────────────────────────────────


class TestSparkRoutes:
    """HTTP endpoints; `client` fixture presumably comes from conftest — verify."""

    def test_spark_json(self, client):
        resp = client.get("/spark")
        assert resp.status_code == 200
        data = resp.json()
        assert "status" in data
        assert "advisories" in data

    def test_spark_ui(self, client):
        resp = client.get("/spark/ui")
        assert resp.status_code == 200
        assert "SPARK INTELLIGENCE" in resp.text

    def test_spark_timeline(self, client):
        resp = client.get("/spark/timeline")
        assert resp.status_code == 200

    def test_spark_insights(self, client):
        resp = client.get("/spark/insights")
        assert resp.status_code == 200