feat: autonomous self-modifying agent with multi-backend LLM support

Adds SelfModifyLoop — an edit→validate→test→commit cycle that can read
its own failure reports, diagnose root causes, and restart autonomously.

Key capabilities:
- Multi-backend LLM: Anthropic Claude API, Ollama, or auto-detect
- Syntax validation via compile() before writing to disk
- Autonomous self-correction loop with configurable max cycles
- XML-based output format to avoid triple-quote delimiter conflicts
- Branch creation skipped by default to prevent container restarts
- CLI: self-modify run "instruction" --backend auto --autonomous
- 939 tests passing, 30 skipped

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Payne
2026-02-25 17:18:58 -05:00
parent 8958cf830a
commit 8fec9c41a5
11 changed files with 1499 additions and 15 deletions

View File

@@ -76,6 +76,7 @@ creative = [
timmy = "timmy.cli:main"
timmy-serve = "timmy_serve.cli:main"
self-tdd = "self_tdd.watchdog:main"
self-modify = "self_modify.cli:main"
[tool.hatch.build.targets.wheel]
sources = {"src" = ""}
@@ -97,6 +98,7 @@ include = [
"src/creative",
"src/agent_core",
"src/lightning",
"src/self_modify",
]
[tool.pytest.ini_options]

View File

@@ -76,6 +76,14 @@ class Settings(BaseSettings):
# In production, security settings are strictly enforced.
timmy_env: Literal["development", "production"] = "development"
# ── Self-Modification ──────────────────────────────────────────────
# Enable self-modification capabilities. When enabled, Timmy can
# edit its own source code, run tests, and commit changes.
self_modify_enabled: bool = False
self_modify_max_retries: int = 2
self_modify_allowed_dirs: str = "src,tests"
self_modify_backend: str = "auto" # "ollama", "anthropic", or "auto"
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",

View File

@@ -26,6 +26,7 @@ from dashboard.routes.tools import router as tools_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.creative import router as creative_router
from dashboard.routes.discord import router as discord_router
from dashboard.routes.self_modify import router as self_modify_router
logging.basicConfig(
level=logging.INFO,
@@ -154,6 +155,7 @@ app.include_router(tools_router)
app.include_router(spark_router)
app.include_router(creative_router)
app.include_router(discord_router)
app.include_router(self_modify_router)
@app.get("/", response_class=HTMLResponse)

View File

@@ -0,0 +1,71 @@
"""Self-modification routes — /self-modify endpoints.
Exposes the edit-test-commit loop as a REST API. Gated by
``SELF_MODIFY_ENABLED`` (default False).
"""
import asyncio
import logging
from fastapi import APIRouter, Form, HTTPException
from config import settings
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/self-modify", tags=["self-modify"])
@router.post("/run")
async def run_self_modify(
    instruction: str = Form(...),
    target_files: str = Form(""),
    dry_run: bool = Form(False),
    speak_result: bool = Form(False),
):
    """Execute a self-modification loop and return its result as JSON.

    Args:
        instruction: Natural-language description of the change.
        target_files: Comma-separated file paths; blank entries are dropped.
            When empty, the loop infers targets from the instruction.
        dry_run: Generate edits without writing, testing, or committing.
        speak_result: Announce a successful result via TTS (best effort).

    Raises:
        HTTPException: 403 when ``settings.self_modify_enabled`` is false.
    """
    if not settings.self_modify_enabled:
        raise HTTPException(403, "Self-modification is disabled")

    from self_modify.loop import SelfModifyLoop, ModifyRequest

    files = [f.strip() for f in target_files.split(",") if f.strip()]
    request = ModifyRequest(
        instruction=instruction,
        target_files=files,
        dry_run=dry_run,
    )
    loop = SelfModifyLoop()
    # loop.run is synchronous; run it in a worker thread so the event loop
    # stays responsive.
    result = await asyncio.to_thread(loop.run, request)

    if speak_result and result.success:
        # A successful dry run never executes the tests, so only claim
        # "Tests passing" when they actually ran and passed.
        if result.test_passed:
            announcement = (
                f"Code modification complete. "
                f"{len(result.files_changed)} files changed. Tests passing."
            )
        else:
            announcement = (
                f"Dry run complete. "
                f"{len(result.files_changed)} files would change."
            )
        try:
            from timmy_serve.voice_tts import voice_tts

            if voice_tts.available:
                voice_tts.speak(announcement)
        except Exception:
            # TTS is best effort: never fail the request, but leave a trace
            # instead of silently discarding the error.
            logger.warning("TTS announcement failed", exc_info=True)

    return {
        "success": result.success,
        "files_changed": result.files_changed,
        "test_passed": result.test_passed,
        "commit_sha": result.commit_sha,
        "branch_name": result.branch_name,
        "error": result.error,
        "attempts": result.attempts,
    }
@router.get("/status")
async def self_modify_status():
    """Report the current value of the self-modification feature flag."""
    is_enabled = settings.self_modify_enabled
    return {"enabled": is_enabled}

View File

@@ -55,6 +55,39 @@ async def process_voice_input(
elif intent.name == "voice":
response_text = "Voice settings acknowledged. TTS is available for spoken responses."
elif intent.name == "code":
from config import settings as app_settings
if not app_settings.self_modify_enabled:
response_text = (
"Self-modification is disabled. "
"Set SELF_MODIFY_ENABLED=true to enable."
)
else:
import asyncio
from self_modify.loop import SelfModifyLoop, ModifyRequest
target_files = []
if "target_file" in intent.entities:
target_files = [intent.entities["target_file"]]
loop = SelfModifyLoop()
request = ModifyRequest(
instruction=text,
target_files=target_files,
)
result = await asyncio.to_thread(loop.run, request)
if result.success:
sha_short = result.commit_sha[:8] if result.commit_sha else "none"
response_text = (
f"Code modification complete. "
f"Changed {len(result.files_changed)} file(s). "
f"Tests passed. Committed as {sha_short} "
f"on branch {result.branch_name}."
)
else:
response_text = f"Code modification failed: {result.error}"
else:
# Default: chat with Timmy
agent = create_timmy()

View File

134
src/self_modify/cli.py Normal file
View File

@@ -0,0 +1,134 @@
"""CLI for self-modification — run from the terminal.
Usage:
self-modify run "Add a docstring to src/timmy/prompts.py" --file src/timmy/prompts.py
self-modify run "Fix the bug in config" --dry-run
self-modify run "Add logging" --backend anthropic --autonomous
self-modify status
"""
import logging
import os
from typing import Optional
import typer
from rich.console import Console
from rich.panel import Panel
console = Console()
app = typer.Typer(help="Timmy self-modify — edit code, run tests, commit")
@app.command()
def run(
    instruction: str = typer.Argument(..., help="What to change (natural language)"),
    file: Optional[list[str]] = typer.Option(None, "--file", "-f", help="Target file(s) to modify"),
    dry_run: bool = typer.Option(False, "--dry-run", "-n", help="Generate edits but don't write"),
    retries: int = typer.Option(2, "--retries", "-r", help="Max retry attempts on test failure"),
    backend: Optional[str] = typer.Option(None, "--backend", "-b", help="LLM backend: ollama, anthropic, auto"),
    autonomous: bool = typer.Option(False, "--autonomous", "-a", help="Enable autonomous self-correction"),
    max_cycles: int = typer.Option(3, "--max-cycles", help="Max autonomous correction cycles"),
    branch: bool = typer.Option(False, "--branch", help="Create a git branch (off by default to avoid container restarts)"),
    speak: bool = typer.Option(False, "--speak", "-s", help="Speak the result via TTS"),
):
    """Run the self-modification loop.

    Prints a summary panel of the request, runs the loop, then prints a
    result panel. Exits with status 1 when the loop reports failure.
    """
    # Force enable for CLI usage. This must happen before self_modify.loop
    # (and therefore config) is imported below.
    os.environ["SELF_MODIFY_ENABLED"] = "true"

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-8s %(name)s -- %(message)s",
        datefmt="%H:%M:%S",
    )

    # Skip branch creation unless explicitly requested. When --branch IS
    # given, clear any inherited SELF_MODIFY_SKIP_BRANCH so the flag is
    # honoured instead of being silently overridden by the environment.
    if branch:
        os.environ.pop("SELF_MODIFY_SKIP_BRANCH", None)
    else:
        os.environ["SELF_MODIFY_SKIP_BRANCH"] = "1"

    from self_modify.loop import SelfModifyLoop, ModifyRequest

    target_files = list(file) if file else []
    effective_backend = backend or os.environ.get("SELF_MODIFY_BACKEND", "auto")

    console.print(Panel(
        f"[bold]Instruction:[/bold] {instruction}\n"
        f"[bold]Files:[/bold] {', '.join(target_files) or '(auto-detect)'}\n"
        f"[bold]Backend:[/bold] {effective_backend}\n"
        f"[bold]Autonomous:[/bold] {autonomous}\n"
        f"[bold]Dry run:[/bold] {dry_run}\n"
        f"[bold]Max retries:[/bold] {retries}",
        title="Self-Modify",
        border_style="cyan",
    ))

    loop = SelfModifyLoop(
        max_retries=retries,
        backend=effective_backend,
        autonomous=autonomous,
        max_autonomous_cycles=max_cycles,
    )
    request = ModifyRequest(
        instruction=instruction,
        target_files=target_files,
        dry_run=dry_run,
    )

    with console.status("[bold cyan]Running self-modification loop..."):
        result = loop.run(request)

    if result.report_path:
        console.print(f"\n[dim]Report saved: {result.report_path}[/dim]\n")

    if not result.success:
        console.print(Panel(
            f"[red bold]FAILED[/red bold]\n\n"
            f"Error: {result.error}\n"
            f"Attempts: {result.attempts}\n"
            f"Autonomous cycles: {result.autonomous_cycles}",
            title="Result",
            border_style="red",
        ))
        raise typer.Exit(1)

    console.print(Panel(
        f"[green bold]SUCCESS[/green bold]\n\n"
        f"Files changed: {', '.join(result.files_changed)}\n"
        f"Tests passed: {result.test_passed}\n"
        f"Commit: {result.commit_sha or 'none (dry run)'}\n"
        f"Branch: {result.branch_name or 'current'}\n"
        f"Attempts: {result.attempts}\n"
        f"Autonomous cycles: {result.autonomous_cycles}",
        title="Result",
        border_style="green",
    ))

    if speak:
        # A successful dry run never executes the tests, so only claim
        # "Tests passing" when they actually ran and passed.
        if result.test_passed:
            announcement = (
                f"Code modification complete. "
                f"{len(result.files_changed)} files changed. Tests passing."
            )
        else:
            announcement = (
                f"Dry run complete. "
                f"{len(result.files_changed)} files would change."
            )
        try:
            from timmy_serve.voice_tts import voice_tts

            if voice_tts.available:
                voice_tts.speak_sync(announcement)
        except Exception:
            # TTS is best effort; report the problem rather than hiding it.
            logging.getLogger(__name__).warning(
                "TTS announcement failed", exc_info=True
            )
@app.command()
def status():
    """Print the self-modify feature flag and its related settings."""
    from config import settings

    if settings.self_modify_enabled:
        color, label = "green", "ENABLED"
    else:
        color, label = "red", "DISABLED"

    console.print(f"Self-modification: [{color}]{label}[/{color}]")
    console.print(f"Max retries: {settings.self_modify_max_retries}")
    console.print(f"Backend: {settings.self_modify_backend}")
    console.print(f"Allowed dirs: {settings.self_modify_allowed_dirs}")
def main():
    """Console-script entry point: invoke the Typer application."""
    app()
if __name__ == "__main__":
main()

741
src/self_modify/loop.py Normal file
View File

@@ -0,0 +1,741 @@
"""Self-modification loop — read source, generate edits, test, commit.
Orchestrates the full cycle for Timmy to modify its own codebase:
1. Create a working git branch (skipped when SELF_MODIFY_SKIP_BRANCH is set)
2. Read target source files
3. Send instruction + source to the LLM
4. Validate syntax before writing
5. Write edits to disk
6. Run pytest
7. On success -> git add + commit; on failure -> revert
8. On total failure -> diagnose from report, restart autonomously
Supports multiple LLM backends:
- "ollama" — local Ollama (sovereign)
- "anthropic" — Claude API via Anthropic SDK
- "auto" — use anthropic when ANTHROPIC_API_KEY is set, otherwise ollama (default)
Reports are saved to data/self_modify_reports/ for debugging.
"""
from __future__ import annotations
import logging
import os
import re
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from config import settings
logger = logging.getLogger(__name__)
# Project root — two levels up from src/self_modify/
PROJECT_ROOT = Path(__file__).parent.parent.parent
# Reports directory
REPORTS_DIR = PROJECT_ROOT / "data" / "self_modify_reports"
# Only one self-modification at a time
_LOCK = threading.Lock()
# Maximum file size we'll send to the LLM (bytes)
_MAX_FILE_SIZE = 50_000
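# Legacy "--- FILE: path --- ... --- END FILE ---" delimiter format. The LLM is
# now instructed to emit <MODIFIED path="..."> blocks; this is a fallback parser.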
_FILE_BLOCK_RE = re.compile(
r"---\s*FILE:\s*(.+?)\s*---\n(.*?)---\s*END\s*FILE\s*---",
re.DOTALL,
)
# Backend type literal
BACKENDS = ("ollama", "anthropic", "auto")
@dataclass
class ModifyRequest:
    """A request to modify code."""

    # Natural-language description of the change; embedded in the LLM prompt.
    instruction: str
    # Paths of the files to edit. When empty, the loop tries to infer targets
    # from the instruction text.
    target_files: list[str] = field(default_factory=list)
    # When True, edits are generated and syntax-checked but not written,
    # tested, or committed.
    dry_run: bool = False
@dataclass
class ModifyResult:
    """Result of a self-modification attempt."""

    # True when the edits were committed after passing tests, or when a dry
    # run produced syntactically valid edits.
    success: bool
    # Paths written by the successful attempt (or proposed, for a dry run).
    files_changed: list[str] = field(default_factory=list)
    # True only when the test suite actually ran and passed.
    test_passed: bool = False
    # SHA reported by the commit helper; None when nothing was committed.
    commit_sha: Optional[str] = None
    # Working branch created for this run; None when none was created.
    branch_name: Optional[str] = None
    # Human-readable failure description; None on success.
    error: Optional[str] = None
    # Raw text returned by the LLM.
    llm_response: str = ""
    # Number of edit attempts made.
    attempts: int = 0
    # Path of the saved markdown report, filled in by SelfModifyLoop.run().
    report_path: Optional[str] = None
    # Number of autonomous correction cycles counted for this result.
    autonomous_cycles: int = 0
class SelfModifyLoop:
"""Orchestrates the read -> edit -> test -> commit cycle.
Supports autonomous self-correction: when all retries fail, reads its own
failure report, diagnoses the root cause, and restarts with a corrected
instruction.
"""
def __init__(
    self,
    repo_path: Optional[Path] = None,
    max_retries: Optional[int] = None,
    backend: Optional[str] = None,
    autonomous: bool = False,
    max_autonomous_cycles: int = 3,
) -> None:
    """Configure the loop; unset options fall back to application settings.

    Args:
        repo_path: Repository root to operate on (defaults to PROJECT_ROOT).
        max_retries: Extra attempts after the first one; ``None`` uses
            ``settings.self_modify_max_retries``.
        backend: LLM backend name; a falsy value uses
            ``settings.self_modify_backend``.
        autonomous: Enable the diagnose-and-restart cycle after failures.
        max_autonomous_cycles: Upper bound on autonomous restart cycles.
    """
    if max_retries is None:
        max_retries = settings.self_modify_max_retries

    # Comma-separated setting -> list of non-empty, trimmed directory names.
    allowed: list[str] = []
    for entry in settings.self_modify_allowed_dirs.split(","):
        entry = entry.strip()
        if entry:
            allowed.append(entry)

    self._repo_path = repo_path or PROJECT_ROOT
    self._max_retries = max_retries
    self._allowed_dirs = allowed
    self._run_id = f"{int(time.time())}"
    self._attempt_reports: list[dict] = []
    self._backend = backend or settings.self_modify_backend
    self._autonomous = autonomous
    self._max_autonomous_cycles = max_autonomous_cycles
# ── Public API ────────────────────────────────────────────────────────────
def run(self, request: ModifyRequest) -> ModifyResult:
    """Execute the full self-modification loop.

    Returns a failed ``ModifyResult`` immediately when the feature is
    disabled or when another run holds the module-level lock. Otherwise
    runs the edit/test/commit cycle, saves a report, and, in autonomous
    mode, hands a failed non-dry-run result to the autonomous loop.
    """
    if not settings.self_modify_enabled:
        return ModifyResult(
            success=False,
            error="Self-modification is disabled. Set SELF_MODIFY_ENABLED=true.",
        )

    # Non-blocking acquire: refuse rather than queue behind another run.
    if not _LOCK.acquire(blocking=False):
        return ModifyResult(
            success=False,
            error="Another self-modification is already running.",
        )

    try:
        # Start from a clean slate so a reused instance does not leak the
        # previous run's attempts into this run's report.
        self._attempt_reports = []

        result = self._run_locked(request)
        report_path = self._save_report(request, result)
        result.report_path = str(report_path)

        # Autonomous mode: if failed, diagnose and restart
        if self._autonomous and not result.success and not request.dry_run:
            result = self._autonomous_loop(request, result, report_path)

        return result
    finally:
        _LOCK.release()
# ── Autonomous self-correction ─────────────────────────────────────────
def _autonomous_loop(
    self, original_request: ModifyRequest, last_result: ModifyResult, last_report: Path
) -> ModifyResult:
    """Repeatedly diagnose the latest failure report and retry with a fix.

    Each cycle asks the LLM for a diagnosis of ``last_report``, appends it to
    the original instruction, and reruns the edit loop. Returns the first
    successful result, or the most recent failed result when a diagnosis
    cannot be produced or the cycle budget is used up.
    """
    cycle = 0
    while cycle < self._max_autonomous_cycles:
        cycle += 1
        logger.info("Autonomous cycle %d/%d", cycle, self._max_autonomous_cycles)

        # Diagnose what went wrong
        diagnosis = self._diagnose_failure(last_report)
        if not diagnosis:
            logger.warning("Could not diagnose failure, stopping autonomous loop")
            last_result.autonomous_cycles = cycle
            return last_result
        logger.info("Diagnosis: %s", diagnosis[:200])

        # Each cycle gets its own attempt log.
        self._attempt_reports = []

        retry_request = ModifyRequest(
            instruction=(
                f"{original_request.instruction}\n\n"
                f"IMPORTANT CORRECTION from previous failure:\n{diagnosis}"
            ),
            target_files=original_request.target_files,
            dry_run=original_request.dry_run,
        )
        outcome = self._run_locked(retry_request)
        saved_to = self._save_report(retry_request, outcome)
        outcome.report_path = str(saved_to)
        outcome.autonomous_cycles = cycle

        if outcome.success:
            logger.info("Autonomous cycle %d succeeded!", cycle)
            return outcome

        # Carry the newest failure forward as input to the next diagnosis.
        last_result, last_report = outcome, saved_to

    logger.warning("Autonomous loop exhausted after %d cycles", self._max_autonomous_cycles)
    return last_result
def _diagnose_failure(self, report_path: Path) -> Optional[str]:
    """Ask the LLM to diagnose a saved failure report.

    Returns the stripped LLM answer, or ``None`` when the report cannot be
    read, the LLM call raises, or the answer is empty.
    """
    try:
        report_text = report_path.read_text(encoding="utf-8")
    except Exception as exc:
        logger.error("Could not read report %s: %s", report_path, exc)
        return None

    # Cap the report size to stay within the model's context window.
    limit = 8000
    if len(report_text) > limit:
        report_text = report_text[:limit] + "\n... (truncated)"

    diagnosis_prompt = f"""You are a code debugging expert. Analyze this self-modification failure report and provide a concise diagnosis.
FAILURE REPORT:
{report_text}
Analyze the report and provide:
1. ROOT CAUSE: What specifically went wrong (syntax error, logic error, missing import, etc.)
2. FIX INSTRUCTIONS: Exact instructions for a code-generation LLM to avoid this mistake.
Be very specific — e.g. "Do NOT start the file with triple-quotes" or
"The em-dash character U+2014 must stay INSIDE a string literal, never outside one."
Keep your response under 500 words. Focus on actionable fix instructions."""

    try:
        answer = self._call_llm(diagnosis_prompt)
    except Exception as exc:
        logger.error("Diagnosis LLM call failed: %s", exc)
        return None

    if not answer:
        return None
    return answer.strip()
# ── Internal orchestration ────────────────────────────────────────────────
def _run_locked(self, request: ModifyRequest) -> ModifyResult:
    """Run the edit -> validate -> test -> commit cycle with retries.

    Must be called while the module-level lock is held. Makes up to
    ``self._max_retries + 1`` attempts. Each attempt reads the target
    files, asks the LLM for edits, checks that every edited path is inside
    the allowed directories, compile-checks Python edits, then (unless
    ``request.dry_run``) writes the files and runs the tests. Passing tests
    lead to a commit; failing tests revert the written files and retry.

    Every attempt appends a dict to ``self._attempt_reports`` for the report.
    """
    branch_name = None
    attempt = 0

    # Skip branch creation — writing files triggers container restarts
    # which kills the process mid-operation. Work on the current branch.
    if not os.environ.get("SELF_MODIFY_SKIP_BRANCH"):
        try:
            branch_name = self._create_branch()
        except Exception as exc:
            logger.warning("Could not create branch: %s (continuing on current)", exc)

    # Resolve target files
    target_files = request.target_files or self._infer_target_files(
        request.instruction
    )
    if not target_files:
        return ModifyResult(
            success=False,
            error="No target files identified. Specify target_files or use more specific language.",
            branch_name=branch_name,
        )

    # Validate paths
    try:
        self._validate_paths(target_files)
    except ValueError as exc:
        return ModifyResult(success=False, error=str(exc), branch_name=branch_name)

    last_test_output = ""
    last_llm_response = ""
    last_syntax_errors: dict[str, str] = {}

    while attempt <= self._max_retries:
        attempt += 1
        logger.info(
            "Self-modify attempt %d/%d: %s",
            attempt,
            self._max_retries + 1,
            request.instruction[:80],
        )

        # Read current contents
        file_contents = self._read_files(target_files)
        if not file_contents:
            return ModifyResult(
                success=False,
                error="Could not read any target files.",
                branch_name=branch_name,
                attempts=attempt,
            )

        # Generate edits via LLM; feedback from the previous attempt is only
        # passed along on retries.
        try:
            edits, llm_response = self._generate_edits(
                request.instruction, file_contents,
                prev_test_output=last_test_output if attempt > 1 else None,
                prev_syntax_errors=last_syntax_errors if attempt > 1 else None,
            )
            last_llm_response = llm_response
        except Exception as exc:
            self._attempt_reports.append({
                "attempt": attempt,
                "phase": "llm_generation",
                "error": str(exc),
            })
            return ModifyResult(
                success=False,
                error=f"LLM generation failed: {exc}",
                branch_name=branch_name,
                attempts=attempt,
            )

        if not edits:
            self._attempt_reports.append({
                "attempt": attempt,
                "phase": "parse_edits",
                "error": "No file edits parsed from LLM response",
                "llm_response": llm_response,
            })
            return ModifyResult(
                success=False,
                error="LLM produced no file edits.",
                llm_response=llm_response,
                branch_name=branch_name,
                attempts=attempt,
            )

        # The file paths in the edits come from LLM output and are untrusted:
        # apply the same directory restrictions as for the requested targets
        # BEFORE anything is written to disk.
        try:
            self._validate_paths(list(edits))
        except ValueError as exc:
            logger.warning("Rejected LLM edit path: %s", exc)
            self._attempt_reports.append({
                "attempt": attempt,
                "phase": "path_validation",
                "error": str(exc),
                "llm_response": llm_response,
            })
            return ModifyResult(
                success=False,
                error=str(exc),
                llm_response=llm_response,
                branch_name=branch_name,
                attempts=attempt,
            )

        # Syntax validation — check BEFORE writing to disk
        syntax_errors = self._validate_syntax(edits)
        if syntax_errors:
            last_syntax_errors = syntax_errors
            error_summary = "; ".join(
                f"{fp}: {err}" for fp, err in syntax_errors.items()
            )
            logger.warning("Syntax errors in LLM output: %s", error_summary)
            self._attempt_reports.append({
                "attempt": attempt,
                "phase": "syntax_validation",
                "error": error_summary,
                "edits_content": dict(edits),
                "llm_response": llm_response,
            })
            # Don't write — go straight to retry
            continue
        last_syntax_errors = {}

        if request.dry_run:
            self._attempt_reports.append({
                "attempt": attempt,
                "phase": "dry_run",
                # Only a preview of each edit is stored for dry runs.
                "edits": {fp: content[:500] + "..." if len(content) > 500 else content
                          for fp, content in edits.items()},
                "llm_response": llm_response,
            })
            return ModifyResult(
                success=True,
                files_changed=list(edits.keys()),
                llm_response=llm_response,
                branch_name=branch_name,
                attempts=attempt,
            )

        # Write edits
        written = self._write_files(edits)

        # Run tests
        test_passed, test_output = self._run_tests()
        last_test_output = test_output

        # Save per-attempt report
        self._attempt_reports.append({
            "attempt": attempt,
            "phase": "complete",
            "files_written": written,
            "edits_content": dict(edits),
            "test_passed": test_passed,
            "test_output": test_output,
            "llm_response": llm_response,
        })

        if test_passed:
            sha = self._git_commit(
                f"self-modify: {request.instruction[:72]}", written
            )
            return ModifyResult(
                success=True,
                files_changed=written,
                test_passed=True,
                commit_sha=sha,
                branch_name=branch_name,
                llm_response=llm_response,
                attempts=attempt,
            )

        # Tests failed — revert and maybe retry
        logger.warning(
            "Tests failed on attempt %d: %s", attempt, test_output[:200]
        )
        self._revert_files(written)

    # All attempts used up without a passing test run.
    return ModifyResult(
        success=False,
        files_changed=[],
        test_passed=False,
        error=f"Tests failed after {attempt} attempt(s).",
        llm_response=last_llm_response,
        branch_name=branch_name,
        attempts=attempt,
    )
# ── Syntax validation ──────────────────────────────────────────────────
def _validate_syntax(self, edits: dict[str, str]) -> dict[str, str]:
"""Compile-check each .py file edit. Returns {path: error} for failures."""
errors: dict[str, str] = {}
for fp, content in edits.items():
if not fp.endswith(".py"):
continue
try:
compile(content, fp, "exec")
except SyntaxError as exc:
errors[fp] = f"line {exc.lineno}: {exc.msg}"
return errors
# ── Report saving ─────────────────────────────────────────────────────────
def _save_report(self, request: ModifyRequest, result: ModifyResult) -> Path:
    """Write a markdown report of the run to REPORTS_DIR and return its path.

    The file name combines a UTC timestamp with a slug built from the first
    40 characters of the instruction. The report holds a summary header
    followed by one section per entry in ``self._attempt_reports``.
    """
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)

    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    slug = re.sub(r"[^a-z0-9]+", "_", request.instruction[:40].lower()).strip("_")
    report_file = REPORTS_DIR / f"{ts}_{slug}.md"

    out: list[str] = [
        f"# Self-Modify Report: {ts}",
        "",
        f"**Instruction:** {request.instruction[:200]}",
        f"**Target files:** {', '.join(request.target_files) or '(auto-detected)'}",
        f"**Dry run:** {request.dry_run}",
        f"**Backend:** {self._backend}",
        f"**Branch:** {result.branch_name or 'N/A'}",
        f"**Result:** {'SUCCESS' if result.success else 'FAILED'}",
        f"**Error:** {result.error or 'none'}",
        f"**Commit:** {result.commit_sha or 'none'}",
        f"**Attempts:** {result.attempts}",
        f"**Autonomous cycles:** {result.autonomous_cycles}",
        "",
    ]

    for entry in self._attempt_reports:
        number = entry.get("attempt", "?")
        phase = entry.get("phase", "?")
        out += [f"## Attempt {number} -- {phase}", ""]

        # Completed attempts report their outcome via the test section.
        if "error" in entry and entry.get("phase") != "complete":
            out += [f"**Error:** {entry['error']}", ""]

        if "llm_response" in entry:
            out += ["### LLM Response", "```", entry["llm_response"], "```", ""]

        if "edits_content" in entry:
            out.append("### Edits Written")
            for fp, content in entry["edits_content"].items():
                out += [f"#### {fp}", "```python", content, "```", ""]

        if "test_output" in entry:
            verdict = "PASSED" if entry.get("test_passed") else "FAILED"
            out += [f"### Test Result: {verdict}", "```", entry["test_output"], "```", ""]

    report_file.write_text("\n".join(out), encoding="utf-8")
    logger.info("Report saved: %s", report_file)
    return report_file
# ── Git helpers ───────────────────────────────────────────────────────────
def _create_branch(self) -> str:
    """Create a timestamped working branch, switch to it, and return its name."""
    from tools.git_tools import git_branch

    stamp = int(time.time())
    name = f"timmy/self-modify-{stamp}"
    git_branch(self._repo_path, create=name, switch=name)
    logger.info("Created branch: %s", name)
    return name
def _git_commit(self, message: str, files: list[str]) -> Optional[str]:
    """Stage ``files`` and commit them with ``message``.

    Returns the ``"sha"`` entry of the commit helper's result, or ``None``
    when staging or committing raises.
    """
    from tools.git_tools import git_add, git_commit

    try:
        git_add(self._repo_path, paths=files)
        commit_info = git_commit(self._repo_path, message)
        sha = commit_info.get("sha")
        short = sha[:8] if sha else "?"
        logger.info("Committed %s: %s", short, message)
        return sha
    except Exception as exc:
        logger.error("Git commit failed: %s", exc)
        return None
def _revert_files(self, file_paths: list[str]) -> None:
    """Restore files from git HEAD, logging every file that fails to revert.

    Best effort: a failure on one file is logged and the remaining files are
    still processed. Nothing is raised to the caller.
    """
    for fp in file_paths:
        try:
            proc = subprocess.run(
                ["git", "checkout", "HEAD", "--", fp],
                cwd=self._repo_path,
                capture_output=True,
                timeout=10,
            )
        except Exception as exc:
            logger.error("Failed to revert %s: %s", fp, exc)
            continue

        # subprocess.run does not raise on a non-zero exit status, so check it
        # explicitly; otherwise a failed revert would go unnoticed.
        if proc.returncode != 0:
            stderr = proc.stderr
            if isinstance(stderr, bytes):
                stderr = stderr.decode("utf-8", errors="replace").strip()
            logger.error(
                "Failed to revert %s: git exited with %s: %s",
                fp,
                proc.returncode,
                stderr,
            )
# ── File I/O ──────────────────────────────────────────────────────────────
def _validate_paths(self, file_paths: list[str]) -> None:
"""Ensure all paths are within allowed directories."""
for fp in file_paths:
resolved = (self._repo_path / fp).resolve()
repo_resolved = self._repo_path.resolve()
if not str(resolved).startswith(str(repo_resolved)):
raise ValueError(f"Path escapes repository: {fp}")
rel = str(resolved.relative_to(repo_resolved))
if not any(rel.startswith(d) for d in self._allowed_dirs):
raise ValueError(
f"Path not in allowed directories ({self._allowed_dirs}): {fp}"
)
def _read_files(self, file_paths: list[str]) -> dict[str, str]:
    """Load the given repository-relative files as UTF-8 text.

    Missing files, files larger than ``_MAX_FILE_SIZE`` bytes, and files that
    fail to read are logged and left out of the returned mapping.
    """
    loaded: dict[str, str] = {}
    for fp in file_paths:
        target = self._repo_path / fp

        if not target.is_file():
            logger.warning("File not found: %s", target)
        elif target.stat().st_size > _MAX_FILE_SIZE:
            logger.warning("File too large, skipping: %s", fp)
        else:
            try:
                loaded[fp] = target.read_text(encoding="utf-8")
            except Exception as exc:
                logger.warning("Could not read %s: %s", fp, exc)
    return loaded
def _write_files(self, edits: dict[str, str]) -> list[str]:
    """Write each edit under the repository root and return the paths written.

    Parent directories are created as needed; content is written as UTF-8.
    """
    done: list[str] = []
    for rel_path, text in edits.items():
        destination = self._repo_path / rel_path
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(text, encoding="utf-8")
        done.append(rel_path)
        logger.info("Wrote %d bytes to %s", len(text), rel_path)
    return done
def _infer_target_files(self, instruction: str) -> list[str]:
"""Guess which files to modify from the instruction text."""
paths = re.findall(r"[\w/._-]+\.py", instruction)
if paths:
return paths
keyword_files = {
"config": ["src/config.py"],
"health": ["src/dashboard/routes/health.py"],
"swarm": ["src/swarm/coordinator.py"],
"voice": ["src/voice/nlu.py"],
"agent": ["src/timmy/agent.py"],
"tool": ["src/timmy/tools.py"],
"dashboard": ["src/dashboard/app.py"],
"prompt": ["src/timmy/prompts.py"],
}
instruction_lower = instruction.lower()
for keyword, files in keyword_files.items():
if keyword in instruction_lower:
return files
return []
# ── Test runner ───────────────────────────────────────────────────────────
def _run_tests(self) -> tuple[bool, str]:
    """Run pytest on ``tests/`` in the repository. Returns (passed, output).

    ``passed`` is True only for exit status 0. ``output`` is stdout followed
    by stderr, stripped. A timeout or any other error yields
    ``(False, <explanatory message>)``.
    """
    command = [sys.executable, "-m", "pytest", "tests/", "-q", "--tb=short"]
    try:
        proc = subprocess.run(
            command,
            capture_output=True,
            text=True,
            cwd=self._repo_path,
            timeout=120,
        )
        combined = (proc.stdout + proc.stderr).strip()
        passed = proc.returncode == 0
        return passed, combined
    except subprocess.TimeoutExpired:
        return False, "Tests timed out after 120s"
    except Exception as exc:
        return False, f"Failed to run tests: {exc}"
# ── Multi-backend LLM ─────────────────────────────────────────────────────
def _resolve_backend(self) -> str:
"""Resolve 'auto' backend to a concrete one."""
if self._backend == "auto":
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
if api_key:
return "anthropic"
return "ollama"
return self._backend
def _call_llm(self, prompt: str) -> str:
"""Route a prompt to the configured LLM backend. Returns raw text."""
backend = self._resolve_backend()
if backend == "anthropic":
return self._call_anthropic(prompt)
else:
return self._call_ollama(prompt)
def _call_anthropic(self, prompt: str) -> str:
    """Send ``prompt`` to Claude through the Anthropic SDK.

    Returns the text of the first content block of the response.

    Raises:
        RuntimeError: If ANTHROPIC_API_KEY is unset or empty.
    """
    import anthropic

    key = os.environ.get("ANTHROPIC_API_KEY", "")
    if not key:
        raise RuntimeError("ANTHROPIC_API_KEY not set — cannot use anthropic backend")

    conversation = [{"role": "user", "content": prompt}]
    response = anthropic.Anthropic(api_key=key).messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=conversation,
    )
    first_block = response.content[0]
    return first_block.text
def _call_ollama(self, prompt: str) -> str:
    """Send ``prompt`` to the local Ollama instance through an Agno agent.

    Returns the run result's ``content`` attribute when it has one,
    otherwise the string form of the result.
    """
    from agno.agent import Agent
    from agno.models.ollama import Ollama

    model = Ollama(id=settings.ollama_model, host=settings.ollama_url)
    agent = Agent(name="SelfModify", model=model, markdown=False)
    outcome = agent.run(prompt, stream=False)
    if hasattr(outcome, "content"):
        return outcome.content
    return str(outcome)
# ── LLM interaction ───────────────────────────────────────────────────────
def _generate_edits(
self,
instruction: str,
file_contents: dict[str, str],
prev_test_output: Optional[str] = None,
prev_syntax_errors: Optional[dict[str, str]] = None,
) -> tuple[dict[str, str], str]:
"""Ask the LLM to generate file edits.
Returns (edits_dict, raw_llm_response).
"""
# Build the prompt
files_block = ""
for fp, content in file_contents.items():
files_block += f"\n<FILE path=\"{fp}\">\n{content}\n</FILE>\n"
retry_context = ""
if prev_test_output:
retry_context += f"""
PREVIOUS ATTEMPT FAILED with test errors:
<TEST_OUTPUT>
{prev_test_output[:2000]}
</TEST_OUTPUT>
Fix the issues shown above.
"""
if prev_syntax_errors:
errors_text = "\n".join(f" {fp}: {err}" for fp, err in prev_syntax_errors.items())
retry_context += f"""
PREVIOUS ATTEMPT HAD SYNTAX ERRORS (code was rejected before writing):
{errors_text}
You MUST produce syntactically valid Python. Run through the code mentally
and make sure all strings are properly terminated, all indentation is correct,
and there are no invalid characters outside of string literals.
"""
prompt = f"""You are a precise code modification agent. Edit source files according to the instruction.
INSTRUCTION: {instruction}
CURRENT FILES:
{files_block}
{retry_context}
OUTPUT FORMAT — wrap each modified file like this:
<MODIFIED path="filepath">
complete file content here
</MODIFIED>
CRITICAL RULES:
- Output the COMPLETE file content, not just changed lines
- Keep ALL existing functionality unless told to remove it
- The output must be syntactically valid Python — verify mentally before outputting
- Preserve all special characters (unicode, em-dashes, etc.) exactly as they appear in the original
- Do NOT wrap the file content in triple-quotes or markdown code fences
- Do NOT start the file content with \"\"\" — that would turn the code into a string literal
- Follow the existing code style
Generate the modified files now:"""
raw = self._call_llm(prompt)
# Parse <MODIFIED path="..."> ... </MODIFIED> blocks
edits = {}
xml_re = re.compile(
r'<MODIFIED\s+path=["\'](.+?)["\']\s*>\n?(.*?)</MODIFIED>',
re.DOTALL,
)
for match in xml_re.finditer(raw):
filepath = match.group(1).strip()
content = match.group(2)
# Strip trailing whitespace but keep a final newline
content = content.rstrip() + "\n"
edits[filepath] = content
# Fallback: try the old delimiter format
if not edits:
for match in _FILE_BLOCK_RE.finditer(raw):
filepath = match.group(1).strip()
content = match.group(2).rstrip() + "\n"
edits[filepath] = content
# Last resort: single file + code block
if not edits and len(file_contents) == 1:
only_path = next(iter(file_contents))
code_match = re.search(r"```(?:python)?\n(.*?)```", raw, re.DOTALL)
if code_match:
edits[only_path] = code_match.group(1).rstrip() + "\n"
return edits, raw

View File

@@ -276,22 +276,55 @@ Response:"""
class DirectToolExecutor(ToolExecutor):
    """Tool executor that actually calls tools directly.

    This is a more advanced version that actually executes the tools
    rather than just simulating. Use with caution - it has real side effects.

    For code-modification tasks assigned to the Forge persona, dispatches
    to the SelfModifyLoop for real edit → test → commit execution.
    Other tasks fall back to the simulated parent.
    """

    # Substrings (matched against the lower-cased task description) that
    # mark a task as a code-modification request.
    _CODE_KEYWORDS = frozenset({
        "modify", "edit", "fix", "refactor", "implement",
        "add function", "change code", "update source", "patch",
    })

    def execute_with_tools(self, task_description: str) -> dict[str, Any]:
        """Execute tools to complete the task.

        Code-modification tasks on the Forge persona are routed through
        the SelfModifyLoop. Everything else delegates to the parent.

        Args:
            task_description: Free-text description of the task to perform.

        Returns:
            The dict produced by ``execute_task`` for non-code tasks (or when
            self-modification is disabled in settings); otherwise a dict
            summarising the SelfModifyLoop run. If the self-modify path
            raises, a failure dict carrying the error message is returned
            instead of propagating the exception.
        """
        task_lower = task_description.lower()
        is_code_task = any(kw in task_lower for kw in self._CODE_KEYWORDS)

        if is_code_task and self._persona_id == "forge":
            try:
                # Imported lazily so the self-modify machinery is only
                # loaded when a code task is actually dispatched.
                from config import settings as cfg
                if not cfg.self_modify_enabled:
                    return self.execute_task(task_description)

                from self_modify.loop import SelfModifyLoop, ModifyRequest
                loop = SelfModifyLoop()
                result = loop.run(ModifyRequest(instruction=task_description))
                return {
                    "success": result.success,
                    "result": (
                        f"Modified {len(result.files_changed)} file(s). "
                        f"Tests {'passed' if result.test_passed else 'failed'}."
                    ),
                    "tools_used": ["read_file", "write_file", "shell", "git_commit"],
                    "persona_id": self._persona_id,
                    "agent_id": self._agent_id,
                    "commit_sha": result.commit_sha,
                }
            except Exception as exc:
                logger.exception("Direct tool execution failed")
                # Identify the executor in the failure payload too, so it
                # has the same identifying keys as the success payload.
                return {
                    "success": False,
                    "error": str(exc),
                    "result": None,
                    "tools_used": [],
                    "persona_id": self._persona_id,
                    "agent_id": self._agent_id,
                }

        return self.execute_task(task_description)

View File

@@ -11,6 +11,7 @@ Intents:
- task: Task creation/management
- help: Request help or list commands
- voice: Voice settings (volume, rate, etc.)
- code: Code modification / self-modify commands
- unknown: Unrecognized intent
"""
@@ -62,6 +63,14 @@ _PATTERNS: list[tuple[str, re.Pattern, float]] = [
r"\b(voice|speak|volume|rate|speed|louder|quieter|faster|slower|mute|unmute)\b",
re.IGNORECASE,
), 0.85),
# Code modification / self-modify
("code", re.compile(
r"\b(modify|edit|change|update|fix|refactor|implement|patch)\s+(the\s+)?(code|file|function|class|module|source)\b"
r"|\bself[- ]?modify\b"
r"|\b(update|change|edit)\s+(your|the)\s+(code|source)\b",
re.IGNORECASE,
), 0.9),
]
# Keywords for entity extraction
@@ -69,6 +78,7 @@ _ENTITY_PATTERNS = {
"agent_name": re.compile(r"(?:spawn|start)\s+(?:agent\s+)?(\w+)|(?:agent)\s+(\w+)", re.IGNORECASE),
"task_description": re.compile(r"(?:task|assign)[:;]?\s+(.+)", re.IGNORECASE),
"number": re.compile(r"\b(\d+)\b"),
"target_file": re.compile(r"(?:in|file|modify)\s+(?:the\s+)?([/\w._-]+\.py)", re.IGNORECASE),
}

450
tests/test_self_modify.py Normal file
View File

@@ -0,0 +1,450 @@
"""Tests for the self-modification loop (self_modify/loop.py).
All tests are fully mocked — no Ollama, no git, and no file I/O outside pytest's tmp_path.
"""
from unittest.mock import MagicMock, patch
from pathlib import Path
import pytest
from self_modify.loop import SelfModifyLoop, ModifyRequest, ModifyResult
# ── Dataclass tests ───────────────────────────────────────────────────────────
class TestModifyRequest:
    """Construction behaviour of the ModifyRequest dataclass."""

    def test_defaults(self):
        """A request built from just an instruction gets empty/False defaults."""
        request = ModifyRequest(instruction="Fix the bug")

        assert request.instruction == "Fix the bug"
        assert request.target_files == []
        assert request.dry_run is False

    def test_with_target_files(self):
        """Explicit target_files and dry_run values are stored as given."""
        overrides = {"target_files": ["src/foo.py"], "dry_run": True}
        request = ModifyRequest(instruction="Add docstring", **overrides)

        assert request.target_files == ["src/foo.py"]
        assert request.dry_run is True
class TestModifyResult:
    """Field handling of the ModifyResult dataclass."""

    def test_success_result(self):
        """A fully populated success result leaves error and cycle count at defaults."""
        fields = {
            "success": True,
            "files_changed": ["src/foo.py"],
            "test_passed": True,
            "commit_sha": "abc12345",
            "branch_name": "timmy/self-modify-123",
            "llm_response": "...",
            "attempts": 1,
        }
        outcome = ModifyResult(**fields)

        assert outcome.success
        assert outcome.commit_sha == "abc12345"
        assert outcome.error is None
        assert outcome.autonomous_cycles == 0

    def test_failure_result(self):
        """A failure result keeps its error text and reports no changed files."""
        outcome = ModifyResult(success=False, error="something broke")

        assert not outcome.success
        assert outcome.error == "something broke"
        assert outcome.files_changed == []
# ── SelfModifyLoop unit tests ────────────────────────────────────────────────
class TestSelfModifyLoop:
    """Unit tests for SelfModifyLoop construction and its run() pipeline."""

    @staticmethod
    def _enable(mock_settings, max_retries):
        """Fill the patched settings object so self-modification is switched on."""
        mock_settings.self_modify_enabled = True
        mock_settings.self_modify_max_retries = max_retries
        mock_settings.self_modify_allowed_dirs = "src,tests"
        mock_settings.self_modify_backend = "ollama"

    @staticmethod
    def _stub(agent, **stubs):
        """Replace the named private methods on *agent* with the given mocks."""
        for attr_name, stub in stubs.items():
            setattr(agent, attr_name, stub)

    def test_init_defaults(self):
        assert SelfModifyLoop()._max_retries == 2

    def test_init_custom_retries(self):
        assert SelfModifyLoop(max_retries=5)._max_retries == 5

    def test_init_backend(self):
        assert SelfModifyLoop(backend="anthropic")._backend == "anthropic"

    def test_init_autonomous(self):
        agent = SelfModifyLoop(autonomous=True, max_autonomous_cycles=5)

        assert agent._autonomous is True
        assert agent._max_autonomous_cycles == 5

    @patch("self_modify.loop.settings")
    def test_run_disabled(self, mock_settings):
        """run() refuses to do anything while the feature flag is off."""
        mock_settings.self_modify_enabled = False

        outcome = SelfModifyLoop().run(ModifyRequest(instruction="test"))

        assert not outcome.success
        assert "disabled" in outcome.error.lower()

    @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
    @patch("self_modify.loop.settings")
    def test_run_no_target_files(self, mock_settings):
        """run() fails when no target files can be inferred from the instruction."""
        self._enable(mock_settings, max_retries=0)
        agent = SelfModifyLoop()
        self._stub(agent, _infer_target_files=MagicMock(return_value=[]))

        outcome = agent.run(ModifyRequest(instruction="do something vague"))

        assert not outcome.success
        assert "no target files" in outcome.error.lower()

    @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
    @patch("self_modify.loop.settings")
    def test_run_success_path(self, mock_settings):
        """Valid edits plus passing tests end in exactly one commit."""
        self._enable(mock_settings, max_retries=2)
        agent = SelfModifyLoop()
        self._stub(
            agent,
            _read_files=MagicMock(return_value={"src/foo.py": "old content"}),
            _generate_edits=MagicMock(
                return_value=({"src/foo.py": "x = 1\n"}, "llm raw")
            ),
            _write_files=MagicMock(return_value=["src/foo.py"]),
            _run_tests=MagicMock(return_value=(True, "5 passed")),
            _git_commit=MagicMock(return_value="abc12345"),
            _validate_paths=MagicMock(),
        )

        request = ModifyRequest(
            instruction="Add docstring", target_files=["src/foo.py"]
        )
        outcome = agent.run(request)

        assert outcome.success
        assert outcome.test_passed
        assert outcome.commit_sha == "abc12345"
        assert outcome.files_changed == ["src/foo.py"]
        agent._run_tests.assert_called_once()
        agent._git_commit.assert_called_once()

    @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
    @patch("self_modify.loop.settings")
    def test_run_test_failure_reverts(self, mock_settings):
        """Failing tests with no retries left trigger a revert."""
        self._enable(mock_settings, max_retries=0)
        agent = SelfModifyLoop(max_retries=0)
        self._stub(
            agent,
            _read_files=MagicMock(return_value={"src/foo.py": "old content"}),
            _generate_edits=MagicMock(
                return_value=({"src/foo.py": "x = 1\n"}, "llm raw")
            ),
            _write_files=MagicMock(return_value=["src/foo.py"]),
            _run_tests=MagicMock(return_value=(False, "1 failed")),
            _revert_files=MagicMock(),
            _validate_paths=MagicMock(),
        )

        request = ModifyRequest(instruction="Break it", target_files=["src/foo.py"])
        outcome = agent.run(request)

        assert not outcome.success
        assert not outcome.test_passed
        agent._revert_files.assert_called()

    @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
    @patch("self_modify.loop.settings")
    def test_dry_run(self, mock_settings):
        """A dry run reports the files it would change and succeeds."""
        self._enable(mock_settings, max_retries=2)
        agent = SelfModifyLoop()
        self._stub(
            agent,
            _read_files=MagicMock(return_value={"src/foo.py": "old content"}),
            _generate_edits=MagicMock(
                return_value=({"src/foo.py": "x = 1\n"}, "llm raw")
            ),
            _validate_paths=MagicMock(),
        )

        request = ModifyRequest(
            instruction="Add docstring",
            target_files=["src/foo.py"],
            dry_run=True,
        )
        outcome = agent.run(request)

        assert outcome.success
        assert outcome.files_changed == ["src/foo.py"]
# ── Syntax validation tests ─────────────────────────────────────────────────
class TestSyntaxValidation:
    """Tests for SelfModifyLoop._validate_syntax and its effect on run()."""

    def test_valid_python_passes(self):
        """Well-formed Python yields an empty error mapping."""
        loop = SelfModifyLoop()
        errors = loop._validate_syntax({"src/foo.py": "x = 1\nprint(x)\n"})
        assert errors == {}

    def test_invalid_python_caught(self):
        """A syntax error is reported under the offending file's path."""
        loop = SelfModifyLoop()
        errors = loop._validate_syntax({"src/foo.py": "def foo(\n"})
        assert "src/foo.py" in errors
        assert "line" in errors["src/foo.py"]

    def test_unterminated_string_caught(self):
        """Source that ends inside an open triple-quoted string is rejected.

        Both snippets leave a triple-quoted string unclosed at end of input:
        the first closes one literal and then opens another that never ends,
        the second never closes its only literal. Each is asserted, so no
        validation result is silently discarded.
        """
        loop = SelfModifyLoop()
        broken_snippets = (
            '"""\nTIMMY = """\nstuff\n"""\n',
            '"""\nunclosed string\n',
        )
        for broken in broken_snippets:
            errors = loop._validate_syntax({"src/foo.py": broken})
            assert "src/foo.py" in errors

    def test_non_python_files_skipped(self):
        """Files without a Python extension are not syntax-checked."""
        loop = SelfModifyLoop()
        errors = loop._validate_syntax({"README.md": "this is not python {{{}"})
        assert errors == {}

    @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
    @patch("self_modify.loop.settings")
    def test_syntax_error_skips_write(self, mock_settings):
        """When LLM produces invalid syntax, we skip writing and retry."""
        mock_settings.self_modify_enabled = True
        mock_settings.self_modify_max_retries = 1
        mock_settings.self_modify_allowed_dirs = "src,tests"
        mock_settings.self_modify_backend = "ollama"

        loop = SelfModifyLoop(max_retries=1)
        loop._read_files = MagicMock(return_value={"src/foo.py": "x = 1\n"})
        # First call returns broken syntax, second returns valid
        loop._generate_edits = MagicMock(side_effect=[
            ({"src/foo.py": "def foo(\n"}, "bad llm"),
            ({"src/foo.py": "def foo():\n    pass\n"}, "good llm"),
        ])
        loop._write_files = MagicMock(return_value=["src/foo.py"])
        loop._run_tests = MagicMock(return_value=(True, "passed"))
        loop._git_commit = MagicMock(return_value="abc123")
        loop._validate_paths = MagicMock()

        result = loop.run(
            ModifyRequest(instruction="Fix foo", target_files=["src/foo.py"])
        )

        assert result.success
        # _write_files should only be called once (for the valid attempt)
        loop._write_files.assert_called_once()
# ── Multi-backend tests ──────────────────────────────────────────────────────
class TestBackendResolution:
    """Checks which LLM backend _resolve_backend() selects."""

    def test_resolve_ollama(self):
        """An explicit 'ollama' backend is returned unchanged."""
        chosen = SelfModifyLoop(backend="ollama")._resolve_backend()
        assert chosen == "ollama"

    def test_resolve_anthropic(self):
        """An explicit 'anthropic' backend is returned unchanged."""
        chosen = SelfModifyLoop(backend="anthropic")._resolve_backend()
        assert chosen == "anthropic"

    def test_resolve_auto_with_key(self):
        """'auto' resolves to anthropic when ANTHROPIC_API_KEY is set."""
        with patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-test-123"}):
            agent = SelfModifyLoop(backend="auto")
            assert agent._resolve_backend() == "anthropic"

    def test_resolve_auto_without_key(self):
        """'auto' resolves to ollama when the environment is empty."""
        with patch.dict("os.environ", {}, clear=True):
            agent = SelfModifyLoop(backend="auto")
            assert agent._resolve_backend() == "ollama"
# ── Autonomous loop tests ────────────────────────────────────────────────────
class TestAutonomousLoop:
    """Tests for the autonomous self-correction cycle and failure diagnosis."""

    @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
    @patch("self_modify.loop.settings")
    def test_autonomous_retries_after_failure(self, mock_settings):
        """A failed first run is diagnosed once and the next cycle succeeds."""
        mock_settings.self_modify_enabled = True
        mock_settings.self_modify_max_retries = 0
        mock_settings.self_modify_allowed_dirs = "src,tests"
        mock_settings.self_modify_backend = "ollama"

        loop = SelfModifyLoop(max_retries=0, autonomous=True, max_autonomous_cycles=2)
        loop._validate_paths = MagicMock()
        loop._read_files = MagicMock(return_value={"src/foo.py": "x = 1\n"})

        # A real function (rather than a bare return_value) so that a call
        # using unexpected keyword names fails loudly.
        def fake_generate(instruction, contents, prev_test_output=None, prev_syntax_errors=None):
            return ({"src/foo.py": "x = 2\n"}, "llm raw")

        loop._generate_edits = MagicMock(side_effect=fake_generate)
        loop._write_files = MagicMock(return_value=["src/foo.py"])
        loop._revert_files = MagicMock()
        # First call fails tests, second succeeds
        loop._run_tests = MagicMock(side_effect=[(False, "FAILED"), (True, "PASSED")])
        loop._git_commit = MagicMock(return_value="abc123")
        loop._diagnose_failure = MagicMock(return_value="Fix: do X instead of Y")

        result = loop.run(
            ModifyRequest(instruction="Fix foo", target_files=["src/foo.py"])
        )

        assert result.success
        assert result.autonomous_cycles == 1
        loop._diagnose_failure.assert_called_once()

    def test_diagnose_failure_reads_report(self, tmp_path):
        """The diagnosis returned for an existing report comes from the LLM call."""
        report = tmp_path / "report.md"
        report.write_text("# Report\n**Error:** SyntaxError line 5\n")

        loop = SelfModifyLoop(backend="ollama")
        loop._call_llm = MagicMock(return_value="ROOT CAUSE: Missing closing paren")

        diagnosis = loop._diagnose_failure(report)

        assert "Missing closing paren" in diagnosis
        loop._call_llm.assert_called_once()

    def test_diagnose_failure_handles_missing_report(self, tmp_path):
        """A report path that does not exist yields None."""
        loop = SelfModifyLoop(backend="ollama")
        result = loop._diagnose_failure(tmp_path / "nonexistent.md")
        assert result is None
# ── Path validation tests ─────────────────────────────────────────────────────
class TestPathValidation:
    """Checks that _validate_paths confines edits to the allowed directories."""

    @staticmethod
    def _sandboxed_loop():
        """Build a loop rooted at a fixed repository path."""
        return SelfModifyLoop(repo_path=Path("/tmp/test-repo"))

    def test_rejects_path_outside_repo(self):
        """A path that climbs out of the repository raises ValueError."""
        agent = self._sandboxed_loop()
        with pytest.raises(ValueError, match="escapes repository"):
            agent._validate_paths(["../../etc/passwd"])

    def test_rejects_path_outside_allowed_dirs(self):
        """A path inside the repo but outside the allowed dirs raises ValueError."""
        agent = self._sandboxed_loop()
        with pytest.raises(ValueError, match="not in allowed directories"):
            agent._validate_paths(["docs/secret.py"])

    def test_accepts_src_path(self):
        """A path under src/ passes validation without raising."""
        self._sandboxed_loop()._validate_paths(["src/some_module.py"])

    def test_accepts_tests_path(self):
        """A path under tests/ passes validation without raising."""
        self._sandboxed_loop()._validate_paths(["tests/test_something.py"])
# ── File inference tests ──────────────────────────────────────────────────────
class TestFileInference:
    """Checks how _infer_target_files maps an instruction to file paths."""

    @staticmethod
    def _infer(instruction):
        """Run target-file inference on a fresh loop instance."""
        return SelfModifyLoop()._infer_target_files(instruction)

    def test_infer_explicit_py_path(self):
        """A .py path written in the instruction is picked up."""
        inferred = self._infer("fix bug in src/dashboard/app.py")
        assert "src/dashboard/app.py" in inferred

    def test_infer_from_keyword_config(self):
        """Mentioning the config yields src/config.py."""
        inferred = self._infer("update the config to add a new setting")
        assert "src/config.py" in inferred

    def test_infer_from_keyword_agent(self):
        """Mentioning the agent yields src/timmy/agent.py."""
        inferred = self._infer("modify the agent prompt")
        assert "src/timmy/agent.py" in inferred

    def test_infer_returns_empty_for_vague(self):
        """An instruction with no recognisable hint yields an empty list."""
        inferred = self._infer("do something cool")
        assert inferred == []
# ── NLU intent tests ──────────────────────────────────────────────────────────
class TestCodeIntent:
    """Checks that the NLU layer recognises code-modification requests."""

    @staticmethod
    def _detect(utterance):
        """Classify *utterance*, importing the NLU module only when called."""
        from voice.nlu import detect_intent

        return detect_intent(utterance)

    def test_detects_modify_code(self):
        detected = self._detect("modify the code in config.py")
        assert detected.name == "code"

    def test_detects_self_modify(self):
        detected = self._detect("self-modify to add a new endpoint")
        assert detected.name == "code"

    def test_detects_edit_source(self):
        detected = self._detect("edit the source to fix the bug")
        assert detected.name == "code"

    def test_detects_update_your_code(self):
        detected = self._detect("update your code to handle errors")
        assert detected.name == "code"

    def test_detects_fix_function(self):
        detected = self._detect("fix the function that calculates totals")
        assert detected.name == "code"

    def test_does_not_match_general_chat(self):
        """Ordinary conversation is classified as chat, not code."""
        detected = self._detect("tell me about the weather today")
        assert detected.name == "chat"

    def test_extracts_target_file_entity(self):
        """The file path named in the utterance is exposed as target_file."""
        detected = self._detect("modify file src/config.py to add debug flag")
        assert detected.entities.get("target_file") == "src/config.py"
# ── Route tests ───────────────────────────────────────────────────────────────
class TestSelfModifyRoutes:
    """HTTP-level checks for the /self-modify endpoints."""

    def test_status_endpoint(self, client):
        """The status endpoint responds 200 and reports the feature as off."""
        response = client.get("/self-modify/status")
        assert response.status_code == 200

        payload = response.json()
        assert "enabled" in payload
        assert payload["enabled"] is False  # Default

    def test_run_when_disabled(self, client):
        """Posting a run request while the feature is off is rejected with 403."""
        response = client.post("/self-modify/run", data={"instruction": "test"})
        assert response.status_code == 403
# ── DirectToolExecutor integration ────────────────────────────────────────────
class TestDirectToolExecutor:
    """Integration checks for DirectToolExecutor.execute_with_tools."""

    @staticmethod
    def _run(persona_id, agent_id, task):
        """Build an executor for the given persona and execute *task*."""
        from swarm.tool_executor import DirectToolExecutor

        return DirectToolExecutor(persona_id, agent_id).execute_with_tools(task)

    def test_code_task_falls_back_when_disabled(self):
        outcome = self._run("forge", "forge-test-001", "modify the code to fix bug")
        # Should fall back to simulated since self_modify_enabled=False
        assert isinstance(outcome, dict)
        assert "result" in outcome or "success" in outcome

    def test_non_code_task_delegates_to_parent(self):
        outcome = self._run("echo", "echo-test-001", "search for information")
        assert isinstance(outcome, dict)