feat: Automation sprint — webhooks, auto-research, model fallback, SensoryBus (#77, #78, #79, #80)

Block 2 — Gitea webhooks → gateway (#77):
- New server/webhooks.py: translates push/issue/PR/comment events to Matrix messages
- Gateway integration: POST /api/webhook/gitea endpoint
- Bot filtering (hermes, kimi, manus), HMAC signature verification
- 17/17 tests pass

Block 3 — Self-triggering research (#78):
- _evaluate_research_trigger() in bridge.py
- Pattern matching for question-like thoughts (I wonder, How does, etc.)
- Cooldown (10min), seed type filter, active-lock safeguards
- _extract_research_topic() extracts concise topic from thought content
- 6 new tests in test_bridge.py (14 → 17 total)

Block 4 — Model fallback chain (#79):
- New server/ollama_client.py: resilient Ollama client
- Configurable model_chain with auto-retry and model health tracking
- Integrated into ResearchEngine (replaces raw httpx, backward compatible)
- health_check() and status() for monitoring
- 11/11 tests pass, 21/21 research tests still pass

Block 5 — Bridge as SensoryBus subscriber (#80):
- register_on_bus() subscribes to 7 SensoryBus event types
- Adapter methods translate SensoryEvent → Matrix protocol messages
- Ready for Timmy dashboard integration via get_sensory_bus()
- 3 new bus integration tests in test_bridge.py (17 total)

PROTOCOL.md updated with all new capabilities.
This commit is contained in:
Perplexity Computer
2026-03-20 19:05:25 +00:00
parent 9120be3794
commit 647c8e9272
9 changed files with 1882 additions and 1 deletions

View File

@@ -788,3 +788,109 @@ On completion:
- `research_complete` — full report sent
- `scene_add` — research artifact placed in the world
- `bark` — summary announcement
## Gitea Webhooks (Issue #77)
The gateway accepts Gitea webhook POST events and translates them into Matrix
protocol messages. Configure a webhook in Gitea pointing to:
```
POST http://<gateway-host>:<rest-port>/api/webhook/gitea
```
### Supported Events
| Gitea Event | Matrix Messages Generated |
|-------------|--------------------------|
| `push` | `bark` (push summary) + `task_created` per commit (max 3) |
| `issues` (opened) | `bark` + `task_created` |
| `issues` (closed) | `task_update` (completed) + `bark` |
| `pull_request` (opened) | `bark` + `task_created` |
| `pull_request` (closed/merged) | `task_update` + `bark` |
| `issue_comment` (created) | `bark` with comment excerpt |
### Bot Filtering
Events from bot users (`hermes`, `kimi`, `manus`) are automatically filtered
to prevent notification loops.
### HMAC Verification
Optional webhook secret can be configured. The endpoint verifies the
`X-Gitea-Signature` header using HMAC-SHA256.
## Self-Triggering Research (Issue #78)
The cognitive bridge evaluates each thought from `on_thought()` against a set
of research trigger patterns. When a thought contains question-like language
("I wonder...", "How does...", "What is the best..."), the bridge automatically
dispatches a lighter research request (2 loops instead of 3).
### Safeguards
- **Cooldown**: 10-minute minimum between auto-triggered research
- **Seed filter**: Only triggers from research-oriented seed types
(`existential`, `creative`, `observation`, `freeform`, `sovereignty`)
- **Active lock**: Won't trigger while another research is already running
- **Minimum length**: Extracted topic must be at least 10 characters
## Model Fallback Chain (Issue #79)
The `OllamaClient` wraps Ollama's `/api/generate` with automatic model
fallback. When the primary model (hermes3) fails or times out, it tries the
next model in the chain.
### Configuration
```python
from server.ollama_client import OllamaClient, OllamaConfig
client = OllamaClient(OllamaConfig(
ollama_url="http://localhost:11434",
model_chain=["hermes3", "mistral", "llama3.2"],
timeout=120.0,
max_retries=2,
))
```
### Behavior
1. Tries primary model with configured retries
2. On persistent failure, marks model as unhealthy (skipped for 60s)
3. Falls back to next model in chain
4. `health_check()` queries Ollama for available models
5. `status()` returns current health state for monitoring
## SensoryBus Integration (Issue #80)
The bridge can subscribe to Timmy's `SensoryBus` for real-time event
translation. Call `bridge.register_on_bus(bus)` to wire up all handlers.
### Event Mapping
| SensoryBus Event | Bridge Handler |
|-----------------|----------------|
| `thought_completed` | `on_thought()` |
| `cognitive_state_changed` | `on_state_change()` |
| `gitea.push` | bark (push summary) |
| `gitea.issue.opened` | `on_issue_filed()` |
| `gitea.pull_request` | bark (PR action) |
| `visitor.entered` | `on_visitor_enter()` |
| `visitor.left` | `on_visitor_leave()` |
### Usage in Timmy's Dashboard
```python
from timmy.event_bus import get_sensory_bus
from server.bridge import CognitiveBridge
bus = get_sensory_bus()
bridge = CognitiveBridge(gateway_url="ws://localhost:8765")
await bridge.connect()
bridge.register_on_bus(bus)
# Now all SensoryBus events flow into the Matrix automatically
```

View File

@@ -33,6 +33,7 @@ import asyncio
import json
import logging
import random
import re
import time
import uuid
from dataclasses import dataclass, field
@@ -92,6 +93,34 @@ PONDER_SPOTS = [
WANDER_RANGE = 8 # max units from origin
# ── Research trigger patterns (#78) ──────────────────────────────
# Keywords in thoughts that signal research-worthy topics
RESEARCH_TRIGGERS = [
r"\bwhat is\b",
r"\bhow does\b",
r"\bhow do\b",
r"\bhow to\b",
r"\bwhy does\b",
r"\bwhy do\b",
r"\bI wonder\b",
r"\bI should research\b",
r"\bI need to learn\b",
r"\bI want to understand\b",
r"\binvestigate\b",
r"\bexplore the\b",
r"\bwhat are the best\b",
r"\bcompare\b.*\bvs\b",
r"\barchitecture of\b",
r"\bstate of the art\b",
r"\blatest\b.*\bdevelopments\b",
]
RESEARCH_TRIGGER_PATTERNS = [re.compile(p, re.IGNORECASE) for p in RESEARCH_TRIGGERS]
# Minimum seconds between auto-triggered research
RESEARCH_COOLDOWN = 600 # 10 minutes
@dataclass
class BridgeState:
@@ -105,6 +134,7 @@ class BridgeState:
connected: bool = False
researching: bool = False
research_topic: str = ""
last_auto_research_at: float = 0.0
class CognitiveBridge:
@@ -266,6 +296,17 @@ class CognitiveBridge:
log.info(f"Thought → ponder + bark ({seed_type}): {excerpt[:60]}...")
# 4. Evaluate whether this thought should trigger auto-research (#78)
research_topic = self._evaluate_research_trigger(content, seed_type)
if research_topic:
log.info(f"Auto-research triggered: {research_topic[:60]}")
request_id = f"auto-{str(uuid.uuid4())[:6]}"
await self.on_research_request({
"topic": research_topic,
"max_loops": 2, # lighter loop for auto-triggered
"request_id": request_id,
})
async def on_state_change(self, cognitive_state: dict):
"""
Handle a cognitive state change from CognitiveTracker.
@@ -729,6 +770,174 @@ class CognitiveBridge:
log.warning("Gateway connection closed")
self.state.connected = False
# ── SensoryBus integration (#80) ───────────────────────
def register_on_bus(self, bus) -> int:
"""
Subscribe to Timmy's SensoryBus events.
Maps SensoryEvent types to bridge handler methods:
- thought_completed → on_thought()
- cognitive_state_changed → on_state_change()
- gitea.push → on_issue_filed() (bark about the push)
- gitea.issue.opened → on_issue_filed()
- gitea.pull_request → bark about PR
- visitor.entered → on_visitor_enter()
- visitor.left → on_visitor_leave()
Args:
bus: SensoryBus instance (from timmy.event_bus)
Returns:
Number of event types registered.
"""
handlers = {
"thought_completed": self._handle_bus_thought,
"cognitive_state_changed": self._handle_bus_state,
"gitea.push": self._handle_bus_gitea_push,
"gitea.issue.opened": self._handle_bus_gitea_issue,
"gitea.pull_request": self._handle_bus_gitea_pr,
"visitor.entered": self._handle_bus_visitor_enter,
"visitor.left": self._handle_bus_visitor_leave,
}
for event_type, handler in handlers.items():
bus.subscribe(event_type, handler)
log.info(f"Subscribed to SensoryBus: {event_type}")
return len(handlers)
async def _handle_bus_thought(self, event):
"""SensoryBus adapter: thought_completed → on_thought."""
data = event.data if hasattr(event, 'data') else {}
await self.on_thought({
"content": data.get("content", ""),
"seed_type": data.get("seed_type", "freeform"),
"thought_id": data.get("thought_id", ""),
})
async def _handle_bus_state(self, event):
"""SensoryBus adapter: cognitive_state_changed → on_state_change."""
data = event.data if hasattr(event, 'data') else {}
await self.on_state_change({
"mood": data.get("mood", "settled"),
"engagement": data.get("engagement", "idle"),
"energy": data.get("energy", 0.8),
"focus_topic": data.get("focus_topic"),
})
async def _handle_bus_gitea_push(self, event):
"""SensoryBus adapter: gitea.push → bark."""
data = event.data if hasattr(event, 'data') else {}
repo = data.get("repo", "unknown")
branch = data.get("branch", "main")
commits = data.get("commits", 0)
if not await self._ensure_connected():
return
await self._send({
"type": "bark",
"agentId": self.agent_id,
"text": f"Push detected: {commits} commit(s) to {repo}:{branch}",
"emotion": "curious",
})
async def _handle_bus_gitea_issue(self, event):
"""SensoryBus adapter: gitea.issue.opened → on_issue_filed."""
data = event.data if hasattr(event, 'data') else {}
await self.on_issue_filed({
"title": data.get("title", "New issue"),
"url": data.get("url", ""),
})
async def _handle_bus_gitea_pr(self, event):
"""SensoryBus adapter: gitea.pull_request → bark."""
data = event.data if hasattr(event, 'data') else {}
title = data.get("title", "")
action = data.get("action", "opened")
if not await self._ensure_connected():
return
await self._send({
"type": "bark",
"agentId": self.agent_id,
"text": f"PR {action}: {title[:80]}",
"emotion": "determined" if action == "merged" else "curious",
})
async def _handle_bus_visitor_enter(self, event):
"""SensoryBus adapter: visitor.entered → on_visitor_enter."""
await self.on_visitor_enter()
async def _handle_bus_visitor_leave(self, event):
"""SensoryBus adapter: visitor.left → on_visitor_leave."""
await self.on_visitor_leave()
# ── Research trigger evaluation (#78) ─────────────────────
def _evaluate_research_trigger(self, thought_content: str, seed_type: str) -> Optional[str]:
"""
Evaluate whether a thought should auto-trigger a research request.
Returns a research topic string if triggered, or None if not.
Enforces cooldown to prevent research spam.
"""
# Skip if already researching
if self.state.researching:
return None
# Enforce cooldown
now = time.time()
if now - self.state.last_auto_research_at < RESEARCH_COOLDOWN:
return None
# Only trigger from research-oriented seed types
research_seeds = {"existential", "creative", "observation", "freeform", "sovereignty"}
if seed_type not in research_seeds:
return None
# Check if thought matches any research trigger pattern
matched = False
for pattern in RESEARCH_TRIGGER_PATTERNS:
if pattern.search(thought_content):
matched = True
break
if not matched:
return None
# Extract the research topic from the thought
# Use first sentence or up to 200 chars as the topic
topic = self._extract_research_topic(thought_content)
if not topic or len(topic) < 10:
return None
self.state.last_auto_research_at = now
return topic
@staticmethod
def _extract_research_topic(thought: str) -> str:
"""
Extract a concise research topic from a thought.
Looks for questions first, then falls back to first meaningful sentence.
"""
# Try to find a question in the thought
sentences = re.split(r'[.!?]+', thought)
for s in sentences:
s = s.strip()
# Look for question-like sentences
if any(s.lower().startswith(w) for w in
["what", "how", "why", "when", "where", "which", "who",
"i wonder", "i should", "i need to", "i want to"]):
return s[:200]
# Fallback: first non-trivial sentence
for s in sentences:
s = s.strip()
if len(s) > 15:
return s[:200]
return thought[:200]
# ── Helpers ──────────────────────────────────────────────
@staticmethod

View File

@@ -353,6 +353,13 @@ async def start_rest_api(gateway: Gateway, host: str, port: int):
app.router.add_get("/api/gateway/status", handle_status)
app.router.add_get("/health", handle_health)
# Mount Gitea webhook routes (#77)
try:
from server.webhooks import setup_webhook_routes
setup_webhook_routes(app, gateway)
except ImportError:
log.info("webhooks module not found — webhook routes unavailable")
runner = aiohttp_web.AppRunner(app)
await runner.setup()
site = aiohttp_web.TCPSite(runner, host, port)

281
server/ollama_client.py Normal file
View File

@@ -0,0 +1,281 @@
#!/usr/bin/env python3
"""
ollama_client.py — Resilient Ollama client with model fallback chain.
Wraps Ollama's /api/generate with:
1. Configurable model chain (hermes3 → fallback models)
2. Automatic retry with next model on failure/timeout
3. Connection health checks
4. Structured JSON mode support
5. Think-token stripping for reasoning models
Usage:
from server.ollama_client import OllamaClient, OllamaConfig
client = OllamaClient(OllamaConfig(
ollama_url="http://localhost:11434",
model_chain=["hermes3", "mistral", "llama3.2"],
))
result = await client.generate("Summarize this.", system="You are a researcher.")
Resolves Issue #79 — Model fallback chain
Supersedes #51 — Timmy Model Fallback Chain (Hermes → DeepSeek removed per user)
"""
import asyncio
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Optional
try:
import httpx
except ImportError:
import subprocess, sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "httpx", "-q"])
import httpx
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [Ollama] %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("ollama_client")
@dataclass
class OllamaConfig:
"""Configuration for the Ollama client."""
ollama_url: str = "http://localhost:11434"
model_chain: list = field(default_factory=lambda: ["hermes3"])
timeout: float = 120.0 # seconds per request
max_retries: int = 2 # retries per model before moving to next
retry_delay: float = 1.0 # seconds between retries
temperature: float = 0.0
health_check_interval: float = 60.0 # seconds between health checks
@dataclass
class GenerateResult:
"""Result from a generate call."""
text: str
model: str # which model actually responded
attempt: int # which attempt succeeded (1-based)
duration_ms: float # generation time
fallback_used: bool # True if primary model failed
class OllamaClient:
"""
Resilient Ollama client with automatic model fallback.
Tries each model in the chain until one succeeds. Tracks health
state to skip known-dead models on subsequent calls.
"""
def __init__(self, config: Optional[OllamaConfig] = None):
self.config = config or OllamaConfig()
self._model_health: dict[str, float] = {} # model → last_failure_time
self._last_health_check: float = 0.0
self._available_models: set[str] = set()
async def generate(
self,
prompt: str,
system: str = "",
json_mode: bool = False,
temperature: Optional[float] = None,
timeout: Optional[float] = None,
) -> GenerateResult:
"""
Generate text using the model chain with automatic fallback.
Tries each model in order. On failure/timeout, moves to the next.
Raises RuntimeError if all models in the chain fail.
"""
temp = temperature if temperature is not None else self.config.temperature
req_timeout = timeout or self.config.timeout
errors = []
attempt = 0
for model in self.config.model_chain:
# Skip models that failed recently (within health_check_interval)
if self._is_model_unhealthy(model):
log.info(f"Skipping unhealthy model: {model}")
continue
for retry in range(self.config.max_retries):
attempt += 1
try:
result = await self._call_generate(
model=model,
prompt=prompt,
system=system,
json_mode=json_mode,
temperature=temp,
timeout=req_timeout,
)
# Success — clear any health mark
self._model_health.pop(model, None)
is_fallback = model != self.config.model_chain[0]
if is_fallback:
log.info(f"Fallback to {model} succeeded (attempt {attempt})")
return GenerateResult(
text=result["text"],
model=model,
attempt=attempt,
duration_ms=result["duration_ms"],
fallback_used=is_fallback,
)
except httpx.TimeoutException as e:
errors.append(f"{model} (attempt {retry+1}): timeout — {e}")
log.warning(f"{model} timeout (attempt {retry+1}/{self.config.max_retries})")
except httpx.ConnectError as e:
errors.append(f"{model}: connection failed — {e}")
log.warning(f"{model} connection failed: {e}")
self._mark_unhealthy(model)
break # Don't retry connect errors — move to next model
except httpx.HTTPStatusError as e:
status = e.response.status_code
if status == 404:
# Model not found on this Ollama instance
errors.append(f"{model}: not found (404)")
log.warning(f"{model} not found on Ollama instance")
self._mark_unhealthy(model)
break
elif status >= 500:
errors.append(f"{model}: server error {status}")
log.warning(f"{model} server error: {status}")
else:
errors.append(f"{model}: HTTP {status}")
log.warning(f"{model} HTTP error: {status}")
except Exception as e:
errors.append(f"{model}: {type(e).__name__}: {e}")
log.warning(f"{model} error: {e}")
# Delay between retries
if retry < self.config.max_retries - 1:
await asyncio.sleep(self.config.retry_delay)
# All models exhausted
error_summary = "; ".join(errors[-5:]) # Last 5 errors
raise RuntimeError(
f"All models in chain failed: [{', '.join(self.config.model_chain)}]. "
f"Errors: {error_summary}"
)
async def health_check(self) -> dict[str, bool]:
"""
Check which models are available on the Ollama instance.
Returns dict of model_name → is_available.
"""
result = {}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(f"{self.config.ollama_url}/api/tags")
resp.raise_for_status()
data = resp.json()
available = {
m["name"].split(":")[0]
for m in data.get("models", [])
}
self._available_models = available
for model in self.config.model_chain:
result[model] = model in available
self._last_health_check = time.time()
log.info(f"Health check: {result}")
except Exception as e:
log.warning(f"Health check failed: {e}")
for model in self.config.model_chain:
result[model] = False
return result
def status(self) -> dict:
"""Return client status for debugging/monitoring."""
return {
"ollama_url": self.config.ollama_url,
"model_chain": self.config.model_chain,
"model_health": {
model: "healthy" if model not in self._model_health else "unhealthy"
for model in self.config.model_chain
},
"available_models": sorted(self._available_models),
}
# ── Internal ─────────────────────────────────────────────
async def _call_generate(
self,
model: str,
prompt: str,
system: str,
json_mode: bool,
temperature: float,
timeout: float,
) -> dict:
"""Make a single Ollama generate API call."""
payload = {
"model": model,
"prompt": prompt,
"system": system,
"stream": False,
"options": {
"temperature": temperature,
},
}
if json_mode:
payload["format"] = "json"
start = time.time()
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(
f"{self.config.ollama_url}/api/generate",
json=payload,
)
resp.raise_for_status()
data = resp.json()
elapsed_ms = (time.time() - start) * 1000
text = data.get("response", "")
text = self._strip_thinking_tokens(text)
return {"text": text, "duration_ms": round(elapsed_ms, 1)}
def _is_model_unhealthy(self, model: str) -> bool:
"""Check if a model was recently marked unhealthy."""
last_failure = self._model_health.get(model)
if last_failure is None:
return False
# Unhealthy status expires after health_check_interval
return (time.time() - last_failure) < self.config.health_check_interval
def _mark_unhealthy(self, model: str):
"""Mark a model as temporarily unhealthy."""
self._model_health[model] = time.time()
log.info(f"Marked {model} as unhealthy")
@staticmethod
def _strip_thinking_tokens(text: str) -> str:
"""Remove <think>...</think> blocks from reasoning model output."""
while "<think>" in text and "</think>" in text:
start = text.find("<think>")
end = text.find("</think>") + len("</think>")
text = text[:start] + text[end:]
if "<think>" in text:
text = text[:text.find("<think>")]
return text.strip()

View File

@@ -60,6 +60,7 @@ class ResearchConfig:
"""Configuration for the research engine."""
ollama_url: str = "http://localhost:11434"
model: str = "hermes3"
model_chain: list = field(default_factory=lambda: ["hermes3"])
max_loops: int = 3
max_tokens_per_source: int = 1000
search_max_results: int = 3
@@ -154,6 +155,21 @@ class ResearchEngine:
self._all_sources: List[SearchResult] = []
self._seen_urls: set = set()
# Initialize the resilient Ollama client (#79)
try:
from server.ollama_client import OllamaClient, OllamaConfig
self._client = OllamaClient(OllamaConfig(
ollama_url=self.config.ollama_url,
model_chain=getattr(self.config, 'model_chain', [self.config.model]),
timeout=self.config.timeout,
temperature=self.config.temperature,
))
self._use_client = True
log.info(f"Using OllamaClient with fallback chain")
except ImportError:
self._use_client = False
log.info(f"OllamaClient not available, using direct httpx")
async def run(self, topic: str, request_id: str = "") -> AsyncGenerator[ResearchProgress, None]:
"""
Run the iterative research loop.
@@ -238,7 +254,18 @@ class ResearchEngine:
# ── LLM calls ───────────────────────────────────────────
async def _ollama_generate(self, prompt: str, system: str, json_mode: bool = False) -> str:
"""Call Ollama's generate API."""
"""Call Ollama with fallback chain (#79) or direct httpx."""
if self._use_client:
result = await self._client.generate(
prompt=prompt,
system=system,
json_mode=json_mode,
)
if result.fallback_used:
log.info(f"Used fallback model: {result.model}")
return result.text
# Legacy direct path (fallback if ollama_client not available)
payload = {
"model": self.config.model,
"prompt": prompt,

View File

@@ -234,6 +234,165 @@ async def test_excerpt(bridge, browser):
assert "" in excerpt or excerpt.endswith(".")
# ── SensoryBus integration tests (#80) ────────────────────────
class MockSensoryEvent:
"""Lightweight mock of timmy.events.SensoryEvent."""
def __init__(self, source, event_type, data=None, actor=""):
self.source = source
self.event_type = event_type
self.data = data or {}
self.actor = actor
class MockSensoryBus:
"""Lightweight mock of timmy.event_bus.SensoryBus."""
def __init__(self):
self._subscribers = {}
def subscribe(self, event_type, callback):
self._subscribers.setdefault(event_type, []).append(callback)
async def emit(self, event):
handlers = self._subscribers.get(event.event_type, [])
handlers.extend(self._subscribers.get("*", []))
for h in handlers:
import asyncio
result = h(event)
if asyncio.iscoroutine(result):
await result
return len(handlers)
@test("SensoryBus: register_on_bus subscribes to all event types")
async def test_bus_register(bridge, browser):
bus = MockSensoryBus()
count = bridge.register_on_bus(bus)
assert count == 7, f"Expected 7 subscriptions, got {count}"
assert "thought_completed" in bus._subscribers
assert "visitor.entered" in bus._subscribers
assert "gitea.push" in bus._subscribers
@test("SensoryBus: thought_completed → on_thought")
async def test_bus_thought(bridge, browser):
bus = MockSensoryBus()
bridge.register_on_bus(bus)
await drain(browser, 0.1)
event = MockSensoryEvent(
source="thinking",
event_type="thought_completed",
data={
"content": "Testing bus integration.",
"seed_type": "observation",
"thought_id": "bus-test-1",
},
)
# Reset cooldown to prevent auto-research from triggering
bridge.state.last_auto_research_at = __import__("time").time()
await bus.emit(event)
await asyncio.sleep(2)
msgs = await drain(browser, timeout=0.5)
types = [m.get("type") for m in msgs]
assert "agent_behavior" in types or "bark" in types, f"Expected behavior/bark, got {types}"
@test("SensoryBus: gitea.push → bark")
async def test_bus_push(bridge, browser):
bus = MockSensoryBus()
bridge.register_on_bus(bus)
await drain(browser, 0.1)
event = MockSensoryEvent(
source="gitea",
event_type="gitea.push",
data={"repo": "the-matrix", "branch": "main", "commits": 3},
)
await bus.emit(event)
await asyncio.sleep(0.5)
msgs = await drain(browser, timeout=0.5)
barks = [m for m in msgs if m.get("type") == "bark"]
assert len(barks) >= 1, f"Expected bark, got types: {[m.get('type') for m in msgs]}"
assert "Push detected" in barks[0]["text"]
assert "the-matrix:main" in barks[0]["text"]
# ── Research trigger tests (#78) ──────────────────────────────
@test("Research trigger: 'I wonder' pattern fires")
async def test_trigger_wonder(bridge, browser):
# Reset cooldown so trigger can fire
bridge.state.last_auto_research_at = 0
bridge.state.researching = False
topic = bridge._evaluate_research_trigger(
"I wonder what the best approach to WebRTC audio is.",
"existential",
)
assert topic is not None, "Expected trigger to fire"
assert "WebRTC" in topic or "wonder" in topic.lower()
@test("Research trigger: non-research seed type skips")
async def test_trigger_skip_seed(bridge, browser):
bridge.state.last_auto_research_at = 0
bridge.state.researching = False
topic = bridge._evaluate_research_trigger(
"I wonder about something deep.",
"scripture", # not in research_seeds
)
assert topic is None, "Should not trigger for scripture seed"
@test("Research trigger: cooldown prevents rapid fire")
async def test_trigger_cooldown(bridge, browser):
import time
bridge.state.last_auto_research_at = time.time() # just triggered
bridge.state.researching = False
topic = bridge._evaluate_research_trigger(
"How does quantum computing work?",
"observation",
)
assert topic is None, "Should be blocked by cooldown"
@test("Research trigger: already researching blocks")
async def test_trigger_already_researching(bridge, browser):
bridge.state.last_auto_research_at = 0
bridge.state.researching = True
topic = bridge._evaluate_research_trigger(
"What is the state of the art in LLM agents?",
"freeform",
)
assert topic is None, "Should be blocked when already researching"
bridge.state.researching = False
@test("Research trigger: no pattern match returns None")
async def test_trigger_no_match(bridge, browser):
bridge.state.last_auto_research_at = 0
bridge.state.researching = False
topic = bridge._evaluate_research_trigger(
"The sun is warm today.",
"observation",
)
assert topic is None, "Should not trigger for non-research thought"
@test("Extract research topic: question extraction")
async def test_topic_extraction(bridge, browser):
topic = CognitiveBridge._extract_research_topic(
"The tower grows. How does WebRTC handle NAT traversal? I must investigate."
)
assert "WebRTC" in topic
assert "How" in topic
# ── Runner ──────────────────────────────────────────────────────
async def run_tests():

View File

@@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""
test_ollama_client.py — Tests for the resilient Ollama client.
Tests fallback logic, health tracking, think-token stripping,
and error handling without requiring a live Ollama instance.
Uses a mock HTTP server to simulate Ollama responses.
Usage:
python server/test_ollama_client.py
"""
import asyncio
import json
import sys
sys.path.insert(0, ".")
from server.ollama_client import OllamaClient, OllamaConfig, GenerateResult
passed = 0
failed = 0
errors = []
def test(name):
"""Decorator for test functions."""
def decorator(func):
async def wrapper():
global passed, failed
try:
result = func()
if asyncio.iscoroutine(result):
await result
passed += 1
print(f"{name}")
except Exception as e:
failed += 1
errors.append((name, str(e)))
print(f"{name}: {e}")
wrapper.__test_name__ = name
return wrapper
return decorator
# ── Mock Ollama server ────────────────────────────────────────────
try:
from aiohttp import web as aiohttp_web
HAS_AIOHTTP = True
except ImportError:
HAS_AIOHTTP = False
class MockOllama:
"""Mock Ollama server for testing fallback behavior."""
def __init__(self):
self.available_models = {"hermes3", "mistral"}
self.fail_models = set() # models that return 500
self.missing_models = set() # models that return 404
self.slow_models = set() # models that take >timeout
self.call_log = [] # track all calls
self.response_text = "Mock response"
async def handle_generate(self, request):
data = await request.json()
model = data.get("model", "")
self.call_log.append(model)
if model in self.missing_models:
return aiohttp_web.Response(status=404, text="model not found")
if model in self.fail_models:
return aiohttp_web.Response(status=500, text="internal error")
if model in self.slow_models:
await asyncio.sleep(10) # will timeout
return aiohttp_web.json_response({
"response": self.response_text,
"model": model,
"done": True,
})
async def handle_tags(self, request):
models = [{"name": f"{m}:latest"} for m in self.available_models]
return aiohttp_web.json_response({"models": models})
async def start_mock_server(mock: MockOllama, port: int):
"""Start mock Ollama on given port, return (runner, url)."""
app = aiohttp_web.Application()
app.router.add_post("/api/generate", mock.handle_generate)
app.router.add_get("/api/tags", mock.handle_tags)
runner = aiohttp_web.AppRunner(app)
await runner.setup()
site = aiohttp_web.TCPSite(runner, "127.0.0.1", port)
await site.start()
return runner, f"http://127.0.0.1:{port}"
# ── Tests ────────────────────────────────────────────────────────
@test("Think-token stripping")
def test_strip_thinking():
text = "<think>reasoning here</think>The actual answer."
result = OllamaClient._strip_thinking_tokens(text)
assert result == "The actual answer."
# Nested
text2 = "Before <think>stuff</think> middle <think>more</think> after."
result2 = OllamaClient._strip_thinking_tokens(text2)
assert "<think>" not in result2
assert "Before" in result2
# Unclosed
text3 = "Answer here <think>trailing thoughts"
result3 = OllamaClient._strip_thinking_tokens(text3)
assert result3 == "Answer here"
@test("Config defaults")
def test_config_defaults():
config = OllamaConfig()
assert config.model_chain == ["hermes3"]
assert config.timeout == 120.0
assert config.max_retries == 2
@test("GenerateResult dataclass")
def test_generate_result():
r = GenerateResult(
text="hello",
model="hermes3",
attempt=1,
duration_ms=100.5,
fallback_used=False,
)
assert r.text == "hello"
assert r.model == "hermes3"
assert not r.fallback_used
@test("Status method returns correct structure")
def test_status():
client = OllamaClient(OllamaConfig(
model_chain=["hermes3", "mistral"],
))
status = client.status()
assert "model_chain" in status
assert len(status["model_chain"]) == 2
assert "model_health" in status
@test("Primary model succeeds on first try")
async def test_primary_success():
if not HAS_AIOHTTP:
return
mock = MockOllama()
mock.response_text = "Primary response"
runner, url = await start_mock_server(mock, 19434)
try:
client = OllamaClient(OllamaConfig(
ollama_url=url,
model_chain=["hermes3", "mistral"],
timeout=5.0,
))
result = await client.generate("test prompt", system="test system")
assert result.text == "Primary response"
assert result.model == "hermes3"
assert result.attempt == 1
assert not result.fallback_used
assert mock.call_log == ["hermes3"]
finally:
await runner.cleanup()
@test("Fallback when primary returns 404")
async def test_fallback_404():
if not HAS_AIOHTTP:
return
mock = MockOllama()
mock.missing_models = {"hermes3"}
mock.response_text = "Fallback response"
runner, url = await start_mock_server(mock, 19435)
try:
client = OllamaClient(OllamaConfig(
ollama_url=url,
model_chain=["hermes3", "mistral"],
timeout=5.0,
))
result = await client.generate("test")
assert result.text == "Fallback response"
assert result.model == "mistral"
assert result.fallback_used
# hermes3 tried once (404, no retry), then mistral
assert "hermes3" in mock.call_log
assert "mistral" in mock.call_log
finally:
await runner.cleanup()
@test("Fallback when primary returns 500")
async def test_fallback_500():
if not HAS_AIOHTTP:
return
mock = MockOllama()
mock.fail_models = {"hermes3"}
mock.response_text = "Fallback ok"
runner, url = await start_mock_server(mock, 19436)
try:
client = OllamaClient(OllamaConfig(
ollama_url=url,
model_chain=["hermes3", "mistral"],
timeout=5.0,
max_retries=2,
retry_delay=0.1,
))
result = await client.generate("test")
assert result.text == "Fallback ok"
assert result.model == "mistral"
assert result.fallback_used
finally:
await runner.cleanup()
@test("All models fail raises RuntimeError")
async def test_all_fail():
if not HAS_AIOHTTP:
return
mock = MockOllama()
mock.fail_models = {"hermes3", "mistral"}
runner, url = await start_mock_server(mock, 19437)
try:
client = OllamaClient(OllamaConfig(
ollama_url=url,
model_chain=["hermes3", "mistral"],
timeout=5.0,
max_retries=1,
retry_delay=0.1,
))
try:
await client.generate("test")
assert False, "Should have raised RuntimeError"
except RuntimeError as e:
assert "All models in chain failed" in str(e)
finally:
await runner.cleanup()
@test("Health check returns model availability")
async def test_health_check():
if not HAS_AIOHTTP:
return
mock = MockOllama()
mock.available_models = {"hermes3", "llama3.2"}
runner, url = await start_mock_server(mock, 19438)
try:
client = OllamaClient(OllamaConfig(
ollama_url=url,
model_chain=["hermes3", "mistral", "llama3.2"],
))
health = await client.health_check()
assert health["hermes3"] is True
assert health["mistral"] is False
assert health["llama3.2"] is True
finally:
await runner.cleanup()
@test("JSON mode passes format parameter")
async def test_json_mode():
if not HAS_AIOHTTP:
return
mock = MockOllama()
mock.response_text = '{"query": "test query"}'
runner, url = await start_mock_server(mock, 19439)
try:
client = OllamaClient(OllamaConfig(
ollama_url=url,
model_chain=["hermes3"],
timeout=5.0,
))
result = await client.generate("test", json_mode=True)
assert "test query" in result.text
finally:
await runner.cleanup()
@test("Unhealthy model is skipped on subsequent calls")
async def test_unhealthy_skip():
if not HAS_AIOHTTP:
return
mock = MockOllama()
mock.missing_models = {"hermes3"}
mock.response_text = "from mistral"
runner, url = await start_mock_server(mock, 19440)
try:
client = OllamaClient(OllamaConfig(
ollama_url=url,
model_chain=["hermes3", "mistral"],
timeout=5.0,
health_check_interval=60.0,
))
# First call — hermes3 fails, falls back to mistral
result1 = await client.generate("first")
assert result1.model == "mistral"
mock.call_log.clear()
# Second call — hermes3 should be skipped (marked unhealthy)
result2 = await client.generate("second")
assert result2.model == "mistral"
assert "hermes3" not in mock.call_log # skipped!
finally:
await runner.cleanup()
# ── Runner ──────────────────────────────────────────────────────
async def run_tests():
global passed, failed
print(f"\nOllama Client test suite")
print(f"{'' * 50}")
tests = [v for v in globals().values() if callable(v) and hasattr(v, "__test_name__")]
for test_fn in tests:
await test_fn()
print(f"{'' * 50}")
print(f"Results: {passed} passed, {failed} failed, {passed + failed} total")
if errors:
print("\nFailures:")
for name, err in errors:
print(f"{name}: {err}")
return failed == 0
if __name__ == "__main__":
success = asyncio.run(run_tests())
sys.exit(0 if success else 1)

354
server/test_webhooks.py Normal file
View File

@@ -0,0 +1,354 @@
#!/usr/bin/env python3
"""
test_webhooks.py — Tests for the Gitea webhook → Matrix translator.
Tests the event translation logic, bot filtering, signature verification,
and the full process_webhook pipeline.
Usage:
python server/test_webhooks.py
"""
import asyncio
import json
import sys
# Insert repo root into path
sys.path.insert(0, ".")
from server.webhooks import (
translate_push,
translate_issue,
translate_pull_request,
translate_comment,
process_webhook,
_agent_for_user,
_issue_priority,
_verify_signature,
BOT_FILTER,
)
passed = 0
failed = 0
errors = []
def test(name):
"""Decorator for test functions."""
def decorator(func):
async def wrapper():
global passed, failed
try:
result = func()
if asyncio.iscoroutine(result):
await result
passed += 1
print(f"{name}")
except Exception as e:
failed += 1
errors.append((name, str(e)))
print(f"{name}: {e}")
wrapper.__test_name__ = name
return wrapper
return decorator
# ── Push event tests ─────────────────────────────────────────────
@test("Push: generates bark + task_created for each commit")
def test_push_basic():
payload = {
"pusher": {"login": "perplexity"},
"repository": {"full_name": "perplexity/the-matrix"},
"ref": "refs/heads/main",
"commits": [
{"id": "abc123", "message": "feat: add webhooks\nSome body text"},
{"id": "def456", "message": "fix: gateway routing"},
],
}
msgs = translate_push(payload)
# Should have 1 bark + 2 task_created = 3 messages
assert len(msgs) == 3, f"Expected 3 messages, got {len(msgs)}"
# First message is the bark
assert msgs[0]["type"] == "bark"
assert "2 commits" in msgs[0]["text"]
assert "perplexity/the-matrix:main" in msgs[0]["text"]
assert msgs[0]["agentId"] == "perplexity"
# Next two are task_created
assert msgs[1]["type"] == "task_created"
assert msgs[1]["title"] == "feat: add webhooks"
assert msgs[1]["status"] == "completed"
assert msgs[2]["type"] == "task_created"
assert msgs[2]["title"] == "fix: gateway routing"
@test("Push: caps at 3 commits")
def test_push_cap():
payload = {
"pusher": {"login": "rockachopa"},
"repository": {"full_name": "rockachopa/the-matrix"},
"ref": "refs/heads/dev",
"commits": [{"id": f"c{i}", "message": f"commit {i}"} for i in range(10)],
}
msgs = translate_push(payload)
# 1 bark + 3 task_created (capped)
assert len(msgs) == 4, f"Expected 4 messages, got {len(msgs)}"
@test("Push: empty commits yields no messages")
def test_push_empty():
payload = {
"pusher": {"login": "test"},
"repository": {"full_name": "test/repo"},
"ref": "refs/heads/main",
"commits": [],
}
msgs = translate_push(payload)
assert len(msgs) == 0
# ── Issue event tests ─────────────────────────────────────────────
@test("Issue opened: bark + task_created")
def test_issue_opened():
payload = {
"action": "opened",
"issue": {"title": "Bug in gateway", "number": 42, "labels": []},
"sender": {"login": "rockachopa"},
}
msgs = translate_issue(payload)
assert len(msgs) == 2
assert msgs[0]["type"] == "bark"
assert "#42" in msgs[0]["text"]
assert msgs[0]["agentId"] == "timmy" # rockachopa maps to timmy
assert msgs[1]["type"] == "task_created"
assert msgs[1]["task_id"] == "issue-42"
assert msgs[1]["status"] == "pending"
@test("Issue closed: task_update + bark")
def test_issue_closed():
payload = {
"action": "closed",
"issue": {"title": "Bug in gateway", "number": 42, "labels": []},
"sender": {"login": "perplexity"},
}
msgs = translate_issue(payload)
assert len(msgs) == 2
assert msgs[0]["type"] == "task_update"
assert msgs[0]["status"] == "completed"
@test("Issue with high-priority label")
def test_issue_priority():
payload = {
"action": "opened",
"issue": {
"title": "Critical failure",
"number": 99,
"labels": [{"name": "critical"}, {"name": "bug"}],
},
"sender": {"login": "perplexity"},
}
msgs = translate_issue(payload)
task = next(m for m in msgs if m["type"] == "task_created")
assert task["priority"] == "high"
# ── PR event tests ───────────────────────────────────────────────
@test("PR opened: bark + task_created")
def test_pr_opened():
payload = {
"action": "opened",
"pull_request": {"title": "feat: deep research", "number": 76},
"sender": {"login": "perplexity"},
}
msgs = translate_pull_request(payload)
assert len(msgs) == 2
assert msgs[0]["type"] == "bark"
assert "PR #76" in msgs[0]["text"]
assert msgs[1]["type"] == "task_created"
assert msgs[1]["task_id"] == "pr-76"
assert msgs[1]["status"] == "in_progress"
@test("PR merged: task_update completed + bark")
def test_pr_merged():
payload = {
"action": "closed",
"pull_request": {"title": "feat: deep research", "number": 76, "merged": True},
"sender": {"login": "perplexity"},
}
msgs = translate_pull_request(payload)
assert len(msgs) == 2
assert msgs[0]["type"] == "task_update"
assert msgs[0]["status"] == "completed"
assert "Merged" in msgs[1]["text"]
@test("PR closed without merge: task_update failed")
def test_pr_closed_not_merged():
payload = {
"action": "closed",
"pull_request": {"title": "bad PR", "number": 77, "merged": False},
"sender": {"login": "perplexity"},
}
msgs = translate_pull_request(payload)
assert msgs[0]["status"] == "failed"
assert "Closed" in msgs[1]["text"]
# ── Comment event tests ──────────────────────────────────────────
@test("Comment on issue: bark with excerpt")
def test_comment():
payload = {
"action": "created",
"comment": {"body": "This looks good, let's merge."},
"issue": {"number": 42, "pull_request": None},
"sender": {"login": "rockachopa"},
}
msgs = translate_comment(payload)
assert len(msgs) == 1
assert msgs[0]["type"] == "bark"
assert "Issue #42" in msgs[0]["text"]
@test("Comment on PR: bark says PR")
def test_comment_on_pr():
payload = {
"action": "created",
"comment": {"body": "LGTM"},
"issue": {"number": 76, "pull_request": {}},
"sender": {"login": "perplexity"},
}
msgs = translate_comment(payload)
assert len(msgs) == 1
assert "PR #76" in msgs[0]["text"]
# ── Bot filter test ──────────────────────────────────────────────
@test("Bot sender is filtered out")
async def test_bot_filter():
broadcast_log = []
async def mock_broadcast(msg):
broadcast_log.append(msg)
payload = {
"action": "opened",
"issue": {"title": "Auto issue", "number": 100, "labels": []},
"sender": {"login": "hermes"}, # in BOT_FILTER
}
result = await process_webhook(
event_type="issues",
payload=payload,
broadcast_fn=mock_broadcast,
)
assert result["status"] == "filtered"
assert len(broadcast_log) == 0
# ── process_webhook integration ──────────────────────────────────
@test("process_webhook: routes push event correctly")
async def test_process_push():
broadcast_log = []
async def mock_broadcast(msg):
broadcast_log.append(msg)
payload = {
"pusher": {"login": "perplexity"},
"sender": {"login": "perplexity"},
"repository": {"full_name": "perplexity/the-matrix"},
"ref": "refs/heads/main",
"commits": [{"id": "abc123", "message": "test commit"}],
}
result = await process_webhook(
event_type="push",
payload=payload,
broadcast_fn=mock_broadcast,
)
assert result["status"] == "ok"
assert result["messages"] == 2 # 1 bark + 1 task_created
assert len(broadcast_log) == 2
@test("process_webhook: unknown event type is ignored")
async def test_unknown_event():
async def mock_broadcast(msg):
pass
result = await process_webhook(
event_type="release",
payload={"sender": {"login": "test"}},
broadcast_fn=mock_broadcast,
)
assert result["status"] == "ignored"
# ── Helper tests ─────────────────────────────────────────────────
@test("Agent mapping: rockachopa → timmy")
def test_agent_map():
assert _agent_for_user("rockachopa") == "timmy"
assert _agent_for_user("perplexity") == "perplexity"
assert _agent_for_user("UnknownUser") == "unknownuser"
@test("Priority from labels")
def test_priority_labels():
assert _issue_priority({"labels": [{"name": "critical"}]}) == "high"
assert _issue_priority({"labels": [{"name": "low"}]}) == "low"
assert _issue_priority({"labels": []}) == "normal"
@test("HMAC signature verification")
def test_signature():
body = b'{"test": true}'
secret = "mysecret"
import hashlib, hmac as _hmac
sig = _hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
assert _verify_signature(body, secret, sig) is True
assert _verify_signature(body, secret, "badsig") is False
assert _verify_signature(body, secret, "") is False
# ── Runner ──────────────────────────────────────────────────────
async def run_tests():
global passed, failed
print(f"\nWebhook translator test suite")
print(f"{'' * 50}")
tests = [v for v in globals().values() if callable(v) and hasattr(v, "__test_name__")]
for test_fn in tests:
await test_fn()
print(f"{'' * 50}")
print(f"Results: {passed} passed, {failed} failed, {passed + failed} total")
if errors:
print("\nFailures:")
for name, err in errors:
print(f"{name}: {err}")
return failed == 0
if __name__ == "__main__":
success = asyncio.run(run_tests())
sys.exit(0 if success else 1)

371
server/webhooks.py Normal file
View File

@@ -0,0 +1,371 @@
#!/usr/bin/env python3
"""
webhooks.py — Gitea webhook receiver for the Matrix Gateway.
Receives Gitea webhook POST events (push, issue, PR, etc.) and translates
them into Matrix protocol messages broadcast via the WS gateway.
Integration:
# In gateway.py startup — mount on the existing aiohttp app
from server.webhooks import setup_webhook_routes
setup_webhook_routes(app, gateway)
# Gitea webhook URL: POST http://<gateway-host>:<rest-port>/api/webhook/gitea
Resolves Issue #77 — Gitea webhooks → gateway
"""
import hashlib
import hmac
import json
import logging
import time
import uuid
from typing import Optional
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [Webhooks] %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("webhooks")
# Bot usernames whose events we suppress to avoid noise loops
BOT_FILTER = {"hermes", "kimi", "manus"}
# ── Event translation ────────────────────────────────────────────
def translate_push(payload: dict) -> list[dict]:
"""Translate a Gitea push event into Matrix messages."""
pusher = payload.get("pusher", {}).get("login", "unknown")
repo = payload.get("repository", {}).get("full_name", "unknown")
commits = payload.get("commits", [])
ref = payload.get("ref", "").split("/")[-1] # branch name
if not commits:
return []
msgs: list[dict] = []
# 1. Bark announcing the push
commit_word = "commit" if len(commits) == 1 else "commits"
msgs.append({
"type": "bark",
"agentId": _agent_for_user(pusher),
"text": f"Pushed {len(commits)} {commit_word} to {repo}:{ref}",
"emotion": "determined",
})
# 2. Task created for each commit (shows as floating objects)
for commit in commits[:3]: # cap at 3 to avoid flooding
short_msg = commit.get("message", "").split("\n")[0][:80]
msgs.append({
"type": "task_created",
"task_id": commit.get("id", str(uuid.uuid4()))[:12],
"agent_id": _agent_for_user(pusher),
"title": short_msg,
"status": "completed",
"priority": "normal",
})
return msgs
def translate_issue(payload: dict) -> list[dict]:
"""Translate a Gitea issue event into Matrix messages."""
action = payload.get("action", "")
issue = payload.get("issue", {})
sender = payload.get("sender", {}).get("login", "unknown")
title = issue.get("title", "untitled")
number = issue.get("number", 0)
msgs: list[dict] = []
if action == "opened":
msgs.append({
"type": "bark",
"agentId": _agent_for_user(sender),
"text": f"Opened issue #{number}: {title[:80]}",
"emotion": "curious",
})
msgs.append({
"type": "task_created",
"task_id": f"issue-{number}",
"agent_id": _agent_for_user(sender),
"title": f"#{number}: {title[:60]}",
"status": "pending",
"priority": _issue_priority(issue),
})
elif action == "closed":
msgs.append({
"type": "task_update",
"task_id": f"issue-{number}",
"agent_id": _agent_for_user(sender),
"title": f"#{number}: {title[:60]}",
"status": "completed",
"priority": _issue_priority(issue),
})
msgs.append({
"type": "bark",
"agentId": _agent_for_user(sender),
"text": f"Closed #{number}: {title[:60]}",
"emotion": "determined",
})
elif action == "edited":
msgs.append({
"type": "task_update",
"task_id": f"issue-{number}",
"agent_id": _agent_for_user(sender),
"title": f"#{number}: {title[:60]}",
"status": "in_progress",
"priority": _issue_priority(issue),
})
elif action == "assigned":
assignee = issue.get("assignee", {}).get("login", "")
if assignee:
msgs.append({
"type": "bark",
"agentId": _agent_for_user(sender),
"text": f"Assigned #{number} to {assignee}",
"emotion": "neutral",
})
return msgs
def translate_pull_request(payload: dict) -> list[dict]:
"""Translate a Gitea pull request event into Matrix messages."""
action = payload.get("action", "")
pr = payload.get("pull_request", {})
sender = payload.get("sender", {}).get("login", "unknown")
title = pr.get("title", "untitled")
number = pr.get("number", 0)
msgs: list[dict] = []
if action == "opened":
msgs.append({
"type": "bark",
"agentId": _agent_for_user(sender),
"text": f"Opened PR #{number}: {title[:80]}",
"emotion": "excited",
})
msgs.append({
"type": "task_created",
"task_id": f"pr-{number}",
"agent_id": _agent_for_user(sender),
"title": f"PR #{number}: {title[:60]}",
"status": "in_progress",
"priority": "high",
})
elif action == "closed":
merged = pr.get("merged", False)
status = "completed" if merged else "failed"
verb = "Merged" if merged else "Closed"
msgs.append({
"type": "task_update",
"task_id": f"pr-{number}",
"agent_id": _agent_for_user(sender),
"title": f"PR #{number}: {title[:60]}",
"status": status,
"priority": "high",
})
msgs.append({
"type": "bark",
"agentId": _agent_for_user(sender),
"text": f"{verb} PR #{number}: {title[:60]}",
"emotion": "determined" if merged else "hesitant",
})
return msgs
def translate_comment(payload: dict) -> list[dict]:
"""Translate a Gitea comment event into Matrix messages."""
action = payload.get("action", "")
comment = payload.get("comment", {})
sender = payload.get("sender", {}).get("login", "unknown")
body = comment.get("body", "")
if action != "created" or not body:
return []
# Determine context (issue or PR)
issue = payload.get("issue", {})
number = issue.get("number", 0)
is_pr = issue.get("pull_request") is not None
prefix = "PR" if is_pr else "Issue"
excerpt = body[:120].replace("\n", " ")
if len(body) > 120:
excerpt += ""
return [{
"type": "bark",
"agentId": _agent_for_user(sender),
"text": f"Commented on {prefix} #{number}: {excerpt}",
"emotion": "neutral",
}]
# ── Dispatch table ────────────────────────────────────────────────
EVENT_HANDLERS = {
"push": translate_push,
"issues": translate_issue,
"pull_request": translate_pull_request,
"issue_comment": translate_comment,
}
async def process_webhook(
event_type: str,
payload: dict,
broadcast_fn,
secret: str = "",
raw_body: bytes = b"",
signature: str = "",
) -> dict:
"""
Process a Gitea webhook event.
Args:
event_type: Gitea event type header (X-Gitea-Event)
payload: Parsed JSON payload
broadcast_fn: async callable(msg: dict) to broadcast to all WS clients
secret: Optional webhook secret for HMAC verification
raw_body: Raw request body for signature verification
signature: X-Gitea-Signature header value
Returns:
dict with processing result
"""
# Signature verification (if secret configured)
if secret and raw_body:
if not _verify_signature(raw_body, secret, signature):
log.warning(f"Webhook signature mismatch for {event_type}")
return {"status": "error", "reason": "invalid_signature"}
# Filter bot events
sender = payload.get("sender", {}).get("login", "")
if sender.lower() in BOT_FILTER:
log.info(f"Filtered bot event from {sender}")
return {"status": "filtered", "reason": "bot_sender"}
# Find handler
handler = EVENT_HANDLERS.get(event_type)
if not handler:
log.info(f"Unhandled event type: {event_type}")
return {"status": "ignored", "reason": f"unhandled_event_type:{event_type}"}
# Translate to Matrix messages
messages = handler(payload)
if not messages:
log.info(f"No messages generated for {event_type}")
return {"status": "ok", "messages": 0}
# Broadcast each message
for msg in messages:
try:
await broadcast_fn(msg)
except Exception as e:
log.error(f"Broadcast failed: {e}")
log.info(f"Webhook {event_type}: broadcast {len(messages)} messages (sender={sender})")
return {"status": "ok", "messages": len(messages)}
# ── aiohttp route setup ──────────────────────────────────────────
def setup_webhook_routes(app, gateway, secret: str = ""):
"""
Mount the Gitea webhook endpoint onto an aiohttp Application.
Args:
app: aiohttp.web.Application
gateway: Gateway instance (has _broadcast method)
secret: Optional webhook secret for HMAC verification
"""
try:
from aiohttp import web as aiohttp_web
except ImportError:
log.warning("aiohttp not installed — webhook routes unavailable")
return
async def handle_gitea_webhook(request):
"""POST /api/webhook/gitea"""
event_type = request.headers.get("X-Gitea-Event", "")
signature = request.headers.get("X-Gitea-Signature", "")
if not event_type:
return aiohttp_web.json_response(
{"error": "missing X-Gitea-Event header"}, status=400
)
try:
raw_body = await request.read()
payload = json.loads(raw_body)
except (json.JSONDecodeError, Exception) as e:
return aiohttp_web.json_response(
{"error": f"invalid JSON: {e}"}, status=400
)
# Use gateway broadcast as the broadcast function
async def broadcast(msg):
await gateway._broadcast(msg)
result = await process_webhook(
event_type=event_type,
payload=payload,
broadcast_fn=broadcast,
secret=secret,
raw_body=raw_body,
signature=signature,
)
status_code = 200 if result["status"] in ("ok", "filtered", "ignored") else 403
return aiohttp_web.json_response(result, status=status_code)
app.router.add_post("/api/webhook/gitea", handle_gitea_webhook)
log.info("Webhook route registered: POST /api/webhook/gitea")
# ── Helpers ──────────────────────────────────────────────────────
def _agent_for_user(username: str) -> str:
"""Map a Gitea username to a Matrix agent_id."""
# Known mappings
user_map = {
"rockachopa": "timmy",
"hermes": "timmy",
"perplexity": "perplexity",
"replit": "replit",
"kimi": "kimi",
"admin": "timmy",
}
return user_map.get(username.lower(), username.lower())
def _issue_priority(issue: dict) -> str:
"""Derive priority from issue labels."""
labels = [l.get("name", "").lower() for l in issue.get("labels", [])]
if any("high" in l or "critical" in l or "urgent" in l for l in labels):
return "high"
if any("low" in l for l in labels):
return "low"
return "normal"
def _verify_signature(body: bytes, secret: str, signature: str) -> bool:
"""Verify Gitea webhook HMAC-SHA256 signature."""
if not signature:
return False
expected = hmac.new(
secret.encode(), body, hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)