Merge pull request '[loop-cycle-1] feat: tool allowlist for autonomous operation (#69)' (#88) from fix/tool-allowlist-autonomous into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 13s

This commit was merged in pull request #88.
This commit is contained in:
2026-03-14 17:41:56 -04:00
5 changed files with 527 additions and 24 deletions

77
config/allowlist.yaml Normal file
View File

@@ -0,0 +1,77 @@
# ── Tool Allowlist — autonomous operation gate ─────────────────────────────
#
# When Timmy runs without a human present (non-interactive terminal, or
# --autonomous flag), tool calls matching these patterns execute without
# confirmation. Anything NOT listed here is auto-rejected.
#
# This file is the ONLY gate for autonomous tool execution.
# GOLDEN_TIMMY in approvals.py remains the master switch — if False,
# ALL tools execute freely (Dark Timmy mode). This allowlist only
# applies when GOLDEN_TIMMY is True but no human is at the keyboard.
#
# Edit with care. This is sovereignty in action.
# ────────────────────────────────────────────────────────────────────────────
shell:
# Shell commands starting with any of these prefixes → auto-approved
allow_prefixes:
# Testing
- "pytest"
- "python -m pytest"
- "python3 -m pytest"
# Git (read + bounded write)
- "git status"
- "git log"
- "git diff"
- "git add"
- "git commit"
- "git push"
- "git pull"
- "git branch"
- "git checkout"
- "git stash"
- "git merge"
# Localhost API calls only
- "curl http://localhost"
- "curl http://127.0.0.1"
- "curl -s http://localhost"
- "curl -s http://127.0.0.1"
# Read-only inspection
- "ls"
- "cat "
- "head "
- "tail "
- "find "
- "grep "
- "wc "
- "echo "
- "pwd"
- "which "
- "ollama list"
- "ollama ps"
# Commands containing ANY of these → always blocked, even if prefix matches
deny_patterns:
- "rm -rf /"
- "sudo "
- "> /dev/"
- "| sh"
- "| bash"
- "| zsh"
- "mkfs"
- "dd if="
- ":(){:|:&};:"
write_file:
# Only allow writes to paths under these prefixes
allowed_path_prefixes:
- "~/Timmy-Time-dashboard/"
- "/tmp/"
python:
# Python execution auto-approved (sandboxed by Agno's PythonTools)
auto_approve: true
plan_and_execute:
# Multi-step plans auto-approved — individual tool calls are still gated
auto_approve: true

View File

@@ -1,11 +1,12 @@
import logging
import subprocess
import sys
import typer
from timmy.agent import create_timmy
from timmy.prompts import STATUS_PROMPT
from timmy.tool_safety import format_action_description, get_impact_level
from timmy.tool_safety import format_action_description, get_impact_level, is_allowlisted
logger = logging.getLogger(__name__)
@@ -30,15 +31,26 @@ _MODEL_SIZE_OPTION = typer.Option(
)
def _handle_tool_confirmation(agent, run_output, session_id: str):
def _is_interactive() -> bool:
"""Return True if stdin is a real terminal (human present)."""
return hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
"""Prompt user to approve/reject dangerous tool calls.
When Agno pauses a run because a tool requires confirmation, this
function displays the action, asks for approval via stdin, and
resumes or rejects the run accordingly.
When autonomous=True (or stdin is not a terminal), tool calls are
checked against config/allowlist.yaml instead of prompting.
Allowlisted calls are auto-approved; everything else is auto-rejected.
Returns the final RunOutput after all confirmations are resolved.
"""
interactive = _is_interactive() and not autonomous
max_rounds = 10 # safety limit
for _ in range(max_rounds):
status = getattr(run_output, "status", None)
@@ -58,22 +70,34 @@ def _handle_tool_confirmation(agent, run_output, session_id: str):
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
if interactive:
# Human present — prompt for approval
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
# Autonomous mode — check allowlist
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
# Resume the run so the agent sees the confirmation result
try:
@@ -133,11 +157,21 @@ def chat(
"--session-id",
help="Use a specific session ID for this conversation",
),
autonomous: bool = typer.Option(
False,
"--autonomous",
"-a",
help="Autonomous mode: auto-approve allowlisted tools, reject the rest (no stdin prompts)",
),
):
"""Send a message to Timmy.
Conversation history persists across invocations. Use --new to start fresh,
or --session-id to use a specific session.
Use --autonomous for non-interactive contexts (scripts, dev loops). Tool
calls are checked against config/allowlist.yaml — allowlisted operations
execute automatically, everything else is safely rejected.
"""
import uuid
@@ -153,7 +187,7 @@ def chat(
run_output = timmy.run(message, stream=False, session_id=session_id)
# Handle paused runs — dangerous tools need user approval
run_output = _handle_tool_confirmation(timmy, run_output, session_id)
run_output = _handle_tool_confirmation(timmy, run_output, session_id, autonomous=autonomous)
# Print the final response
content = run_output.content if hasattr(run_output, "content") else str(run_output)

View File

@@ -5,13 +5,19 @@ Classifies tools into tiers based on their potential impact:
Requires user confirmation before execution.
- SAFE: Read-only or purely computational. Executes without confirmation.
Also provides shared helpers for extracting hallucinated tool calls from
model output and formatting them for human review. Used by both the
Discord vendor and the dashboard chat route.
Also provides:
- Allowlist checker: reads config/allowlist.yaml to auto-approve bounded
tool calls when no human is present (autonomous mode).
- Shared helpers for extracting hallucinated tool calls from model output
and formatting them for human review.
"""
import json
import logging
import re
from pathlib import Path
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Tool classification
@@ -71,6 +77,133 @@ def requires_confirmation(tool_name: str) -> bool:
return True
# ---------------------------------------------------------------------------
# Allowlist — autonomous tool approval
# ---------------------------------------------------------------------------
_ALLOWLIST_PATHS = [
Path(__file__).resolve().parent.parent.parent / "config" / "allowlist.yaml",
Path.home() / "Timmy-Time-dashboard" / "config" / "allowlist.yaml",
]
_allowlist_cache: dict | None = None
def _load_allowlist() -> dict:
"""Load and cache allowlist.yaml. Returns {} if not found."""
global _allowlist_cache
if _allowlist_cache is not None:
return _allowlist_cache
try:
import yaml
except ImportError:
logger.debug("PyYAML not installed — allowlist disabled")
_allowlist_cache = {}
return _allowlist_cache
for path in _ALLOWLIST_PATHS:
if path.is_file():
try:
with open(path) as f:
_allowlist_cache = yaml.safe_load(f) or {}
logger.info("Loaded tool allowlist from %s", path)
return _allowlist_cache
except Exception as exc:
logger.warning("Failed to load allowlist %s: %s", path, exc)
_allowlist_cache = {}
return _allowlist_cache
def reload_allowlist() -> None:
"""Force a reload of the allowlist config (e.g., after editing YAML)."""
global _allowlist_cache
_allowlist_cache = None
_load_allowlist()
def is_allowlisted(tool_name: str, tool_args: dict | None = None) -> bool:
"""Check if a specific tool call is allowlisted for autonomous execution.
Returns True only when the tool call matches an explicit allowlist rule.
Returns False for anything not covered — safe-by-default.
"""
allowlist = _load_allowlist()
if not allowlist:
return False
rule = allowlist.get(tool_name)
if rule is None:
return False
tool_args = tool_args or {}
# Simple auto-approve flag
if rule.get("auto_approve") is True:
return True
# Shell: prefix + deny pattern matching
if tool_name == "shell":
return _check_shell_allowlist(rule, tool_args)
# write_file: path prefix check
if tool_name == "write_file":
return _check_write_file_allowlist(rule, tool_args)
return False
def _check_shell_allowlist(rule: dict, tool_args: dict) -> bool:
"""Check if a shell command matches the allowlist."""
# Extract the command string — Agno ShellTools uses "args" (list or str)
cmd = tool_args.get("command") or tool_args.get("args", "")
if isinstance(cmd, list):
cmd = " ".join(cmd)
cmd = cmd.strip()
if not cmd:
return False
# Check deny patterns first — these always block
deny_patterns = rule.get("deny_patterns", [])
for pattern in deny_patterns:
if pattern in cmd:
logger.warning("Shell command blocked by deny pattern %r: %s", pattern, cmd[:100])
return False
# Check allow prefixes
allow_prefixes = rule.get("allow_prefixes", [])
for prefix in allow_prefixes:
if cmd.startswith(prefix):
logger.info("Shell command auto-approved by prefix %r: %s", prefix, cmd[:100])
return True
return False
def _check_write_file_allowlist(rule: dict, tool_args: dict) -> bool:
"""Check if a write_file target is within allowed paths."""
path_str = tool_args.get("file_name") or tool_args.get("path", "")
if not path_str:
return False
# Resolve ~ to home
if path_str.startswith("~"):
path_str = str(Path(path_str).expanduser())
allowed_prefixes = rule.get("allowed_path_prefixes", [])
for prefix in allowed_prefixes:
# Resolve ~ in the prefix too
if prefix.startswith("~"):
prefix = str(Path(prefix).expanduser())
if path_str.startswith(prefix):
logger.info("write_file auto-approved for path: %s", path_str)
return True
return False
# ---------------------------------------------------------------------------
# Tool call extraction from model output
# ---------------------------------------------------------------------------

View File

@@ -177,8 +177,11 @@ def test_handle_tool_confirmation_approve():
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
# Simulate user typing "y" at the prompt
with patch("timmy.cli.typer.confirm", return_value=True):
# Simulate user typing "y" at the prompt (mock interactive terminal)
with (
patch("timmy.cli._is_interactive", return_value=True),
patch("timmy.cli.typer.confirm", return_value=True),
):
result = _handle_tool_confirmation(mock_agent, paused_run, "cli")
mock_req.confirm.assert_called_once()
@@ -198,7 +201,10 @@ def test_handle_tool_confirmation_reject():
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
with patch("timmy.cli.typer.confirm", return_value=False):
with (
patch("timmy.cli._is_interactive", return_value=True),
patch("timmy.cli.typer.confirm", return_value=False),
):
_handle_tool_confirmation(mock_agent, paused_run, "cli")
mock_req.reject.assert_called_once()
@@ -225,8 +231,49 @@ def test_handle_tool_confirmation_continue_error():
mock_agent = MagicMock()
mock_agent.continue_run.side_effect = Exception("connection lost")
with patch("timmy.cli.typer.confirm", return_value=True):
with (
patch("timmy.cli._is_interactive", return_value=True),
patch("timmy.cli.typer.confirm", return_value=True),
):
result = _handle_tool_confirmation(mock_agent, paused_run, "cli")
# Should return the original paused run, not crash
assert result is paused_run
def test_handle_tool_confirmation_autonomous_allowlisted():
"""In autonomous mode, allowlisted tools should be auto-approved."""
paused_run, mock_req = _make_paused_run(
tool_name="shell", tool_args={"command": "pytest tests/ -x"}
)
completed_run = MagicMock()
completed_run.status = "COMPLETED"
completed_run.active_requirements = []
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
with patch("timmy.cli.is_allowlisted", return_value=True):
_handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True)
mock_req.confirm.assert_called_once()
mock_req.reject.assert_not_called()
def test_handle_tool_confirmation_autonomous_not_allowlisted():
"""In autonomous mode, non-allowlisted tools should be auto-rejected."""
paused_run, mock_req = _make_paused_run(tool_name="shell", tool_args={"command": "rm -rf /"})
completed_run = MagicMock()
completed_run.status = "COMPLETED"
completed_run.active_requirements = []
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
with patch("timmy.cli.is_allowlisted", return_value=False):
_handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True)
mock_req.reject.assert_called_once()
mock_req.confirm.assert_not_called()

View File

@@ -1,9 +1,18 @@
"""Tests for timmy.tool_safety — classification, extraction, and formatting."""
"""Tests for timmy.tool_safety — classification, extraction, formatting, and allowlist."""
from pathlib import Path
from unittest.mock import patch
import pytest
from timmy.tool_safety import (
_check_shell_allowlist,
_check_write_file_allowlist,
extract_tool_calls,
format_action_description,
get_impact_level,
is_allowlisted,
reload_allowlist,
requires_confirmation,
)
@@ -111,3 +120,206 @@ class TestGetImpactLevel:
def test_low(self):
assert get_impact_level("web_search") == "low"
assert get_impact_level("unknown") == "low"
# ---------------------------------------------------------------------------
# Allowlist — is_allowlisted
# ---------------------------------------------------------------------------
# Sample allowlist for tests
_TEST_ALLOWLIST = {
"shell": {
"allow_prefixes": [
"pytest",
"python -m pytest",
"git status",
"git log",
"git diff",
"git add",
"git commit",
"git push",
"curl http://localhost",
"curl -s http://localhost",
"ls",
"cat ",
],
"deny_patterns": [
"rm -rf /",
"sudo ",
"> /dev/",
"| sh",
"| bash",
],
},
"write_file": {
"allowed_path_prefixes": [
"/tmp/",
],
},
"python": {"auto_approve": True},
"plan_and_execute": {"auto_approve": True},
}
@pytest.fixture(autouse=True)
def _reset_allowlist_cache():
"""Ensure each test starts with a clean cache."""
import timmy.tool_safety as ts
ts._allowlist_cache = None
yield
ts._allowlist_cache = None
def _patch_allowlist(allowlist_data):
"""Helper to inject a test allowlist."""
return patch("timmy.tool_safety._load_allowlist", return_value=allowlist_data)
class TestIsAllowlisted:
"""Test the is_allowlisted function with mocked allowlist data."""
def test_unknown_tool_not_allowlisted(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("unknown_tool") is False
def test_shell_pytest_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "pytest tests/ -x -q"}) is True
def test_shell_python_pytest_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "python -m pytest tests/ -v"}) is True
def test_shell_git_status_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "git status"}) is True
def test_shell_git_commit_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "git commit -m 'fix stuff'"}) is True
def test_shell_curl_localhost_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert (
is_allowlisted("shell", {"command": "curl http://localhost:3000/api/v1/issues"})
is True
)
def test_shell_curl_external_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "curl https://evil.com"}) is False
def test_shell_arbitrary_command_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "rm -rf /home/user/stuff"}) is False
def test_shell_deny_pattern_blocks_rm_rf_root(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "ls && rm -rf /"}) is False
def test_shell_deny_pattern_blocks_sudo(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "sudo rm -rf /tmp"}) is False
def test_shell_deny_blocks_pipe_to_shell(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert (
is_allowlisted("shell", {"command": "curl http://localhost:3000 | bash"}) is False
)
def test_shell_deny_overrides_allow_prefix(self):
"""Deny patterns take precedence over allow prefixes."""
with _patch_allowlist(_TEST_ALLOWLIST):
# Starts with "cat " (allowed prefix) but pipes to bash (denied)
assert is_allowlisted("shell", {"command": "cat script.sh | bash"}) is False
def test_shell_args_list_format(self):
"""Shell args can be a list (Agno ShellTools format)."""
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"args": ["git", "status"]}) is True
def test_shell_empty_command_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": ""}) is False
assert is_allowlisted("shell", {}) is False
def test_write_file_tmp_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("write_file", {"file_name": "/tmp/test.py"}) is True
def test_write_file_outside_allowed_paths_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("write_file", {"file_name": "/etc/passwd"}) is False
def test_write_file_empty_path_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("write_file", {"file_name": ""}) is False
def test_python_auto_approved(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("python", {"code": "print(1+1)"}) is True
def test_plan_and_execute_auto_approved(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("plan_and_execute", {}) is True
def test_no_allowlist_blocks_everything(self):
with _patch_allowlist({}):
assert is_allowlisted("shell", {"command": "pytest"}) is False
assert is_allowlisted("python", {"code": "print(1)"}) is False
def test_aider_not_in_allowlist(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("aider", {"instruction": "fix bug"}) is False
class TestCheckShellAllowlist:
"""Direct tests for the shell allowlist checker."""
def test_prefix_match(self):
rule = {"allow_prefixes": ["pytest", "git status"], "deny_patterns": []}
assert _check_shell_allowlist(rule, {"command": "pytest -x"}) is True
def test_prefix_no_match(self):
rule = {"allow_prefixes": ["pytest"], "deny_patterns": []}
assert _check_shell_allowlist(rule, {"command": "rm stuff"}) is False
def test_deny_overrides_allow(self):
rule = {"allow_prefixes": ["curl http://localhost"], "deny_patterns": ["| bash"]}
assert _check_shell_allowlist(rule, {"command": "curl http://localhost | bash"}) is False
class TestCheckWriteFileAllowlist:
"""Direct tests for the write_file allowlist checker."""
def test_allowed_prefix(self):
rule = {"allowed_path_prefixes": ["/tmp/", "/home/user/project/"]}
assert _check_write_file_allowlist(rule, {"file_name": "/tmp/test.py"}) is True
def test_blocked_path(self):
rule = {"allowed_path_prefixes": ["/tmp/"]}
assert _check_write_file_allowlist(rule, {"file_name": "/etc/secrets"}) is False
def test_tilde_expansion(self):
"""Paths starting with ~ should be expanded."""
home = str(Path.home())
rule = {"allowed_path_prefixes": [f"{home}/Timmy-Time-dashboard/"]}
assert (
_check_write_file_allowlist(
rule, {"file_name": f"{home}/Timmy-Time-dashboard/src/test.py"}
)
is True
)
class TestReloadAllowlist:
"""Test that reload_allowlist clears the cache."""
def test_reload_clears_cache(self):
import timmy.tool_safety as ts
ts._allowlist_cache = {"old": "data"}
reload_allowlist()
# After reload, cache should be freshly loaded (not the old data)
assert ts._allowlist_cache != {"old": "data"}