Merge branch 'main' into fix/gitea-mcp-binary-cycle-30
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -81,3 +81,4 @@ workspace/
|
||||
.LSOverride
|
||||
.Spotlight-V100
|
||||
.Trashes
|
||||
.timmy_gitea_token
|
||||
|
||||
26
AGENTS.md
26
AGENTS.md
@@ -42,15 +42,33 @@ Read [`CLAUDE.md`](CLAUDE.md) for architecture patterns and conventions.
|
||||
Every commit to `main` must arrive via a merged Pull Request. No agent, no human,
|
||||
no orchestrator pushes directly to main.
|
||||
|
||||
### Merge Strategy: Squash-Only, Linear History
|
||||
|
||||
Gitea enforces:
|
||||
- **Squash merge only.** No merge commits, no rebase merge. Every commit on
|
||||
main is a single squashed commit from a PR. Clean, linear, auditable.
|
||||
- **Branch must be up-to-date.** If a PR is behind main, it cannot merge.
|
||||
Rebase onto main, re-run tests, force-push the branch, then merge.
|
||||
- **Auto-delete branches** after merge. No stale branches.
|
||||
|
||||
### The Workflow
|
||||
```
|
||||
1. Create a feature branch: git checkout -b fix/my-thing
|
||||
2. Make changes, commit locally
|
||||
3. Run tests: python3 -m pytest tests/ -x -q
|
||||
3. Run tests: tox -e unit
|
||||
4. Push the branch: git push --no-verify origin fix/my-thing
|
||||
5. Create PR via Gitea API or UI
|
||||
6. Verify tests pass (orchestrator checks this)
|
||||
7. Merge PR via API or UI
|
||||
7. Merge PR via API: {"Do": "squash"}
|
||||
```
|
||||
|
||||
If behind main before merge:
|
||||
```
|
||||
1. git fetch origin main
|
||||
2. git rebase origin/main
|
||||
3. tox -e unit
|
||||
4. git push --force-with-lease --no-verify origin fix/my-thing
|
||||
5. Then merge the PR
|
||||
```
|
||||
|
||||
### Why This Exists
|
||||
@@ -62,8 +80,10 @@ to all active worktrees.
|
||||
|
||||
### Orchestrator Responsibilities
|
||||
The Hermes loop orchestrator must:
|
||||
- Run `pytest -x -q` in each worktree BEFORE committing
|
||||
- Run `tox -e unit` in each worktree BEFORE committing
|
||||
- Never push to main directly — always push a feature branch + PR
|
||||
- Always use `{"Do": "squash"}` when merging PRs via API
|
||||
- If a PR is behind main, rebase and re-test before merging
|
||||
- Verify test results before merging any PR
|
||||
- If tests fail, fix or reject — never merge red
|
||||
|
||||
|
||||
@@ -246,7 +246,7 @@ class Settings(BaseSettings):
|
||||
# Local Gitea instance for issue tracking and self-improvement.
|
||||
# These values are passed as env vars to the gitea-mcp server process.
|
||||
gitea_url: str = "http://localhost:3000"
|
||||
gitea_token: str = "" # GITEA_TOKEN env var; falls back to ~/.config/gitea/token
|
||||
gitea_token: str = "" # GITEA_TOKEN env var; falls back to .timmy_gitea_token
|
||||
gitea_repo: str = "rockachopa/Timmy-time-dashboard" # owner/repo
|
||||
gitea_enabled: bool = True
|
||||
|
||||
@@ -347,14 +347,19 @@ class Settings(BaseSettings):
|
||||
def model_post_init(self, __context) -> None:
|
||||
"""Post-init: resolve gitea_token from file if not set via env."""
|
||||
if not self.gitea_token:
|
||||
token_path = os.path.expanduser("~/.config/gitea/token")
|
||||
try:
|
||||
if os.path.isfile(token_path):
|
||||
token = open(token_path).read().strip() # noqa: SIM115
|
||||
if token:
|
||||
self.gitea_token = token
|
||||
except OSError:
|
||||
pass
|
||||
# Priority: Timmy's own token → legacy admin token
|
||||
repo_root = self._compute_repo_root()
|
||||
timmy_token_path = os.path.join(repo_root, ".timmy_gitea_token")
|
||||
legacy_token_path = os.path.expanduser("~/.config/gitea/token")
|
||||
for token_path in (timmy_token_path, legacy_token_path):
|
||||
try:
|
||||
if os.path.isfile(token_path):
|
||||
token = open(token_path).read().strip() # noqa: SIM115
|
||||
if token:
|
||||
self.gitea_token = token
|
||||
break
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
model_config = SettingsConfigDict(
|
||||
env_file=".env",
|
||||
|
||||
@@ -43,6 +43,7 @@ SEED_TYPES = (
|
||||
"freeform",
|
||||
"sovereignty",
|
||||
"observation",
|
||||
"workspace",
|
||||
)
|
||||
|
||||
# Existential reflection prompts — Timmy picks one at random
|
||||
@@ -263,6 +264,9 @@ class ThinkingEngine:
|
||||
# Post-hook: file Gitea issues for actionable observations
|
||||
await self._maybe_file_issues()
|
||||
|
||||
# Post-hook: check workspace for new messages from Hermes
|
||||
await self._check_workspace()
|
||||
|
||||
# Post-hook: update MEMORY.md with latest reflection
|
||||
self._update_memory(thought)
|
||||
|
||||
@@ -616,6 +620,29 @@ class ThinkingEngine:
|
||||
logger.debug("Task queue query failed: %s", exc)
|
||||
pass
|
||||
|
||||
# Workspace updates (file-based communication with Hermes)
|
||||
try:
|
||||
from timmy.workspace import workspace_monitor
|
||||
|
||||
updates = workspace_monitor.get_pending_updates()
|
||||
new_corr = updates.get("new_correspondence")
|
||||
new_inbox = updates.get("new_inbox_files", [])
|
||||
|
||||
if new_corr:
|
||||
# Count entries (assuming each entry starts with a timestamp or header)
|
||||
line_count = len([l for l in new_corr.splitlines() if l.strip()])
|
||||
parts.append(
|
||||
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
|
||||
)
|
||||
if new_inbox:
|
||||
files_str = ", ".join(new_inbox[:5])
|
||||
if len(new_inbox) > 5:
|
||||
files_str += f", ... (+{len(new_inbox) - 5} more)"
|
||||
parts.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
|
||||
except Exception as exc:
|
||||
logger.debug("Workspace check failed: %s", exc)
|
||||
pass
|
||||
|
||||
return "\n".join(parts) if parts else ""
|
||||
|
||||
def _load_memory_context(self) -> str:
|
||||
@@ -708,6 +735,8 @@ class ThinkingEngine:
|
||||
return seed_type, f"Sovereignty reflection: {prompt}"
|
||||
if seed_type == "observation":
|
||||
return seed_type, self._seed_from_observation()
|
||||
if seed_type == "workspace":
|
||||
return seed_type, self._seed_from_workspace()
|
||||
# freeform — minimal guidance to steer away from repetition
|
||||
return seed_type, "Free reflection — explore something you haven't thought about yet today."
|
||||
|
||||
@@ -778,6 +807,63 @@ class ThinkingEngine:
|
||||
logger.debug("Observation seed data unavailable: %s", exc)
|
||||
return "\n".join(context_parts)
|
||||
|
||||
def _seed_from_workspace(self) -> str:
|
||||
"""Gather workspace updates as thought seed.
|
||||
|
||||
When there are pending workspace updates, include them as context
|
||||
for Timmy to reflect on. Falls back to random seed type if none.
|
||||
"""
|
||||
try:
|
||||
from timmy.workspace import workspace_monitor
|
||||
|
||||
updates = workspace_monitor.get_pending_updates()
|
||||
new_corr = updates.get("new_correspondence")
|
||||
new_inbox = updates.get("new_inbox_files", [])
|
||||
|
||||
if new_corr:
|
||||
# Take first 200 chars of the new entry
|
||||
snippet = new_corr[:200].replace("\n", " ")
|
||||
if len(new_corr) > 200:
|
||||
snippet += "..."
|
||||
return f"New workspace message from Hermes: {snippet}"
|
||||
|
||||
if new_inbox:
|
||||
files_str = ", ".join(new_inbox[:3])
|
||||
if len(new_inbox) > 3:
|
||||
files_str += f", ... (+{len(new_inbox) - 3} more)"
|
||||
return f"New inbox files from Hermes: {files_str}"
|
||||
|
||||
except Exception as exc:
|
||||
logger.debug("Workspace seed unavailable: %s", exc)
|
||||
|
||||
# Fall back to a random seed type if no workspace updates
|
||||
return "The workspace is quiet. What should I be watching for?"
|
||||
|
||||
async def _check_workspace(self) -> None:
|
||||
"""Post-hook: check workspace for updates and mark them as seen.
|
||||
|
||||
This ensures Timmy 'processes' workspace updates even if the seed
|
||||
was different, keeping the state file in sync.
|
||||
"""
|
||||
try:
|
||||
from timmy.workspace import workspace_monitor
|
||||
|
||||
updates = workspace_monitor.get_pending_updates()
|
||||
new_corr = updates.get("new_correspondence")
|
||||
new_inbox = updates.get("new_inbox_files", [])
|
||||
|
||||
if new_corr or new_inbox:
|
||||
if new_corr:
|
||||
line_count = len([l for l in new_corr.splitlines() if l.strip()])
|
||||
logger.info("Workspace: processed %d new correspondence entries", line_count)
|
||||
if new_inbox:
|
||||
logger.info("Workspace: processed %d new inbox files: %s", len(new_inbox), new_inbox)
|
||||
|
||||
# Mark as seen to update the state file
|
||||
workspace_monitor.mark_seen()
|
||||
except Exception as exc:
|
||||
logger.debug("Workspace check failed: %s", exc)
|
||||
|
||||
# Maximum retries when a generated thought is too similar to recent ones
|
||||
_MAX_DEDUP_RETRIES = 2
|
||||
# Similarity threshold (0.0 = completely different, 1.0 = identical)
|
||||
|
||||
140
src/timmy/workspace.py
Normal file
140
src/timmy/workspace.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""Workspace monitor — tracks file-based communication between Hermes and Timmy.
|
||||
|
||||
The workspace/ directory provides file-based communication:
|
||||
- workspace/correspondence.md — append-only journal
|
||||
- workspace/inbox/ — files from Hermes to Timmy
|
||||
- workspace/outbox/ — files from Timmy to Hermes
|
||||
|
||||
This module tracks what Timmy has seen and detects new content.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_STATE_PATH = Path("data/workspace_state.json")
|
||||
|
||||
|
||||
class WorkspaceMonitor:
|
||||
"""Monitors workspace/ directory for new correspondence and inbox files."""
|
||||
|
||||
def __init__(self, state_path: Path = _DEFAULT_STATE_PATH) -> None:
|
||||
self._state_path = state_path
|
||||
self._state: dict = {"last_correspondence_line": 0, "seen_inbox_files": []}
|
||||
self._load_state()
|
||||
|
||||
def _get_workspace_path(self) -> Path:
|
||||
"""Get the workspace directory path."""
|
||||
return Path(settings.repo_root) / "workspace"
|
||||
|
||||
def _load_state(self) -> None:
|
||||
"""Load persisted state from JSON file."""
|
||||
try:
|
||||
if self._state_path.exists():
|
||||
with open(self._state_path, encoding="utf-8") as f:
|
||||
loaded = json.load(f)
|
||||
self._state = {
|
||||
"last_correspondence_line": loaded.get("last_correspondence_line", 0),
|
||||
"seen_inbox_files": loaded.get("seen_inbox_files", []),
|
||||
}
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to load workspace state: %s", exc)
|
||||
self._state = {"last_correspondence_line": 0, "seen_inbox_files": []}
|
||||
|
||||
def _save_state(self) -> None:
|
||||
"""Persist state to JSON file."""
|
||||
try:
|
||||
self._state_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(self._state_path, "w", encoding="utf-8") as f:
|
||||
json.dump(self._state, f, indent=2)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to save workspace state: %s", exc)
|
||||
|
||||
def check_correspondence(self) -> str | None:
|
||||
"""Read workspace/correspondence.md and return new entries.
|
||||
|
||||
Returns everything after the last seen line, or None if no new content.
|
||||
"""
|
||||
try:
|
||||
workspace = self._get_workspace_path()
|
||||
correspondence_file = workspace / "correspondence.md"
|
||||
|
||||
if not correspondence_file.exists():
|
||||
return None
|
||||
|
||||
content = correspondence_file.read_text(encoding="utf-8")
|
||||
lines = content.splitlines()
|
||||
|
||||
last_seen = self._state.get("last_correspondence_line", 0)
|
||||
if len(lines) <= last_seen:
|
||||
return None
|
||||
|
||||
new_lines = lines[last_seen:]
|
||||
return "\n".join(new_lines)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to check correspondence: %s", exc)
|
||||
return None
|
||||
|
||||
def check_inbox(self) -> list[str]:
|
||||
"""List workspace/inbox/ files and return any not in seen list.
|
||||
|
||||
Returns a list of filenames that are new.
|
||||
"""
|
||||
try:
|
||||
workspace = self._get_workspace_path()
|
||||
inbox_dir = workspace / "inbox"
|
||||
|
||||
if not inbox_dir.exists():
|
||||
return []
|
||||
|
||||
seen = set(self._state.get("seen_inbox_files", []))
|
||||
current_files = {f.name for f in inbox_dir.iterdir() if f.is_file()}
|
||||
new_files = sorted(current_files - seen)
|
||||
|
||||
return new_files
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to check inbox: %s", exc)
|
||||
return []
|
||||
|
||||
def get_pending_updates(self) -> dict:
|
||||
"""Get all pending workspace updates.
|
||||
|
||||
Returns a dict with keys:
|
||||
- 'new_correspondence': str or None — new entries from correspondence.md
|
||||
- 'new_inbox_files': list[str] — new files in inbox/
|
||||
"""
|
||||
return {
|
||||
"new_correspondence": self.check_correspondence(),
|
||||
"new_inbox_files": self.check_inbox(),
|
||||
}
|
||||
|
||||
def mark_seen(self) -> None:
|
||||
"""Update state file after processing current content."""
|
||||
try:
|
||||
workspace = self._get_workspace_path()
|
||||
|
||||
# Update correspondence line count
|
||||
correspondence_file = workspace / "correspondence.md"
|
||||
if correspondence_file.exists():
|
||||
content = correspondence_file.read_text(encoding="utf-8")
|
||||
self._state["last_correspondence_line"] = len(content.splitlines())
|
||||
|
||||
# Update inbox seen list
|
||||
inbox_dir = workspace / "inbox"
|
||||
if inbox_dir.exists():
|
||||
current_files = [f.name for f in inbox_dir.iterdir() if f.is_file()]
|
||||
self._state["seen_inbox_files"] = sorted(current_files)
|
||||
else:
|
||||
self._state["seen_inbox_files"] = []
|
||||
|
||||
self._save_state()
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to mark workspace as seen: %s", exc)
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
workspace_monitor = WorkspaceMonitor()
|
||||
323
tests/timmy/test_workspace.py
Normal file
323
tests/timmy/test_workspace.py
Normal file
@@ -0,0 +1,323 @@
|
||||
"""Tests for timmy.workspace — Workspace heartbeat monitoring."""
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.workspace import WorkspaceMonitor
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_monitor(tmp_path, monkeypatch):
|
||||
"""Create a WorkspaceMonitor with tmp_path as the repo_root."""
|
||||
# Mock repo_root to use tmp_path
|
||||
monkeypatch.setattr("timmy.workspace.settings", type("obj", (object,), {
|
||||
"repo_root": str(tmp_path)
|
||||
})())
|
||||
|
||||
state_path = tmp_path / "workspace_state.json"
|
||||
return WorkspaceMonitor(state_path=state_path)
|
||||
|
||||
|
||||
def _setup_workspace(tmp_path):
|
||||
"""Create the workspace directory structure."""
|
||||
workspace = tmp_path / "workspace"
|
||||
workspace.mkdir()
|
||||
(workspace / "inbox").mkdir()
|
||||
(workspace / "outbox").mkdir()
|
||||
return workspace
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Basic monitoring
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_no_updates_when_empty(tmp_path, monkeypatch):
|
||||
"""Fresh monitor with no workspace files should report no updates."""
|
||||
# Don't create workspace dir — monitor should handle gracefully
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
|
||||
assert updates["new_correspondence"] is None
|
||||
assert updates["new_inbox_files"] == []
|
||||
|
||||
|
||||
def test_detects_new_correspondence(tmp_path, monkeypatch):
|
||||
"""Writing to correspondence.md should be detected as new entries."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
correspondence = workspace / "correspondence.md"
|
||||
|
||||
# Pre-populate correspondence file
|
||||
correspondence.write_text("Entry 1\nEntry 2\nEntry 3\n")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
# Should detect all 3 lines as new
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] == "Entry 1\nEntry 2\nEntry 3"
|
||||
assert updates["new_inbox_files"] == []
|
||||
|
||||
|
||||
def test_detects_new_inbox_file(tmp_path, monkeypatch):
|
||||
"""Creating a file in inbox/ should be detected as new."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
inbox = workspace / "inbox"
|
||||
|
||||
# Create a file in inbox
|
||||
(inbox / "message_1.md").write_text("Hello Timmy")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] is None
|
||||
assert updates["new_inbox_files"] == ["message_1.md"]
|
||||
|
||||
|
||||
def test_detects_multiple_inbox_files(tmp_path, monkeypatch):
|
||||
"""Multiple new inbox files should all be detected."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
inbox = workspace / "inbox"
|
||||
|
||||
# Create multiple files
|
||||
(inbox / "message_2.md").write_text("Hello again")
|
||||
(inbox / "task_1.md").write_text("Do something")
|
||||
(inbox / "note.txt").write_text("A note")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_inbox_files"] == ["message_2.md", "note.txt", "task_1.md"]
|
||||
|
||||
|
||||
def test_detects_both_correspondence_and_inbox(tmp_path, monkeypatch):
|
||||
"""Monitor should detect both correspondence and inbox updates together."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
|
||||
correspondence = workspace / "correspondence.md"
|
||||
correspondence.write_text("New journal entry\n")
|
||||
|
||||
inbox = workspace / "inbox"
|
||||
(inbox / "urgent.md").write_text("Urgent message")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] == "New journal entry"
|
||||
assert updates["new_inbox_files"] == ["urgent.md"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Marking as seen
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_mark_seen_clears_pending(tmp_path, monkeypatch):
|
||||
"""After mark_seen, get_pending_updates should return empty."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
|
||||
correspondence = workspace / "correspondence.md"
|
||||
correspondence.write_text("Line 1\nLine 2\n")
|
||||
|
||||
inbox = workspace / "inbox"
|
||||
(inbox / "file.md").write_text("Content")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
# First check — should have updates
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] is not None
|
||||
assert len(updates["new_inbox_files"]) == 1
|
||||
|
||||
# Mark as seen
|
||||
monitor.mark_seen()
|
||||
|
||||
# Second check — should be empty
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] is None
|
||||
assert updates["new_inbox_files"] == []
|
||||
|
||||
|
||||
def test_mark_seen_persists_line_count(tmp_path, monkeypatch):
|
||||
"""mark_seen should remember how many lines we've seen."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
|
||||
correspondence = workspace / "correspondence.md"
|
||||
correspondence.write_text("Line 1\nLine 2\nLine 3\n")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
monitor.mark_seen()
|
||||
|
||||
# Add more lines
|
||||
correspondence.write_text("Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n")
|
||||
|
||||
# Should only see the new lines
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] == "Line 4\nLine 5"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# State persistence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_state_persists_across_instances(tmp_path, monkeypatch):
|
||||
"""State should be saved and loaded when creating a new monitor instance."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
|
||||
correspondence = workspace / "correspondence.md"
|
||||
correspondence.write_text("First entry\n")
|
||||
|
||||
inbox = workspace / "inbox"
|
||||
(inbox / "first.md").write_text("First")
|
||||
|
||||
# First monitor instance
|
||||
monitor1 = _make_monitor(tmp_path, monkeypatch)
|
||||
monitor1.mark_seen()
|
||||
|
||||
# Add new content
|
||||
correspondence.write_text("First entry\nSecond entry\n")
|
||||
(inbox / "second.md").write_text("Second")
|
||||
|
||||
# Second monitor instance (should load state from file)
|
||||
monitor2 = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor2.get_pending_updates()
|
||||
assert updates["new_correspondence"] == "Second entry"
|
||||
assert updates["new_inbox_files"] == ["second.md"]
|
||||
|
||||
|
||||
def test_state_survives_missing_files(tmp_path, monkeypatch):
|
||||
"""Monitor should handle missing correspondence file gracefully."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
|
||||
# Create and mark as seen
|
||||
correspondence = workspace / "correspondence.md"
|
||||
correspondence.write_text("Entry\n")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
monitor.mark_seen()
|
||||
|
||||
# Delete the file
|
||||
correspondence.unlink()
|
||||
|
||||
# Should return None gracefully
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_empty_correspondence_file(tmp_path, monkeypatch):
|
||||
"""Empty correspondence file should return None."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
(workspace / "correspondence.md").write_text("")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] is None
|
||||
|
||||
|
||||
def test_empty_inbox_dir(tmp_path, monkeypatch):
|
||||
"""Empty inbox directory should return empty list."""
|
||||
_setup_workspace(tmp_path)
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_inbox_files"] == []
|
||||
|
||||
|
||||
def test_missing_inbox_dir(tmp_path, monkeypatch):
|
||||
"""Missing inbox directory should return empty list."""
|
||||
workspace = tmp_path / "workspace"
|
||||
workspace.mkdir()
|
||||
# No inbox subdir
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_inbox_files"] == []
|
||||
|
||||
|
||||
def test_missing_workspace_dir(tmp_path, monkeypatch):
|
||||
"""Missing workspace directory should return empty results."""
|
||||
# No workspace dir at all
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] is None
|
||||
assert updates["new_inbox_files"] == []
|
||||
|
||||
|
||||
def test_correspondence_with_blank_lines(tmp_path, monkeypatch):
|
||||
"""Correspondence with blank lines should be handled correctly."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
|
||||
correspondence = workspace / "correspondence.md"
|
||||
correspondence.write_text("Entry 1\n\nEntry 2\n\n\nEntry 3\n")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] == "Entry 1\n\nEntry 2\n\n\nEntry 3"
|
||||
|
||||
|
||||
def test_inbox_ignores_subdirectories(tmp_path, monkeypatch):
|
||||
"""Inbox should only list files, not subdirectories."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
inbox = workspace / "inbox"
|
||||
|
||||
(inbox / "file.md").write_text("Content")
|
||||
(inbox / "subdir").mkdir()
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_inbox_files"] == ["file.md"]
|
||||
|
||||
|
||||
def test_deleted_inbox_files_removed_from_state(tmp_path, monkeypatch):
|
||||
"""When inbox files are deleted, they should be removed from seen list."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
inbox = workspace / "inbox"
|
||||
|
||||
# Create and see a file
|
||||
(inbox / "temp.md").write_text("Temp")
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
monitor.mark_seen()
|
||||
|
||||
# Delete the file
|
||||
(inbox / "temp.md").unlink()
|
||||
|
||||
# mark_seen should update seen list to remove deleted files
|
||||
monitor.mark_seen()
|
||||
|
||||
# State should now have empty seen list
|
||||
assert monitor._state["seen_inbox_files"] == []
|
||||
|
||||
|
||||
def test_correspondence_append_only(tmp_path, monkeypatch):
|
||||
"""Correspondence is append-only; modifying existing content doesn't re-notify."""
|
||||
workspace = _setup_workspace(tmp_path)
|
||||
|
||||
correspondence = workspace / "correspondence.md"
|
||||
correspondence.write_text("Line 1\nLine 2\n")
|
||||
|
||||
monitor = _make_monitor(tmp_path, monkeypatch)
|
||||
monitor.mark_seen()
|
||||
|
||||
# Modify the file (truncate and rewrite) — this resets line count
|
||||
# but correspondence.md should be append-only in practice
|
||||
correspondence.write_text("Modified Line 1\nModified Line 2\nLine 3\n")
|
||||
|
||||
# Line 3 is the only truly new line (we've now seen 2, file has 3)
|
||||
updates = monitor.get_pending_updates()
|
||||
assert updates["new_correspondence"] == "Line 3"
|
||||
Reference in New Issue
Block a user