Files
Timmy-time-dashboard/src/timmy/workshop_state.py
kimi 59c182cdfa
All checks were successful
Tests / lint (pull_request) Successful in 5s
Tests / test (pull_request) Successful in 58s
fix: wire Pip familiar into Workshop state pipeline
Pip's backend behavioral AI (state machine, mood mirroring, event
reactions) was never connected to the world state API. The frontend
Pip wandered randomly with no connection to the backend.

- Add _pip_snapshot() to workshop_state.py — ticks Pip and feeds
  Timmy's mood/confidence into Pip's behavioral AI each heartbeat
- Include familiar snapshot in presence state dict (get_state_dict)
- Include familiar in _build_world_state() API response
- Trigger pip_familiar.on_event("visitor_spoke") when visitors chat

Fixes #222

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 03:04:17 -04:00

262 lines
8.5 KiB
Python

"""Workshop presence heartbeat — periodic writer for ``~/.timmy/presence.json``.
Maintains Timmy's observable presence state for the Workshop 3D renderer.
Writes the presence file every 30 seconds (or on cognitive state change),
skipping writes when state is unchanged.
See ADR-023 for the schema contract and issue #360 for the full v1 schema.
"""
import asyncio
import hashlib
import json
import logging
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
PRESENCE_FILE = Path.home() / ".timmy" / "presence.json"
HEARTBEAT_INTERVAL = 30 # seconds
# Cognitive mood → presence mood mapping (issue #360 enum values)
_MOOD_MAP: dict[str, str] = {
"curious": "contemplative",
"settled": "calm",
"hesitant": "uncertain",
"energized": "excited",
}
# Activity mapping from cognitive engagement
_ACTIVITY_MAP: dict[str, str] = {
"idle": "idle",
"surface": "thinking",
"deep": "thinking",
}
# Module-level energy tracker — decays over time, resets on interaction
_energy_state: dict[str, float] = {"value": 0.8, "last_interaction": time.monotonic()}
# Startup timestamp for uptime calculation
_start_time = time.monotonic()
# Energy decay: 0.01 per minute without interaction (per issue #360)
_ENERGY_DECAY_PER_SECOND = 0.01 / 60.0
_ENERGY_MIN = 0.1
def _time_of_day(hour: int) -> str:
"""Map hour (0-23) to a time-of-day label."""
if 5 <= hour < 12:
return "morning"
if 12 <= hour < 17:
return "afternoon"
if 17 <= hour < 21:
return "evening"
if 21 <= hour or hour < 2:
return "night"
return "deep-night"
def reset_energy() -> None:
"""Reset energy to full (called on interaction)."""
_energy_state["value"] = 0.8
_energy_state["last_interaction"] = time.monotonic()
def _current_energy() -> float:
"""Compute current energy with time-based decay."""
elapsed = time.monotonic() - _energy_state["last_interaction"]
decayed = _energy_state["value"] - (elapsed * _ENERGY_DECAY_PER_SECOND)
return max(_ENERGY_MIN, min(1.0, decayed))
def _pip_snapshot(mood: str, confidence: float) -> dict:
"""Tick Pip and return his current snapshot dict.
Feeds Timmy's mood and confidence into Pip's behavioral AI so the
familiar reacts to Timmy's cognitive state.
"""
from timmy.familiar import pip_familiar
pip_familiar.on_mood_change(mood, confidence=confidence)
pip_familiar.tick()
return pip_familiar.snapshot().to_dict()
def get_state_dict() -> dict:
"""Build presence state dict from current cognitive state.
Returns a v1 presence schema dict suitable for JSON serialisation.
Includes the full schema from issue #360: identity, mood, activity,
attention, interaction, environment, and meta sections.
"""
from timmy.cognitive_state import cognitive_tracker
state = cognitive_tracker.get_state()
now = datetime.now(UTC)
# Map cognitive mood to presence mood
mood = _MOOD_MAP.get(state.mood, "calm")
if state.engagement == "idle" and state.mood == "settled":
mood = "calm"
# Confidence from cognitive tracker
if state._confidence_count > 0:
confidence = state._confidence_sum / state._confidence_count
else:
confidence = 0.7
# Build active threads from commitments
threads = []
for commitment in state.active_commitments[:10]:
threads.append({"type": "thinking", "ref": commitment[:80], "status": "active"})
# Activity
activity = _ACTIVITY_MAP.get(state.engagement, "idle")
# Environment
local_now = datetime.now()
return {
"version": 1,
"liveness": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"current_focus": state.focus_topic or "",
"active_threads": threads,
"recent_events": [],
"concerns": [],
"mood": mood,
"confidence": round(max(0.0, min(1.0, confidence)), 2),
"energy": round(_current_energy(), 2),
"identity": {
"name": "Timmy",
"title": "The Workshop Wizard",
"uptime_seconds": int(time.monotonic() - _start_time),
},
"activity": {
"current": activity,
"detail": state.focus_topic or "",
},
"interaction": {
"visitor_present": False,
"conversation_turns": state.conversation_depth,
},
"environment": {
"time_of_day": _time_of_day(local_now.hour),
"local_time": local_now.strftime("%-I:%M %p"),
"day_of_week": local_now.strftime("%A"),
},
"familiar": _pip_snapshot(mood, confidence),
"meta": {
"schema_version": 1,
"updated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"writer": "timmy-loop",
},
}
def write_state(state_dict: dict | None = None, path: Path | None = None) -> None:
"""Write presence state to ``~/.timmy/presence.json``.
Gracefully degrades if the file cannot be written.
"""
if state_dict is None:
state_dict = get_state_dict()
target = path or PRESENCE_FILE
try:
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(json.dumps(state_dict, indent=2) + "\n")
except OSError as exc:
logger.warning("Failed to write presence state: %s", exc)
def _state_hash(state_dict: dict) -> str:
"""Compute hash of state dict, ignoring volatile timestamps."""
stable = {k: v for k, v in state_dict.items() if k not in ("liveness", "meta")}
return hashlib.md5(json.dumps(stable, sort_keys=True).encode()).hexdigest()
class WorkshopHeartbeat:
"""Async background task that keeps ``presence.json`` fresh.
- Writes every ``interval`` seconds (default 30).
- Reacts to cognitive state changes via sensory bus.
- Skips write if state hasn't changed (hash comparison).
"""
def __init__(
self,
interval: int = HEARTBEAT_INTERVAL,
path: Path | None = None,
on_change: Callable[[dict], Awaitable[None]] | None = None,
) -> None:
self._interval = interval
self._path = path or PRESENCE_FILE
self._last_hash: str | None = None
self._task: asyncio.Task | None = None
self._trigger = asyncio.Event()
self._on_change = on_change
async def start(self) -> None:
"""Start the heartbeat background loop."""
self._subscribe_to_events()
self._task = asyncio.create_task(self._run())
async def stop(self) -> None:
"""Cancel the heartbeat task gracefully."""
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
def notify(self) -> None:
"""Signal an immediate state write (e.g. on cognitive state change)."""
self._trigger.set()
async def _run(self) -> None:
"""Main loop: write state on interval or trigger."""
await asyncio.sleep(1) # Initial stagger
while True:
try:
# Wait for interval OR early trigger
try:
await asyncio.wait_for(self._trigger.wait(), timeout=self._interval)
self._trigger.clear()
except TimeoutError:
pass # Normal periodic tick
await self._write_if_changed()
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Workshop heartbeat error: %s", exc)
async def _write_if_changed(self) -> None:
"""Build state, compare hash, write only if changed."""
state_dict = get_state_dict()
current_hash = _state_hash(state_dict)
if current_hash == self._last_hash:
return
self._last_hash = current_hash
write_state(state_dict, self._path)
if self._on_change:
try:
await self._on_change(state_dict)
except Exception as exc:
logger.warning("on_change callback failed: %s", exc)
def _subscribe_to_events(self) -> None:
"""Subscribe to cognitive state change events on the sensory bus."""
try:
from timmy.event_bus import get_sensory_bus
bus = get_sensory_bus()
bus.subscribe("cognitive_state_changed", lambda _: self.notify())
except Exception as exc:
logger.debug("Heartbeat event subscription skipped: %s", exc)