rescue: WS heartbeat ping + commitment tracking from stale PRs (#415)
## What

Manually integrated unique code from two stale PRs that were **not** superseded by merged work.

### PR #399 (kimi/issue-362) — WebSocket heartbeat ping

- 15-second ping loop detects dead iPad/Safari connections
- `_heartbeat()` coroutine launched as background task per WS client
- `ping_task` properly cancelled on disconnect

### PR #408 (kimi/issue-322) — Conversation commitment tracking

- Regex extraction of commitments from Timmy replies (`I'll` / `I will` / `Let me`)
- `_record_commitments()` stores with dedup + cap at 10
- `_tick_commitments()` increments message counter per commitment
- `_build_commitment_context()` surfaces overdue commitments as grounding context
- Wired into `_bark_and_broadcast()` and `_generate_bark()`
- Public API: `get_commitments()`, `close_commitment()`, `reset_commitments()`

### Tests

22 new tests covering both features: extraction, recording, dedup, caps, tick/context, integration, heartbeat ping, dead connection handling.

---

This PR rescues unique code from stale PRs #399 and #408. The other two stale PRs (#402, #411) were already superseded by merged work and should be closed.

Co-authored-by: Perplexity Computer <perplexity@tower.dev>
Reviewed-on: #415
Co-authored-by: Perplexity Computer <perplexity@tower.local>
Co-committed-by: Perplexity Computer <perplexity@tower.local>
This commit was merged in pull request #415.
This commit is contained in:
@@ -17,6 +17,7 @@ or missing.
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from collections import deque
|
||||
from datetime import UTC, datetime
|
||||
@@ -45,6 +46,86 @@ _conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES)
|
||||
|
||||
_WORKSHOP_SESSION_ID = "workshop"
|
||||
|
||||
_HEARTBEAT_INTERVAL = 15 # seconds — ping to detect dead iPad/Safari connections
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Conversation grounding — commitment tracking (rescued from PR #408)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Patterns that indicate Timmy is committing to an action.
|
||||
_COMMITMENT_PATTERNS: list[re.Pattern[str]] = [
|
||||
re.compile(r"I'll (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
|
||||
re.compile(r"I will (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
|
||||
re.compile(r"[Ll]et me (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
|
||||
]
|
||||
|
||||
# After this many messages without follow-up, surface open commitments.
|
||||
_REMIND_AFTER = 5
|
||||
_MAX_COMMITMENTS = 10
|
||||
|
||||
# In-memory list of open commitments.
|
||||
# Each entry: {"text": str, "created_at": float, "messages_since": int}
|
||||
_commitments: list[dict] = []
|
||||
|
||||
|
||||
def _extract_commitments(text: str) -> list[str]:
    """Collect the commitment phrases found in one of Timmy's replies.

    Every pattern in ``_COMMITMENT_PATTERNS`` is scanned over *text*; the
    results are ordered pattern by pattern, then by position in the text.
    Captures of five characters or fewer are dropped as noise, and each kept
    phrase is truncated to 120 characters.
    """
    candidates = (
        hit.group(1).strip()
        for matcher in _COMMITMENT_PATTERNS
        for hit in matcher.finditer(text)
    )
    return [candidate[:120] for candidate in candidates if len(candidate) > 5]
|
||||
|
||||
|
||||
def _record_commitments(reply: str) -> None:
    """Store each commitment in *reply* that is not already being tracked.

    A phrase is skipped when an open commitment has exactly the same text.
    New entries start with ``messages_since`` at 0; once the list grows past
    ``_MAX_COMMITMENTS`` the oldest entry is discarded.
    """
    for phrase in _extract_commitments(reply):
        already_open = phrase in (entry["text"] for entry in _commitments)
        if not already_open:
            _commitments.append(
                {"text": phrase, "created_at": time.time(), "messages_since": 0}
            )
            if len(_commitments) > _MAX_COMMITMENTS:
                del _commitments[0]  # evict the oldest
|
||||
|
||||
|
||||
def _tick_commitments() -> None:
    """Age every open commitment by one message."""
    for entry in _commitments:
        entry["messages_since"] = entry["messages_since"] + 1
|
||||
|
||||
|
||||
def _build_commitment_context() -> str:
    """Render the overdue commitments as a grounding note.

    A commitment is overdue once its ``messages_since`` counter has reached
    ``_REMIND_AFTER``. Returns a header line followed by one "- text" bullet
    per overdue commitment, or an empty string when none is overdue.
    """
    bullets = [
        f"- {entry['text']}"
        for entry in _commitments
        if entry["messages_since"] >= _REMIND_AFTER
    ]
    if bullets:
        header = (
            "[Open commitments Timmy made earlier — "
            "weave awareness naturally, don't list robotically]\n"
        )
        return header + "\n".join(bullets)
    return ""
|
||||
|
||||
|
||||
def close_commitment(index: int) -> bool:
    """Drop the commitment at position *index*.

    Returns True when an entry was removed and False when *index* is
    negative or past the end of the list.
    """
    if index < 0 or index >= len(_commitments):
        return False
    del _commitments[index]
    return True
|
||||
|
||||
|
||||
def get_commitments() -> list[dict]:
    """Return a snapshot of the open commitments (for testing / API).

    Both the list and every entry dict are copied, so a caller that edits
    the returned value cannot alter the tracker's internal state.
    """
    return [dict(entry) for entry in _commitments]
|
||||
|
||||
|
||||
def reset_commitments() -> None:
    """Forget every tracked commitment (for testing / session reset)."""
    # Emptied in place so existing references to the list stay valid.
    del _commitments[:]
|
||||
|
||||
|
||||
# Conversation grounding — anchor to opening topic so Timmy doesn't drift.
|
||||
_ground_topic: str | None = None
|
||||
_ground_set_at: float = 0.0
|
||||
@@ -127,13 +208,30 @@ async def get_world_state() -> JSONResponse:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _heartbeat(websocket: WebSocket) -> None:
    """Send periodic pings to detect dead connections (iPad resilience).

    Safari suspends background tabs, killing the TCP socket silently.
    Every ``_HEARTBEAT_INTERVAL`` seconds a ``{"type": "ping"}`` text frame
    is sent; the first sleep or send that raises ends the loop and the
    coroutine returns without re-raising.

    Task cancellation is not swallowed: ``asyncio.CancelledError`` is not an
    ``Exception`` subclass, so cancelling the task still propagates.

    Rescued from stale PR #399.
    """
    ping_frame = json.dumps({"type": "ping"})  # constant payload — build once
    try:
        while True:
            await asyncio.sleep(_HEARTBEAT_INTERVAL)
            await websocket.send_text(ping_frame)
    except Exception as exc:
        # Best-effort by design: connection gone — receive loop will clean up.
        # Logged at debug level so the failure is visible when diagnosing
        # dropped clients instead of vanishing silently.
        logger.debug("WS heartbeat stopped: %s", exc)
|
||||
|
||||
|
||||
@router.websocket("/ws")
|
||||
async def world_ws(websocket: WebSocket) -> None:
|
||||
"""Accept a Workshop client and keep it alive for state broadcasts.
|
||||
|
||||
Sends a full ``world_state`` snapshot immediately on connect so the
|
||||
client never starts from a blank slate. Incoming frames are parsed
|
||||
as JSON — ``visitor_message`` triggers a bark response.
|
||||
as JSON — ``visitor_message`` triggers a bark response. A background
|
||||
heartbeat ping runs every 15 s to detect dead connections early.
|
||||
"""
|
||||
await websocket.accept()
|
||||
_ws_clients.append(websocket)
|
||||
@@ -145,6 +243,8 @@ async def world_ws(websocket: WebSocket) -> None:
|
||||
await websocket.send_text(json.dumps({"type": "world_state", **snapshot}))
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to send WS snapshot: %s", exc)
|
||||
|
||||
ping_task = asyncio.create_task(_heartbeat(websocket))
|
||||
try:
|
||||
while True:
|
||||
raw = await websocket.receive_text()
|
||||
@@ -152,6 +252,7 @@ async def world_ws(websocket: WebSocket) -> None:
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
ping_task.cancel()
|
||||
if websocket in _ws_clients:
|
||||
_ws_clients.remove(websocket)
|
||||
logger.info("World WS disconnected — %d clients", len(_ws_clients))
|
||||
@@ -242,7 +343,9 @@ async def _bark_and_broadcast(visitor_text: str) -> None:
|
||||
pass # Pip is optional
|
||||
|
||||
_refresh_ground(visitor_text)
|
||||
_tick_commitments()
|
||||
reply = await _generate_bark(visitor_text)
|
||||
_record_commitments(reply)
|
||||
|
||||
_conversation.append({"visitor": visitor_text, "timmy": reply})
|
||||
|
||||
@@ -269,8 +372,11 @@ async def _generate_bark(visitor_text: str) -> str:
|
||||
from timmy import session as _session
|
||||
|
||||
grounded = visitor_text
|
||||
commitment_ctx = _build_commitment_context()
|
||||
if commitment_ctx:
|
||||
grounded = f"{commitment_ctx}\n{grounded}"
|
||||
if _ground_topic and visitor_text != _ground_topic:
|
||||
grounded = f"[Workshop conversation topic: {_ground_topic}]\n{visitor_text}"
|
||||
grounded = f"[Workshop conversation topic: {_ground_topic}]\n{grounded}"
|
||||
response = await _session.chat(grounded, session_id=_WORKSHOP_SESSION_ID)
|
||||
return response
|
||||
except Exception as exc:
|
||||
|
||||
@@ -7,19 +7,31 @@ from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import asyncio
|
||||
|
||||
from dashboard.routes.world import (
|
||||
_GROUND_TTL,
|
||||
_REMIND_AFTER,
|
||||
_STALE_THRESHOLD,
|
||||
_bark_and_broadcast,
|
||||
_broadcast,
|
||||
_build_commitment_context,
|
||||
_build_world_state,
|
||||
_commitments,
|
||||
_conversation,
|
||||
_extract_commitments,
|
||||
_generate_bark,
|
||||
_handle_client_message,
|
||||
_heartbeat,
|
||||
_log_bark_failure,
|
||||
_read_presence_file,
|
||||
_record_commitments,
|
||||
_refresh_ground,
|
||||
_tick_commitments,
|
||||
broadcast_world_state,
|
||||
close_commitment,
|
||||
get_commitments,
|
||||
reset_commitments,
|
||||
reset_conversation_ground,
|
||||
)
|
||||
|
||||
@@ -506,3 +518,206 @@ class TestConversationGrounding:
|
||||
finally:
|
||||
_ws_clients.clear()
|
||||
_conversation.clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Conversation grounding — commitment tracking (rescued from PR #408)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def _clean_commitments():
    """Give a commitment test an empty tracker and empty it again afterwards."""
    reset_commitments()
    yield
    reset_commitments()
|
||||
|
||||
|
||||
class TestExtractCommitments:
    """Phrase extraction from a single reply string."""

    def test_extracts_ill_pattern(self):
        (phrase,) = _extract_commitments("I'll draft the skeleton ticket in 30 minutes.")
        assert "draft the skeleton ticket" in phrase

    def test_extracts_i_will_pattern(self):
        (phrase,) = _extract_commitments("I will review that PR tomorrow.")
        assert "review that PR tomorrow" in phrase

    def test_extracts_let_me_pattern(self):
        (phrase,) = _extract_commitments("Let me write up a summary for you.")
        assert "write up a summary" in phrase

    def test_skips_short_matches(self):
        # "do it" is exactly 5 characters; only phrases longer than 5 are kept.
        assert _extract_commitments("I'll do it.") == []

    def test_no_commitments_in_normal_text(self):
        assert _extract_commitments("The weather is nice today.") == []

    def test_truncates_long_commitments(self):
        filler = "a" * 200
        (phrase,) = _extract_commitments(f"I'll {filler}.")
        assert len(phrase) == 120
|
||||
|
||||
|
||||
class TestRecordCommitments:
    """Storing extracted commitments: initial state, dedup and the size cap."""

    def test_records_new_commitment(self, _clean_commitments):
        _record_commitments("I'll draft the ticket now.")
        (entry,) = get_commitments()
        assert entry["messages_since"] == 0

    def test_avoids_duplicate_commitments(self, _clean_commitments):
        for _ in range(2):
            _record_commitments("I'll draft the ticket now.")
        assert len(get_commitments()) == 1

    def test_caps_at_max(self, _clean_commitments):
        from dashboard.routes.world import _MAX_COMMITMENTS

        overflow = 3
        total = _MAX_COMMITMENTS + overflow
        for i in range(total):
            _record_commitments(f"I'll handle commitment number {i} right away.")

        kept = [entry["text"] for entry in get_commitments()]
        # Pin the exact size: a "<=" check would also pass if nothing had
        # been recorded at all.
        assert len(kept) == _MAX_COMMITMENTS
        # The oldest entries are the ones evicted; the newest survive.
        assert kept[0] == f"handle commitment number {overflow} right away"
        assert kept[-1] == f"handle commitment number {total - 1} right away"
|
||||
|
||||
|
||||
class TestTickAndContext:
    """Message counting and rendering of the overdue-commitment context."""

    @staticmethod
    def _entry(text, messages_since=0):
        """Build a commitment entry in the shape the tracker stores."""
        return {"text": text, "created_at": 0, "messages_since": messages_since}

    def test_tick_increments_messages_since(self, _clean_commitments):
        _commitments.append(self._entry("write the docs"))
        for _ in range(2):
            _tick_commitments()
        assert _commitments[0]["messages_since"] == 2

    def test_context_empty_when_no_overdue(self, _clean_commitments):
        _commitments.append(self._entry("write the docs"))
        assert _build_commitment_context() == ""

    def test_context_surfaces_overdue_commitments(self, _clean_commitments):
        _commitments.append(self._entry("draft the skeleton ticket", _REMIND_AFTER))
        rendered = _build_commitment_context()
        assert "draft the skeleton ticket" in rendered
        assert "Open commitments" in rendered

    def test_context_only_includes_overdue(self, _clean_commitments):
        _commitments.extend(
            [
                self._entry("recent thing", 1),
                self._entry("old thing", _REMIND_AFTER),
            ]
        )
        rendered = _build_commitment_context()
        assert "old thing" in rendered
        assert "recent thing" not in rendered
|
||||
|
||||
|
||||
class TestCloseCommitment:
    """Removing a commitment by list position."""

    def test_close_valid_index(self, _clean_commitments):
        _commitments.append({"text": "write the docs", "created_at": 0, "messages_since": 0})
        removed = close_commitment(0)
        assert removed is True
        assert get_commitments() == []

    def test_close_invalid_index(self, _clean_commitments):
        out_of_range = 99
        assert close_commitment(out_of_range) is False
|
||||
|
||||
|
||||
class TestGroundingIntegration:
    """Commitment tracking exercised through the bark functions."""

    @pytest.mark.asyncio
    async def test_bark_records_commitments_from_reply(self, _clean_commitments):
        from dashboard.routes.world import _ws_clients

        ws = AsyncMock()
        _ws_clients.append(ws)
        _conversation.clear()
        try:
            with patch(
                "timmy.session.chat",
                new_callable=AsyncMock,
                return_value="I'll draft the ticket for you!",
            ):
                await _bark_and_broadcast("Can you help?")

            assert len(get_commitments()) == 1
            assert "draft the ticket" in get_commitments()[0]["text"]
        finally:
            _ws_clients.clear()
            _conversation.clear()

    @pytest.mark.asyncio
    async def test_bark_prepends_context_after_n_messages(self, _clean_commitments):
        """Commitment context is prepended only once a commitment is overdue."""
        _commitments.append(
            {
                "text": "draft the skeleton ticket",
                "created_at": 0,
                "messages_since": _REMIND_AFTER - 1,
            }
        )

        with patch(
            "timmy.session.chat",
            new_callable=AsyncMock,
            return_value="Sure thing!",
        ) as mock_chat:
            # One message short of the threshold. The counter is not ticked
            # before this call, so the commitment is not overdue yet and no
            # context should be sent.
            await _generate_bark("Any updates?")
            # Tick by hand (in production _bark_and_broadcast does this) to
            # push messages_since to _REMIND_AFTER.
            _tick_commitments()
            await _generate_bark("Any updates?")

        first_text = mock_chat.call_args_list[0][0][0]
        last_text = mock_chat.call_args_list[-1][0][0]

        # Below the threshold nothing is surfaced ...
        assert "Open commitments" not in first_text
        # ... at the threshold the overdue commitment is prepended.
        assert "draft the skeleton ticket" in last_text
        assert "Open commitments" in last_text
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WebSocket heartbeat ping (rescued from PR #399)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_heartbeat_sends_ping():
    """Heartbeat sends a ping JSON frame after the interval elapses."""
    ws = AsyncMock()

    with patch("dashboard.routes.world.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
        # The first sleep returns immediately so one ping goes out; the second
        # raises to break out of the otherwise endless loop.
        mock_sleep.side_effect = [None, ConnectionError("stop")]
        await _heartbeat(ws)

    ws.send_text.assert_called_once()
    payload = ws.send_text.call_args.args[0]
    assert json.loads(payload)["type"] == "ping"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_heartbeat_exits_on_dead_connection():
    """Heartbeat exits cleanly when the WebSocket is dead."""
    ws = AsyncMock()
    ws.send_text.side_effect = ConnectionError("gone")

    with patch("dashboard.routes.world.asyncio.sleep", new_callable=AsyncMock):
        await _heartbeat(ws)  # should not raise

    # The loop must stop at the first failed send rather than keep retrying.
    ws.send_text.assert_awaited_once()
|
||||
|
||||
Reference in New Issue
Block a user