feat: dockerize OpenFang as vendored tool runtime sidecar (#96)

This commit is contained in:
Alexander Whitestone
2026-02-28 19:27:48 -05:00
committed by GitHub
parent d7d7a5a80a
commit b7c89d1101
18 changed files with 1383 additions and 23 deletions

View File

@@ -91,6 +91,35 @@ services:
condition: service_healthy
restart: unless-stopped
# ── OpenFang — vendored agent runtime sidecar ────────────────────────────────
# Rust binary providing real tool execution (browser, OSINT, forecasting).
# Timmy's coordinator delegates hand execution here via REST API.
openfang:
build:
context: .
dockerfile: docker/Dockerfile.openfang
image: timmy-openfang:latest
container_name: timmy-openfang
profiles:
- openfang
environment:
OLLAMA_URL: "http://ollama:11434"
OPENFANG_DATA_DIR: "/app/data"
volumes:
- openfang-data:/app/data
networks:
- timmy-net
depends_on:
ollama:
condition: service_healthy
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 5s
retries: 3
start_period: 15s
# ── Swarm Worker Pool (Template) ──────────────────────────────────────────
# Scale: docker compose -f docker-compose.microservices.yml up --scale worker=4
worker:
@@ -128,6 +157,8 @@ volumes:
device: "${PWD}/data"
ollama-data:
driver: local
openfang-data:
driver: local
# ── Network ───────────────────────────────────────────────────────────────────
networks:

View File

@@ -92,6 +92,34 @@ services:
condition: service_healthy
restart: unless-stopped
# ── OpenFang — vendored agent runtime sidecar ────────────────────────────────
# Rust binary providing real tool execution (browser, OSINT, forecasting).
# Timmy's coordinator delegates hand execution here via REST API.
openfang:
build:
context: .
dockerfile: docker/Dockerfile.openfang
image: timmy-openfang:latest
container_name: timmy-openfang
profiles:
- openfang
environment:
OLLAMA_URL: "${OLLAMA_URL:-http://host.docker.internal:11434}"
OPENFANG_DATA_DIR: "/app/data"
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- openfang-data:/app/data
networks:
- swarm-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 5s
retries: 3
start_period: 15s
# ── Agent worker template ───────────────────────────────────────────────────
# Scale horizontally: docker compose up --scale agent=4 --profile agents
# Each container gets a unique AGENT_ID via the replica index.
@@ -129,6 +157,8 @@ volumes:
type: none
o: bind
device: "${PWD}/data"
openfang-data:
driver: local
# ── Internal network ──────────────────────────────────────────────────────────
networks:

View File

@@ -0,0 +1,71 @@
# ── OpenFang — vendored binary sidecar ──────────────────────────────────────
#
# Downloads the OpenFang Agent OS binary from GitHub releases and runs it
# as a sidecar service. Timmy's coordinator delegates tool execution here.
#
# OpenFang exposes an OpenAI-compatible REST API that Timmy hits via the
# infrastructure/openfang client bridge.
#
# Build: docker build -f docker/Dockerfile.openfang -t timmy-openfang:latest .
# Run: docker run -p 8080:8080 timmy-openfang:latest
FROM debian:bookworm-slim AS downloader
RUN apt-get update && apt-get install -y --no-install-recommends \
curl ca-certificates \
&& rm -rf /var/lib/apt/lists/*
ARG OPENFANG_VERSION=latest
ARG TARGETARCH=amd64
# Download the binary from GitHub releases.
# The release asset is expected to be a single Linux binary.
RUN mkdir -p /opt/openfang && \
if [ "$OPENFANG_VERSION" = "latest" ]; then \
DOWNLOAD_URL=$(curl -sL \
-H "Accept: application/vnd.github+json" \
"https://api.github.com/repos/RightNow-AI/openfang/releases/latest" \
| grep "browser_download_url.*linux.*${TARGETARCH}" \
| head -1 \
| cut -d '"' -f 4); \
else \
DOWNLOAD_URL="https://github.com/RightNow-AI/openfang/releases/download/${OPENFANG_VERSION}/openfang-linux-${TARGETARCH}"; \
fi && \
echo "Downloading OpenFang from: ${DOWNLOAD_URL}" && \
curl -fSL "${DOWNLOAD_URL:-https://github.com/RightNow-AI/openfang/releases/latest/download/openfang-linux-${TARGETARCH}}" \
-o /opt/openfang/openfang && \
chmod +x /opt/openfang/openfang
# ── Runtime ─────────────────────────────────────────────────────────────────
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# Non-root user
RUN groupadd -r openfang && useradd -r -g openfang -d /app -s /sbin/nologin openfang
WORKDIR /app
COPY --from=downloader /opt/openfang/openfang /usr/local/bin/openfang
# Data directory for OpenFang's SQLite state
RUN mkdir -p /app/data && chown -R openfang:openfang /app
USER openfang
# OpenFang listens on 8080 by default
EXPOSE 8080
# ── Healthcheck ─────────────────────────────────────────────────────────────
HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# ── Entrypoint ──────────────────────────────────────────────────────────────
# OpenFang is configured entirely via env vars. Key ones:
# OPENFANG_HOST listen address (default 0.0.0.0)
# OPENFANG_PORT listen port (default 8080)
# OPENFANG_DATA_DIR state directory (default /app/data)
# OLLAMA_URL point to the shared Ollama instance
CMD ["openfang", "serve", "--host", "0.0.0.0", "--port", "8080"]

42
poetry.lock generated
View File

@@ -979,6 +979,22 @@ dev = ["pre-commit (>=2.16.0) ; python_version >= \"3.9\"", "pydoctor (>=25.4.0)
docs = ["pydoctor (>=25.4.0)"]
test = ["pytest"]
[[package]]
name = "execnet"
version = "2.1.2"
description = "execnet: rapid multi-Python deployment"
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
files = [
{file = "execnet-2.1.2-py3-none-any.whl", hash = "sha256:67fba928dd5a544b783f6056f449e5e3931a5c378b128bc18501f7ea79e296ec"},
{file = "execnet-2.1.2.tar.gz", hash = "sha256:63d83bfdd9a23e35b9c6a3261412324f964c2ec8dcd8d3c6916ee9373e0befcd"},
]
markers = {main = "extra == \"dev\""}
[package.extras]
testing = ["hatch", "pre-commit", "pytest", "tox"]
[[package]]
name = "fastapi"
version = "0.134.0"
@@ -6726,6 +6742,28 @@ markers = {main = "extra == \"dev\""}
[package.dependencies]
pytest = ">=7.0.0"
[[package]]
name = "pytest-xdist"
version = "3.8.0"
description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs"
optional = false
python-versions = ">=3.9"
groups = ["main", "dev"]
files = [
{file = "pytest_xdist-3.8.0-py3-none-any.whl", hash = "sha256:202ca578cfeb7370784a8c33d6d05bc6e13b4f25b5053c30a152269fd10f0b88"},
{file = "pytest_xdist-3.8.0.tar.gz", hash = "sha256:7e578125ec9bc6050861aa93f2d59f1d8d085595d6551c2c90b6f4fad8d3a9f1"},
]
markers = {main = "extra == \"dev\""}
[package.dependencies]
execnet = ">=2.1"
pytest = ">=7.0.0"
[package.extras]
psutil = ["psutil (>=3.0)"]
setproctitle = ["setproctitle"]
testing = ["filelock"]
[[package]]
name = "python-dotenv"
version = "1.2.1"
@@ -8259,7 +8297,7 @@ propcache = ">=0.2.1"
[extras]
bigbrain = ["airllm"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-randomly", "pytest-timeout", "selenium"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-randomly", "pytest-timeout", "pytest-xdist", "selenium"]
discord = ["discord.py"]
swarm = ["redis"]
telegram = ["python-telegram-bot"]
@@ -8268,4 +8306,4 @@ voice = ["pyttsx3"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.11,<4"
content-hash = "8e608d71fafb99eda990a90f7879127522ec03fcd2bd34b115d2b4fde4c0fe87"
content-hash = "320a1a658bc130fe0164940e45971180ca28a7c073930c40c40fbf41b90bc3f2"

View File

@@ -55,6 +55,7 @@ pytest-cov = { version = ">=5.0.0", optional = true }
pytest-timeout = { version = ">=2.3.0", optional = true }
selenium = { version = ">=4.20.0", optional = true }
pytest-randomly = { version = ">=3.16.0", optional = true }
pytest-xdist = { version = ">=3.5.0", optional = true }
[tool.poetry.extras]
swarm = ["redis"]
@@ -62,7 +63,7 @@ telegram = ["python-telegram-bot"]
discord = ["discord.py"]
bigbrain = ["airllm"]
voice = ["pyttsx3"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "selenium"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "pytest-xdist", "selenium"]
[tool.poetry.group.dev.dependencies]
pytest = ">=8.0.0"
@@ -71,6 +72,7 @@ pytest-cov = ">=5.0.0"
pytest-timeout = ">=2.3.0"
selenium = ">=4.20.0"
pytest-randomly = "^4.0.1"
pytest-xdist = "^3.8.0"
[tool.poetry.scripts]
timmy = "timmy.cli:main"

View File

@@ -22,11 +22,14 @@ markers =
skip_ci: Skip in CI environment (local development only)
# Output and reporting
addopts =
# -n auto: run tests in parallel across all CPU cores (pytest-xdist)
# Override with -n0 to disable parallelism for debugging
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
-n auto
# Coverage configuration
[coverage:run]

View File

@@ -147,6 +147,16 @@ class Settings(BaseSettings):
thinking_enabled: bool = True
thinking_interval_seconds: int = 300 # 5 minutes between thoughts
# ── OpenFang — vendored agent runtime ─────────────────────────────
# URL where the OpenFang sidecar listens. Set to the Docker service
# name when running in compose, or localhost for bare-metal dev.
openfang_url: str = "http://localhost:8080"
# Enable/disable OpenFang integration. When disabled, the tool
# executor falls back to Timmy's native (simulated) execution.
openfang_enabled: bool = False
# Timeout in seconds for OpenFang hand execution (some hands are slow).
openfang_timeout: int = 120
# ── Error Logging ─────────────────────────────────────────────────
error_log_enabled: bool = True
error_log_dir: str = "logs"

View File

@@ -424,6 +424,16 @@ async def lifespan(app: FastAPI):
# Bootstrap MCP tools in background
mcp_task = asyncio.create_task(_bootstrap_mcp_background())
# Register OpenFang vendor tools (if enabled)
if settings.openfang_enabled:
try:
from infrastructure.openfang.tools import register_openfang_tools
count = register_openfang_tools()
logger.info("OpenFang: registered %d vendor tools", count)
except Exception as exc:
logger.warning("OpenFang tool registration failed: %s", exc)
# Initialize Spark Intelligence engine
from spark.engine import spark_engine
if spark_engine.enabled:

View File

@@ -0,0 +1,18 @@
"""OpenFang — vendored binary sidecar for agent tool execution.
OpenFang is a Rust-compiled Agent OS that provides real tool execution
(browser automation, OSINT, forecasting, social management) in a
WASM-sandboxed runtime. Timmy's coordinator dispatches to it as a
tool vendor rather than a co-orchestrator.
Usage:
from infrastructure.openfang import openfang_client
# Check if OpenFang is available
if openfang_client.healthy:
result = await openfang_client.execute_hand("browser", params)
"""
from infrastructure.openfang.client import OpenFangClient, openfang_client
__all__ = ["OpenFangClient", "openfang_client"]

View File

@@ -0,0 +1,216 @@
"""OpenFang HTTP client — bridge between Timmy coordinator and OpenFang runtime.
Follows project conventions:
- Graceful degradation (log error, return fallback, never crash)
- Config via ``from config import settings``
- Singleton pattern for module-level import
The client wraps OpenFang's REST API and exposes its Hands
(Browser, Collector, Predictor, Lead, Twitter, Researcher, Clip)
as callable tool endpoints.
"""
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Optional
from config import settings
logger = logging.getLogger(__name__)
# Hand names that OpenFang ships out of the box
OPENFANG_HANDS = (
"browser",
"collector",
"predictor",
"lead",
"twitter",
"researcher",
"clip",
)
@dataclass
class HandResult:
"""Result from an OpenFang Hand execution."""
hand: str
success: bool
output: str = ""
error: str = ""
latency_ms: float = 0.0
metadata: dict = field(default_factory=dict)
class OpenFangClient:
"""HTTP client for the OpenFang sidecar.
All methods degrade gracefully — if OpenFang is down the client
returns a ``HandResult(success=False)`` rather than raising.
"""
def __init__(self, base_url: Optional[str] = None, timeout: int = 60) -> None:
self._base_url = (base_url or settings.openfang_url).rstrip("/")
self._timeout = timeout
self._healthy = False
self._last_health_check: float = 0.0
self._health_cache_ttl = 30.0 # seconds
logger.info("OpenFangClient initialised → %s", self._base_url)
# ── Health ───────────────────────────────────────────────────────────────
@property
def healthy(self) -> bool:
"""Cached health check — hits /health at most once per TTL."""
now = time.time()
if now - self._last_health_check > self._health_cache_ttl:
self._healthy = self._check_health()
self._last_health_check = now
return self._healthy
def _check_health(self) -> bool:
try:
import urllib.request
req = urllib.request.Request(
f"{self._base_url}/health",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status == 200
except Exception as exc:
logger.debug("OpenFang health check failed: %s", exc)
return False
# ── Hand execution ───────────────────────────────────────────────────────
async def execute_hand(
self,
hand: str,
params: dict[str, Any],
timeout: Optional[int] = None,
) -> HandResult:
"""Execute an OpenFang Hand and return the result.
Args:
hand: Hand name (browser, collector, predictor, etc.)
params: Parameters for the hand (task-specific)
timeout: Override default timeout for long-running hands
Returns:
HandResult with output or error details.
"""
if hand not in OPENFANG_HANDS:
return HandResult(
hand=hand,
success=False,
error=f"Unknown hand: {hand}. Available: {', '.join(OPENFANG_HANDS)}",
)
start = time.time()
try:
import json
import urllib.request
payload = json.dumps({"hand": hand, "params": params}).encode()
req = urllib.request.Request(
f"{self._base_url}/api/v1/hands/{hand}/execute",
data=payload,
method="POST",
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
effective_timeout = timeout or self._timeout
with urllib.request.urlopen(req, timeout=effective_timeout) as resp:
body = json.loads(resp.read().decode())
latency = (time.time() - start) * 1000
return HandResult(
hand=hand,
success=body.get("success", True),
output=body.get("output", body.get("result", "")),
latency_ms=latency,
metadata=body.get("metadata", {}),
)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning(
"OpenFang hand '%s' failed (%.0fms): %s",
hand,
latency,
exc,
)
return HandResult(
hand=hand,
success=False,
error=str(exc),
latency_ms=latency,
)
# ── Convenience wrappers for common hands ────────────────────────────────
async def browse(self, url: str, instruction: str = "") -> HandResult:
"""Web automation via OpenFang's Browser hand."""
return await self.execute_hand(
"browser", {"url": url, "instruction": instruction}
)
async def collect(self, target: str, depth: str = "shallow") -> HandResult:
"""OSINT collection via OpenFang's Collector hand."""
return await self.execute_hand(
"collector", {"target": target, "depth": depth}
)
async def predict(self, question: str, horizon: str = "1w") -> HandResult:
"""Superforecasting via OpenFang's Predictor hand."""
return await self.execute_hand(
"predictor", {"question": question, "horizon": horizon}
)
async def find_leads(self, icp: str, max_results: int = 10) -> HandResult:
"""Prospect discovery via OpenFang's Lead hand."""
return await self.execute_hand(
"lead", {"icp": icp, "max_results": max_results}
)
async def research(self, topic: str, depth: str = "standard") -> HandResult:
"""Deep research via OpenFang's Researcher hand."""
return await self.execute_hand(
"researcher", {"topic": topic, "depth": depth}
)
# ── Inventory ────────────────────────────────────────────────────────────
async def list_hands(self) -> list[dict]:
"""Query OpenFang for its available hands and their status."""
try:
import json
import urllib.request
req = urllib.request.Request(
f"{self._base_url}/api/v1/hands",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode())
except Exception as exc:
logger.debug("Failed to list OpenFang hands: %s", exc)
return []
def status(self) -> dict:
"""Return a status summary for the dashboard."""
return {
"url": self._base_url,
"healthy": self.healthy,
"available_hands": list(OPENFANG_HANDS),
}
# ── Module-level singleton ──────────────────────────────────────────────────
openfang_client = OpenFangClient()

View File

@@ -0,0 +1,223 @@
"""Register OpenFang Hands as MCP tools in Timmy's tool registry.
Each OpenFang Hand becomes a callable MCP tool that personas can use
during task execution. The mapping ensures the right personas get
access to the right hands:
Mace (Security) → collector (OSINT), browser
Seer (Analytics) → predictor, researcher
Echo (Research) → researcher, browser, collector
Helm (DevOps) → browser
Lead hand → available to all personas via direct request
Call ``register_openfang_tools()`` during app startup (after config
is loaded) to populate the tool registry.
"""
import logging
from typing import Any
from infrastructure.openfang.client import OPENFANG_HANDS, openfang_client
from mcp.schemas.base import create_tool_schema
logger = logging.getLogger(__name__)
# ── Tool schemas ─────────────────────────────────────────────────────────────
_HAND_SCHEMAS: dict[str, dict] = {
"browser": create_tool_schema(
name="openfang_browser",
description=(
"Web automation via OpenFang's Browser hand. "
"Navigates URLs, extracts content, fills forms. "
"Includes mandatory purchase confirmation gates."
),
parameters={
"url": {"type": "string", "description": "URL to navigate to"},
"instruction": {
"type": "string",
"description": "What to do on the page",
},
},
required=["url"],
),
"collector": create_tool_schema(
name="openfang_collector",
description=(
"OSINT intelligence and continuous monitoring via OpenFang's "
"Collector hand. Gathers public information on targets."
),
parameters={
"target": {
"type": "string",
"description": "Target to investigate (domain, org, person)",
},
"depth": {
"type": "string",
"description": "Collection depth: shallow | standard | deep",
"default": "shallow",
},
},
required=["target"],
),
"predictor": create_tool_schema(
name="openfang_predictor",
description=(
"Superforecasting with calibrated reasoning via OpenFang's "
"Predictor hand. Produces probability estimates with reasoning."
),
parameters={
"question": {
"type": "string",
"description": "Forecasting question to evaluate",
},
"horizon": {
"type": "string",
"description": "Time horizon: 1d | 1w | 1m | 3m | 1y",
"default": "1w",
},
},
required=["question"],
),
"lead": create_tool_schema(
name="openfang_lead",
description=(
"Prospect discovery and ICP-based qualification via OpenFang's "
"Lead hand. Finds and scores potential leads."
),
parameters={
"icp": {
"type": "string",
"description": "Ideal Customer Profile description",
},
"max_results": {
"type": "integer",
"description": "Maximum leads to return",
"default": 10,
},
},
required=["icp"],
),
"twitter": create_tool_schema(
name="openfang_twitter",
description=(
"Social account management via OpenFang's Twitter hand. "
"Includes approval gates for sensitive actions."
),
parameters={
"action": {
"type": "string",
"description": "Action: post | reply | search | analyze",
},
"content": {
"type": "string",
"description": "Content for the action",
},
},
required=["action", "content"],
),
"researcher": create_tool_schema(
name="openfang_researcher",
description=(
"Deep autonomous research with source verification via "
"OpenFang's Researcher hand. Produces cited reports."
),
parameters={
"topic": {
"type": "string",
"description": "Research topic or question",
},
"depth": {
"type": "string",
"description": "Research depth: quick | standard | deep",
"default": "standard",
},
},
required=["topic"],
),
"clip": create_tool_schema(
name="openfang_clip",
description=(
"Video processing and social media publishing via OpenFang's "
"Clip hand. Edits, captions, and publishes video content."
),
parameters={
"source": {
"type": "string",
"description": "Source video path or URL",
},
"instruction": {
"type": "string",
"description": "What to do with the video",
},
},
required=["source"],
),
}
# Map personas to the OpenFang hands they should have access to
PERSONA_HAND_MAP: dict[str, list[str]] = {
"echo": ["researcher", "browser", "collector"],
"seer": ["predictor", "researcher"],
"mace": ["collector", "browser"],
"helm": ["browser"],
"forge": ["browser", "researcher"],
"quill": ["researcher"],
"pixel": ["clip", "browser"],
"lyra": [],
"reel": ["clip"],
}
def _make_hand_handler(hand_name: str):
"""Create an async handler that delegates to the OpenFang client."""
async def handler(**kwargs: Any) -> str:
result = await openfang_client.execute_hand(hand_name, kwargs)
if result.success:
return result.output
return f"[OpenFang {hand_name} error] {result.error}"
handler.__name__ = f"openfang_{hand_name}"
handler.__doc__ = _HAND_SCHEMAS.get(hand_name, {}).get(
"description", f"OpenFang {hand_name} hand"
)
return handler
def register_openfang_tools() -> int:
"""Register all OpenFang Hands as MCP tools.
Returns the number of tools registered.
"""
from mcp.registry import tool_registry
count = 0
for hand_name in OPENFANG_HANDS:
schema = _HAND_SCHEMAS.get(hand_name)
if not schema:
logger.warning("No schema for OpenFang hand: %s", hand_name)
continue
tool_name = f"openfang_{hand_name}"
handler = _make_hand_handler(hand_name)
tool_registry.register(
name=tool_name,
schema=schema,
handler=handler,
category="openfang",
tags=["openfang", hand_name, "vendor"],
source_module="infrastructure.openfang.tools",
requires_confirmation=(hand_name in ("twitter",)),
)
count += 1
logger.info("Registered %d OpenFang tools in MCP registry", count)
return count
def get_hands_for_persona(persona_id: str) -> list[str]:
"""Return the OpenFang tool names available to a persona."""
hand_names = PERSONA_HAND_MAP.get(persona_id, [])
return [f"openfang_{h}" for h in hand_names]

View File

@@ -8,10 +8,12 @@ Usage:
result = executor.execute_task("Write a function to calculate fibonacci")
"""
import asyncio
import logging
from typing import Any, Optional
from pathlib import Path
from config import settings
from timmy.tools import get_tools_for_persona, create_full_toolkit
from timmy.agent import create_timmy
@@ -274,6 +276,70 @@ Response:"""
]
# ── OpenFang delegation ──────────────────────────────────────────────────────
# These module-level functions allow the ToolExecutor (and other callers)
# to delegate task execution to the OpenFang sidecar when available.
# Keywords that map task descriptions to OpenFang hands.
_OPENFANG_HAND_KEYWORDS: dict[str, list[str]] = {
"browser": ["browse", "navigate", "webpage", "website", "url", "scrape", "crawl"],
"collector": ["osint", "collect", "intelligence", "monitor", "surveillance", "recon"],
"predictor": ["predict", "forecast", "probability", "calibrat"],
"lead": ["lead", "prospect", "icp", "qualify", "outbound"],
"twitter": ["tweet", "twitter", "social media post"],
"researcher": ["research", "investigate", "deep dive", "literature", "survey"],
"clip": ["video clip", "video process", "caption video", "publish video"],
}
def _match_openfang_hand(task_description: str) -> Optional[str]:
"""Match a task description to an OpenFang hand name.
Returns the hand name (e.g. "browser") or None if no match.
"""
desc_lower = task_description.lower()
for hand, keywords in _OPENFANG_HAND_KEYWORDS.items():
if any(kw in desc_lower for kw in keywords):
return hand
return None
async def try_openfang_execution(task_description: str) -> Optional[dict[str, Any]]:
"""Try to execute a task via OpenFang.
Returns a result dict if OpenFang handled it, or None if the caller
should fall back to native execution. Never raises.
"""
if not settings.openfang_enabled:
return None
try:
from infrastructure.openfang.client import openfang_client
except ImportError:
logger.debug("OpenFang client not available")
return None
if not openfang_client.healthy:
logger.debug("OpenFang is not healthy, falling back to native execution")
return None
hand = _match_openfang_hand(task_description)
if hand is None:
return None
result = await openfang_client.execute_hand(hand, {"task": task_description})
if result.success:
return {
"success": True,
"result": result.output,
"tools_used": [f"openfang_{hand}"],
"runtime": "openfang",
}
logger.warning("OpenFang hand %s failed: %s — falling back", hand, result.error)
return None
class DirectToolExecutor(ToolExecutor):
"""Tool executor that actually calls tools directly.

View File

@@ -84,29 +84,76 @@ def reset_coordinator_state():
@pytest.fixture(autouse=True)
def clean_database():
def clean_database(tmp_path):
"""Clean up database tables between tests for isolation.
Uses transaction rollback pattern: each test's changes are rolled back
to ensure perfect isolation between tests.
When running under pytest-xdist (parallel workers), each worker gets
its own tmp_path so DB files never collide. We redirect every
module-level DB_PATH to the per-test temp directory.
"""
# Pre-test: Clean database files for fresh start
db_paths = [
Path("data/swarm.db"),
Path("data/swarm.db-shm"),
Path("data/swarm.db-wal"),
tmp_swarm_db = tmp_path / "swarm.db"
tmp_spark_db = tmp_path / "spark.db"
tmp_self_coding_db = tmp_path / "self_coding.db"
# All modules that use DB_PATH = Path("data/swarm.db")
_swarm_db_modules = [
"swarm.tasks",
"swarm.registry",
"swarm.routing",
"swarm.learner",
"swarm.event_log",
"swarm.stats",
"swarm.work_orders.models",
"swarm.task_queue.models",
"self_coding.upgrades.models",
"lightning.ledger",
"timmy.memory.vector_store",
"infrastructure.models.registry",
]
for db_path in db_paths:
if db_path.exists():
try:
db_path.unlink()
except Exception:
pass
_spark_db_modules = [
"spark.memory",
"spark.eidos",
]
_self_coding_db_modules = [
"self_coding.modification_journal",
"self_coding.codebase_indexer",
]
originals = {}
for mod_name in _swarm_db_modules:
try:
mod = __import__(mod_name, fromlist=["DB_PATH"])
attr = "DB_PATH"
originals[(mod_name, attr)] = getattr(mod, attr)
setattr(mod, attr, tmp_swarm_db)
except Exception:
pass
for mod_name in _spark_db_modules:
try:
mod = __import__(mod_name, fromlist=["DB_PATH"])
originals[(mod_name, "DB_PATH")] = getattr(mod, "DB_PATH")
setattr(mod, "DB_PATH", tmp_spark_db)
except Exception:
pass
for mod_name in _self_coding_db_modules:
try:
mod = __import__(mod_name, fromlist=["DEFAULT_DB_PATH"])
originals[(mod_name, "DEFAULT_DB_PATH")] = getattr(mod, "DEFAULT_DB_PATH")
setattr(mod, "DEFAULT_DB_PATH", tmp_self_coding_db)
except Exception:
pass
yield
# Post-test cleanup is handled by the reset_coordinator_state fixture
# and file deletion above ensures each test starts fresh
# Restore originals so module-level state isn't permanently mutated
for (mod_name, attr), original in originals.items():
try:
mod = __import__(mod_name, fromlist=[attr])
setattr(mod, attr, original)
except Exception:
pass
@pytest.fixture(autouse=True)

View File

@@ -108,6 +108,7 @@ class TestDockerfiles:
subprocess.run(["which", "docker"], capture_output=True).returncode != 0,
reason="Docker not installed",
)
@pytest.mark.timeout(300)
def test_docker_image_build(self):
result = subprocess.run(
["docker", "build", "-t", "timmy-time:test", "."],

View File

@@ -0,0 +1,226 @@
"""Chunk 2: OpenFang HTTP client — test first, implement second.
Tests cover:
- Health check returns False when unreachable
- Health check TTL caching
- execute_hand() rejects unknown hands
- execute_hand() success with mocked HTTP
- execute_hand() graceful degradation on error
- Convenience wrappers call the correct hand
"""
import asyncio
import json
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Health checks
# ---------------------------------------------------------------------------
def test_health_check_false_when_unreachable():
"""Client should report unhealthy when OpenFang is not running."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
assert client._check_health() is False
def test_health_check_caching():
"""Repeated .healthy calls within TTL should not re-check."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
client._health_cache_ttl = 9999 # very long TTL
# Force a first check (will be False)
_ = client.healthy
assert client._healthy is False
# Manually flip the cached value — next access should use cache
client._healthy = True
assert client.healthy is True # still cached, no re-check
# ---------------------------------------------------------------------------
# execute_hand — unknown hand
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_hand_unknown_hand():
"""Requesting an unknown hand returns success=False immediately."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
result = await client.execute_hand("nonexistent_hand", {})
assert result.success is False
assert "Unknown hand" in result.error
# ---------------------------------------------------------------------------
# execute_hand — success path (mocked HTTP)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_hand_success_mocked():
"""When OpenFang returns 200 with output, HandResult.success is True."""
from infrastructure.openfang.client import OpenFangClient
response_body = json.dumps({
"success": True,
"output": "Page loaded successfully",
"metadata": {"url": "https://example.com"},
}).encode()
mock_resp = MagicMock()
mock_resp.status = 200
mock_resp.read.return_value = response_body
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("urllib.request.urlopen", return_value=mock_resp):
client = OpenFangClient(base_url="http://localhost:8080")
result = await client.execute_hand("browser", {"url": "https://example.com"})
assert result.success is True
assert result.output == "Page loaded successfully"
assert result.hand == "browser"
assert result.latency_ms > 0
# ---------------------------------------------------------------------------
# execute_hand — graceful degradation on connection error
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_hand_connection_error():
"""When OpenFang is unreachable, HandResult.success is False (no crash)."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
result = await client.execute_hand("browser", {"url": "https://example.com"})
assert result.success is False
assert result.error # non-empty error message
assert result.hand == "browser"
# ---------------------------------------------------------------------------
# Convenience wrappers
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_browse_calls_browser_hand():
"""browse() should delegate to execute_hand('browser', ...)."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
calls = []
original = client.execute_hand
async def spy(hand, params, **kw):
calls.append((hand, params))
return await original(hand, params, **kw)
client.execute_hand = spy
await client.browse("https://example.com", "click button")
assert len(calls) == 1
assert calls[0][0] == "browser"
assert calls[0][1]["url"] == "https://example.com"
@pytest.mark.asyncio
async def test_collect_calls_collector_hand():
"""collect() should delegate to execute_hand('collector', ...)."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
calls = []
original = client.execute_hand
async def spy(hand, params, **kw):
calls.append((hand, params))
return await original(hand, params, **kw)
client.execute_hand = spy
await client.collect("example.com", depth="deep")
assert len(calls) == 1
assert calls[0][0] == "collector"
assert calls[0][1]["target"] == "example.com"
@pytest.mark.asyncio
async def test_predict_calls_predictor_hand():
"""predict() should delegate to execute_hand('predictor', ...)."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
calls = []
original = client.execute_hand
async def spy(hand, params, **kw):
calls.append((hand, params))
return await original(hand, params, **kw)
client.execute_hand = spy
await client.predict("Will BTC hit 100k?", horizon="1m")
assert len(calls) == 1
assert calls[0][0] == "predictor"
assert calls[0][1]["question"] == "Will BTC hit 100k?"
# ---------------------------------------------------------------------------
# HandResult dataclass
# ---------------------------------------------------------------------------
def test_hand_result_defaults():
"""HandResult should have sensible defaults."""
from infrastructure.openfang.client import HandResult
r = HandResult(hand="browser", success=True)
assert r.output == ""
assert r.error == ""
assert r.latency_ms == 0.0
assert r.metadata == {}
# ---------------------------------------------------------------------------
# OPENFANG_HANDS constant
# ---------------------------------------------------------------------------
def test_openfang_hands_tuple():
"""The OPENFANG_HANDS constant should list all 7 hands."""
from infrastructure.openfang.client import OPENFANG_HANDS
assert len(OPENFANG_HANDS) == 7
assert "browser" in OPENFANG_HANDS
assert "collector" in OPENFANG_HANDS
assert "predictor" in OPENFANG_HANDS
assert "lead" in OPENFANG_HANDS
assert "twitter" in OPENFANG_HANDS
assert "researcher" in OPENFANG_HANDS
assert "clip" in OPENFANG_HANDS
# ---------------------------------------------------------------------------
# status() summary
# ---------------------------------------------------------------------------
def test_status_returns_summary():
"""status() should return a dict with url, healthy flag, and hands list."""
from infrastructure.openfang.client import OpenFangClient
client = OpenFangClient(base_url="http://localhost:19999")
s = client.status()
assert "url" in s
assert "healthy" in s
assert "available_hands" in s
assert len(s["available_hands"]) == 7

View File

@@ -0,0 +1,25 @@
"""Chunk 1: OpenFang config settings — test first, implement second."""
def test_openfang_url_default():
"""Settings should expose openfang_url with a sensible default."""
from config import settings
assert hasattr(settings, "openfang_url")
assert settings.openfang_url == "http://localhost:8080"
def test_openfang_enabled_default_false():
"""OpenFang integration should be opt-in (disabled by default)."""
from config import settings
assert hasattr(settings, "openfang_enabled")
assert settings.openfang_enabled is False
def test_openfang_timeout_default():
"""Timeout should be generous (some hands are slow)."""
from config import settings
assert hasattr(settings, "openfang_timeout")
assert settings.openfang_timeout == 120

View File

@@ -0,0 +1,120 @@
"""Chunk 4: ToolExecutor OpenFang delegation — test first, implement second.
Tests cover:
- When openfang_enabled=True and client healthy → delegates to OpenFang
- When openfang_enabled=False → falls back to existing behavior
- When OpenFang is down → falls back gracefully
- Hand matching from task descriptions
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Hand matching (pure function, no mocking needed)
# ---------------------------------------------------------------------------
def test_match_hand_from_description():
"""_match_openfang_hand should detect relevant hand from task text."""
from swarm.tool_executor import _match_openfang_hand
assert _match_openfang_hand("browse https://example.com") == "browser"
assert _match_openfang_hand("navigate to the website") == "browser"
assert _match_openfang_hand("collect OSINT on target.com") == "collector"
assert _match_openfang_hand("predict whether Bitcoin hits 100k") == "predictor"
assert _match_openfang_hand("forecast the election outcome") == "predictor"
assert _match_openfang_hand("find leads matching our ICP") == "lead"
assert _match_openfang_hand("prospect discovery for SaaS") == "lead"
assert _match_openfang_hand("research quantum computing") == "researcher"
assert _match_openfang_hand("investigate the supply chain") == "researcher"
assert _match_openfang_hand("post a tweet about our launch") == "twitter"
assert _match_openfang_hand("process this video clip") == "clip"
def test_match_hand_returns_none_for_unmatched():
"""Tasks with no OpenFang-relevant keywords return None."""
from swarm.tool_executor import _match_openfang_hand
assert _match_openfang_hand("write a Python function") is None
assert _match_openfang_hand("fix the database migration") is None
# ---------------------------------------------------------------------------
# Delegation when enabled + healthy
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_openfang_delegation_when_enabled():
"""When openfang is enabled and healthy, try_openfang_execution delegates."""
from infrastructure.openfang.client import HandResult
mock_result = HandResult(
hand="browser",
success=True,
output="OpenFang executed the task",
)
mock_client = MagicMock()
mock_client.healthy = True
mock_client.execute_hand = AsyncMock(return_value=mock_result)
with patch("swarm.tool_executor.settings") as mock_settings, \
patch("infrastructure.openfang.client.openfang_client", mock_client), \
patch.dict("sys.modules", {}): # force re-import
mock_settings.openfang_enabled = True
# Re-import to pick up patches
from swarm.tool_executor import try_openfang_execution
# Patch the lazy import inside try_openfang_execution
with patch(
"infrastructure.openfang.client.openfang_client", mock_client
):
result = await try_openfang_execution(
"browse https://example.com and extract headlines"
)
assert result is not None
assert result["success"] is True
assert "OpenFang" in result["result"]
# ---------------------------------------------------------------------------
# Fallback when disabled
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_openfang_returns_none_when_disabled():
"""When openfang is disabled, try_openfang_execution returns None."""
with patch("swarm.tool_executor.settings") as mock_settings:
mock_settings.openfang_enabled = False
from swarm.tool_executor import try_openfang_execution
result = await try_openfang_execution("browse something")
assert result is None
# ---------------------------------------------------------------------------
# Fallback when down
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_openfang_returns_none_when_down():
"""When openfang is enabled but unhealthy, returns None (fallback)."""
mock_client = MagicMock()
mock_client.healthy = False
with patch("swarm.tool_executor.settings") as mock_settings, \
patch(
"infrastructure.openfang.client.openfang_client", mock_client
):
mock_settings.openfang_enabled = True
from swarm.tool_executor import try_openfang_execution
result = await try_openfang_execution("browse something")
assert result is None

View File

@@ -0,0 +1,223 @@
"""Chunk 3: OpenFang MCP tool registration — test first, implement second.
Tests cover:
- register_openfang_tools() registers all 7 hands
- Each tool has correct category, tags, and schema
- Twitter hand requires confirmation
- Persona-hand mapping is correct
- Handler delegates to openfang_client.execute_hand()
"""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def clean_tool_registry():
"""Remove OpenFang tools between tests so registration is idempotent."""
yield
from mcp.registry import tool_registry
for name in list(tool_registry._tools.keys()):
if name.startswith("openfang_"):
tool_registry.unregister(name)
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def test_register_openfang_tools_count():
"""register_openfang_tools() should register exactly 7 tools."""
from infrastructure.openfang.tools import register_openfang_tools
count = register_openfang_tools()
assert count == 7
def test_all_seven_hands_registered():
"""After registration, all 7 openfang_* tools exist in the registry."""
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
expected = {
"openfang_browser",
"openfang_collector",
"openfang_predictor",
"openfang_lead",
"openfang_twitter",
"openfang_researcher",
"openfang_clip",
}
registered = set(tool_registry.list_tools(category="openfang"))
assert registered == expected
def test_tools_have_correct_category():
"""Every OpenFang tool should be in the 'openfang' category."""
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
for name in tool_registry.list_tools(category="openfang"):
record = tool_registry.get(name)
assert record is not None
assert record.category == "openfang"
def test_tools_have_vendor_tag():
"""Every OpenFang tool should be tagged with 'vendor'."""
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
for name in tool_registry.list_tools(category="openfang"):
record = tool_registry.get(name)
assert "vendor" in record.tags
assert "openfang" in record.tags
def test_twitter_requires_confirmation():
"""The twitter hand should require user confirmation before execution."""
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
twitter = tool_registry.get("openfang_twitter")
assert twitter is not None
assert twitter.requires_confirmation is True
def test_non_twitter_no_confirmation():
"""Non-twitter hands should NOT require confirmation."""
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
for name in ["openfang_browser", "openfang_collector", "openfang_predictor"]:
record = tool_registry.get(name)
assert record is not None
assert record.requires_confirmation is False
def test_tools_have_schemas():
"""Every OpenFang tool should have a non-empty schema with 'name' and 'parameters'."""
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
for name in tool_registry.list_tools(category="openfang"):
record = tool_registry.get(name)
assert record.schema
assert "name" in record.schema
assert "parameters" in record.schema
# ---------------------------------------------------------------------------
# Persona-hand mapping
# ---------------------------------------------------------------------------
def test_persona_hand_map_mace():
"""Mace (Security) should have collector and browser."""
from infrastructure.openfang.tools import get_hands_for_persona
hands = get_hands_for_persona("mace")
assert "openfang_collector" in hands
assert "openfang_browser" in hands
def test_persona_hand_map_seer():
"""Seer (Analytics) should have predictor and researcher."""
from infrastructure.openfang.tools import get_hands_for_persona
hands = get_hands_for_persona("seer")
assert "openfang_predictor" in hands
assert "openfang_researcher" in hands
def test_persona_hand_map_echo():
"""Echo (Research) should have researcher, browser, and collector."""
from infrastructure.openfang.tools import get_hands_for_persona
hands = get_hands_for_persona("echo")
assert "openfang_researcher" in hands
assert "openfang_browser" in hands
assert "openfang_collector" in hands
def test_persona_hand_map_unknown():
"""Unknown persona should get empty list."""
from infrastructure.openfang.tools import get_hands_for_persona
hands = get_hands_for_persona("nonexistent")
assert hands == []
# ---------------------------------------------------------------------------
# Handler delegation
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_handler_delegates_to_client():
"""Tool handler should call openfang_client.execute_hand()."""
from infrastructure.openfang.client import HandResult
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
mock_result = HandResult(
hand="browser",
success=True,
output="Page loaded",
)
with patch(
"infrastructure.openfang.tools.openfang_client"
) as mock_client:
mock_client.execute_hand = AsyncMock(return_value=mock_result)
record = tool_registry.get("openfang_browser")
assert record is not None
output = await record.handler(url="https://example.com")
assert output == "Page loaded"
mock_client.execute_hand.assert_called_once_with(
"browser", {"url": "https://example.com"}
)
@pytest.mark.asyncio
async def test_handler_returns_error_on_failure():
"""On failure, handler should return the error string (not raise)."""
from infrastructure.openfang.client import HandResult
from infrastructure.openfang.tools import register_openfang_tools
from mcp.registry import tool_registry
register_openfang_tools()
mock_result = HandResult(
hand="collector",
success=False,
error="Connection refused",
)
with patch(
"infrastructure.openfang.tools.openfang_client"
) as mock_client:
mock_client.execute_hand = AsyncMock(return_value=mock_result)
record = tool_registry.get("openfang_collector")
output = await record.handler(target="example.com")
assert "error" in output.lower()
assert "Connection refused" in output