import logging import subprocess import typer from timmy.agent import create_timmy from timmy.prompts import STATUS_PROMPT from timmy.tool_safety import format_action_description, get_impact_level logger = logging.getLogger(__name__) app = typer.Typer(help="Timmy — sovereign AI agent") # Stable session ID — Agno persists conversation history in SQLite keyed on this. # Every `timmy chat` invocation reuses the same session so context carries over. _CLI_SESSION_ID = "cli" # Shared option definitions (reused across commands for consistency). _BACKEND_OPTION = typer.Option( None, "--backend", "-b", help="Inference backend: 'ollama' (default) | 'airllm' | 'auto'", ) _MODEL_SIZE_OPTION = typer.Option( None, "--model-size", "-s", help="AirLLM model size when --backend airllm: '8b' | '70b' | '405b'", ) def _handle_tool_confirmation(agent, run_output, session_id: str): """Prompt user to approve/reject dangerous tool calls. When Agno pauses a run because a tool requires confirmation, this function displays the action, asks for approval via stdin, and resumes or rejects the run accordingly. Returns the final RunOutput after all confirmations are resolved. """ max_rounds = 10 # safety limit for _ in range(max_rounds): status = getattr(run_output, "status", None) is_paused = status == "PAUSED" or str(status) == "RunStatus.paused" if not is_paused: return run_output reqs = getattr(run_output, "active_requirements", None) or [] if not reqs: return run_output for req in reqs: if not getattr(req, "needs_confirmation", False): continue te = req.tool_execution tool_name = getattr(te, "tool_name", "unknown") tool_args = getattr(te, "tool_args", {}) or {} description = format_action_description(tool_name, tool_args) impact = get_impact_level(tool_name) typer.echo() typer.echo(typer.style("Tool confirmation required", bold=True)) typer.echo(f" Impact: {impact.upper()}") typer.echo(f" {description}") typer.echo() approved = typer.confirm("Allow this action?", default=False) if approved: req.confirm() logger.info("CLI: approved %s", tool_name) else: req.reject(note="User rejected from CLI") logger.info("CLI: rejected %s", tool_name) # Resume the run so the agent sees the confirmation result try: run_output = agent.continue_run( run_response=run_output, stream=False, session_id=session_id ) except Exception as exc: logger.error("CLI: continue_run failed: %s", exc) typer.echo(f"Error resuming: {exc}", err=True) return run_output return run_output @app.command() def tick( prompt: str | None = typer.Argument( None, help="Optional journal prompt for Timmy to reflect on" ), ): """Run one thinking cycle. Pass a prompt to ask Timmy a specific question.""" import asyncio from timmy.thinking import thinking_engine thought = asyncio.run(thinking_engine.think_once(prompt=prompt)) if thought: typer.echo(f"[{thought.seed_type}] {thought.content}") else: typer.echo("No thought produced (thinking disabled or Ollama down).") @app.command() def think( topic: str = typer.Argument(..., help="Topic to reason about"), backend: str | None = _BACKEND_OPTION, model_size: str | None = _MODEL_SIZE_OPTION, ): """Ask Timmy to think carefully about a topic.""" timmy = create_timmy(backend=backend, model_size=model_size) timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID) @app.command() def chat( message: str = typer.Argument(..., help="Message to send"), backend: str | None = _BACKEND_OPTION, model_size: str | None = _MODEL_SIZE_OPTION, new_session: bool = typer.Option( False, "--new", "-n", help="Start a fresh conversation (ignore prior context)", ), ): """Send a message to Timmy. Conversation history persists across invocations. Use --new to start fresh. """ import uuid session_id = str(uuid.uuid4()) if new_session else _CLI_SESSION_ID timmy = create_timmy(backend=backend, model_size=model_size) # Use agent.run() so we can intercept paused runs for tool confirmation. run_output = timmy.run(message, stream=False, session_id=session_id) # Handle paused runs — dangerous tools need user approval run_output = _handle_tool_confirmation(timmy, run_output, session_id) # Print the final response content = run_output.content if hasattr(run_output, "content") else str(run_output) if content: from timmy.session import _clean_response typer.echo(_clean_response(content)) @app.command() def status( backend: str | None = _BACKEND_OPTION, model_size: str | None = _MODEL_SIZE_OPTION, ): """Print Timmy's operational status.""" timmy = create_timmy(backend=backend, model_size=model_size) timmy.print_response(STATUS_PROMPT, stream=False, session_id=_CLI_SESSION_ID) @app.command() def interview( backend: str | None = _BACKEND_OPTION, model_size: str | None = _MODEL_SIZE_OPTION, ): """Initialize Timmy and run a structured interview. Asks Timmy a series of questions about his identity, capabilities, values, and operation to verify he is working correctly. """ import asyncio from timmy.interview import InterviewEntry, format_transcript, run_interview from timmy.session import chat typer.echo("Initializing Timmy for interview...\n") # Use a single persistent event loop for the entire interview. # Calling asyncio.run() per question kills the loop between calls, # orphaning MCP stdio transports and causing "Event loop is closed." loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # Force agent creation by calling chat once with a warm-up prompt try: loop.run_until_complete( chat("Hello, Timmy. We're about to start your interview.", session_id="interview") ) except Exception as exc: typer.echo(f"Warning: Initialization issue — {exc}", err=True) def _on_answer(entry: InterviewEntry) -> None: typer.echo(f"[{entry.category}]") typer.echo(f" Q: {entry.question}") typer.echo(f" A: {entry.answer}") typer.echo() typer.echo("Starting interview...\n") transcript = run_interview( chat_fn=lambda msg: loop.run_until_complete(chat(msg, session_id="interview")), on_answer=_on_answer, ) # Print full transcript at the end typer.echo("\n" + format_transcript(transcript)) finally: # Clean shutdown: close MCP sessions, then the loop try: from timmy.mcp_tools import close_mcp_sessions loop.run_until_complete(close_mcp_sessions()) except Exception: pass loop.close() @app.command() def up( dev: bool = typer.Option(False, "--dev", help="Enable hot-reload for development"), build: bool = typer.Option(True, "--build/--no-build", help="Rebuild images before starting"), ): """Start Timmy Time in Docker (dashboard + agents).""" cmd = ["docker", "compose"] if dev: cmd += ["-f", "docker-compose.yml", "-f", "docker-compose.dev.yml"] cmd += ["up", "-d"] if build: cmd.append("--build") mode = "dev mode (hot-reload active)" if dev else "production mode" typer.echo(f"Starting Timmy Time in {mode}...") result = subprocess.run(cmd) if result.returncode == 0: typer.echo(f"\n Timmy Time running at http://localhost:8000 ({mode})\n") else: typer.echo("Failed to start. Is Docker running?", err=True) raise typer.Exit(1) @app.command() def down(): """Stop all Timmy Time Docker containers.""" subprocess.run(["docker", "compose", "down"], check=True) def main(): app()