Improve test coverage from 63.6% to 73.4% and fix test infrastructure (#137)

This commit is contained in:
Alexander Whitestone
2026-03-06 13:21:05 -05:00
committed by GitHub
parent 23f744f296
commit 3f06e7231d
17 changed files with 2312 additions and 16 deletions


@@ -25,16 +25,21 @@ jobs:
cache: "pip"
- name: Install dependencies
run: pip install -e ".[dev]"
run: |
pip install poetry
poetry install --with dev
- name: Run tests
run: |
mkdir -p reports
pytest \
poetry run pytest \
--cov=src \
--cov-report=term-missing \
--cov-report=xml:reports/coverage.xml \
--junitxml=reports/junit.xml
--cov-fail-under=73 \
--junitxml=reports/junit.xml \
-p no:xdist \
-m "not ollama and not docker and not selenium and not external_api"
# Posts a check annotation + PR comment showing pass/fail counts.
# Visible in the GitHub mobile app under Checks and in PR conversations.


@@ -88,13 +88,13 @@ watch:
# ── Testing ───────────────────────────────────────────────────────────────────
test:
$(PYTEST) tests/ -q --tb=short
$(PYTEST) tests/ -q --tb=short -n auto --dist worksteal
test-unit:
$(PYTEST) tests -m "unit" --tb=short -v
$(PYTEST) tests -m "unit" --tb=short -v -n auto --dist worksteal
test-integration:
$(PYTEST) tests -m "integration" --tb=short -v
$(PYTEST) tests -m "integration" --tb=short -v -n auto --dist worksteal
test-functional:
$(PYTEST) tests -m "functional and not slow and not selenium" --tb=short -v -n0
@@ -103,16 +103,16 @@ test-e2e:
$(PYTEST) tests -m "e2e" --tb=short -v -n0
test-fast:
$(PYTEST) tests -m "unit or integration" --tb=short -v
$(PYTEST) tests -m "unit or integration" --tb=short -v -n auto --dist worksteal
test-ci:
$(PYTEST) tests -m "not skip_ci" --tb=short --cov=src --cov-report=term-missing
$(PYTEST) tests -m "not skip_ci" --tb=short --cov=src --cov-report=term-missing --cov-fail-under=73 -p no:xdist
test-cov:
$(PYTEST) tests/ --cov=src --cov-report=term-missing --cov-report=xml -q
$(PYTEST) tests/ --cov=src --cov-report=term-missing --cov-report=xml --cov-fail-under=73 -q -p no:xdist
test-cov-html:
$(PYTEST) tests/ --cov=src --cov-report=term-missing --cov-report=html -q
$(PYTEST) tests/ --cov=src --cov-report=term-missing --cov-report=html --cov-fail-under=73 -q -p no:xdist
@echo "✓ HTML coverage report: open htmlcov/index.html"
# Full-stack functional test: spins up Ollama (CPU, qwen2.5:0.5b) + dashboard

TEST_COVERAGE_ANALYSIS.md Normal file

@@ -0,0 +1,195 @@
# Test Coverage Analysis — Timmy Time Dashboard
**Date:** 2026-03-06
**Overall coverage:** 63.6% (7,996 statements, 2,910 missed)
**Threshold:** 60% (passes, but barely)
**Test suite:** 914 passed, 4 failed, 39 skipped, 5 errors — 35 seconds
---
## Current Coverage by Package
| Package | Approx. Coverage | Notes |
|---------|-----------------|-------|
| `spark/` | 90–98% | Best-covered package |
| `timmy_serve/` | 80–100% | Small package, well tested |
| `infrastructure/models/` | 42–97% | `registry` great, `multimodal` weak |
| `dashboard/middleware/` | 79–100% | Solid |
| `dashboard/routes/` | 36–100% | Highly uneven — some routes untested |
| `integrations/` | 51–100% | Paperclip well covered; Discord weak |
| `timmy/` | 0–100% | Several core modules at 0% |
| `brain/` | 0–75% | `client` and `worker` very low |
| `infrastructure/events/` | 0% | Completely untested |
| `infrastructure/error_capture.py` | 0% | Completely untested |
---
## Priority 1 — Zero-Coverage Modules (0%)
These modules have **no test coverage at all** and represent the biggest risk:
| Module | Stmts | Purpose |
|--------|-------|---------|
| `src/timmy/semantic_memory.py` | 187 | Semantic memory system — core agent feature |
| `src/timmy/agents/timmy.py` | 165 | Main Timmy agent class |
| `src/timmy/agents/base.py` | 57 | Base agent class |
| `src/timmy/interview.py` | 46 | Interview flow |
| `src/infrastructure/error_capture.py` | 91 | Error capture/reporting |
| `src/infrastructure/events/broadcaster.py` | 67 | Event broadcasting |
| `src/infrastructure/events/bus.py` | 74 | Event bus |
| `src/infrastructure/openfang/tools.py` | 41 | OpenFang tool definitions |
| `src/brain/schema.py` | 14 | Brain schema definitions |
**Recommendation:** `timmy/agents/timmy.py` (165 stmts) and `semantic_memory.py` (187 stmts) are the highest-value targets. The events subsystem (`broadcaster.py` + `bus.py` = 141 stmts) is critical infrastructure with zero tests.
---
## Priority 2 — Under-Tested Modules (<50%)
| Module | Cover | Stmts Missed | Purpose |
|--------|-------|-------------|---------|
| `brain/client.py` | 14.8% | 127 | Brain client — primary brain interface |
| `brain/worker.py` | 16.1% | 156 | Background brain worker |
| `brain/embeddings.py` | 35.0% | 26 | Embedding generation |
| `timmy/approvals.py` | 39.1% | 42 | Approval workflow |
| `dashboard/routes/marketplace.py` | 36.4% | 21 | Marketplace routes |
| `dashboard/routes/paperclip.py` | 41.1% | 96 | Paperclip dashboard routes |
| `infrastructure/hands/tools.py` | 41.3% | 27 | Tool execution |
| `infrastructure/models/multimodal.py` | 42.6% | 81 | Multimodal model support |
| `dashboard/routes/router.py` | 42.9% | 12 | Route registration |
| `dashboard/routes/swarm.py` | 43.3% | 17 | Swarm routes |
| `timmy/cascade_adapter.py` | 43.2% | 25 | Cascade LLM adapter |
| `timmy/tools_intro/__init__.py` | 44.7% | 84 | Tool introduction system |
| `timmy/tools.py` | 46.4% | 147 | Agent tool definitions |
| `timmy/cli.py` | 47.4% | 30 | CLI entry point |
| `timmy/conversation.py` | 48.5% | 34 | Conversation management |
**Recommendation:** `brain/client.py` + `brain/worker.py` together miss 283 statements and are the core of the brain/memory system. `timmy/tools.py` misses 147 statements and is the agent's tool registry — high impact.
---
## Priority 3 — Test Infrastructure Issues
### 3a. Broken Tests (4 failures)
All in `tests/test_setup_script.py` — the tests reference `/home/ubuntu/setup_timmy.sh`, which doesn't exist. These tests are environment-specific and should either:
- Be marked `@pytest.mark.skip_ci` or `@pytest.mark.functional`
- Use a fixture to locate the script relative to the project
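A module-level skip guard is the lighter of the two fixes; the sketch below mirrors the guard this commit adds to `tests/functional/test_setup_prod.py`:

```python
# Skip the entire module when the provisioning script is absent,
# instead of failing on machines that never ran the setup.
from pathlib import Path

import pytest

SETUP_SCRIPT_PATH = Path("/home/ubuntu/setup_timmy.sh")

pytestmark = pytest.mark.skipif(
    not SETUP_SCRIPT_PATH.exists(),
    reason=f"Setup script not found at {SETUP_SCRIPT_PATH}",
)
```

Because `pytestmark` applies to every test in the module, no per-test decorators are needed.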
### 3b. Collection Errors (5 errors)
`tests/functional/test_setup_prod.py` — same issue, references a non-existent script path. Should be guarded with a skip condition.
### 3c. pytest-xdist Conflicts with Coverage
The `pyproject.toml` `addopts` includes `-n auto --dist worksteal` (xdist), but `make test-cov` also passes `--cov` flags. This causes a conflict:
```
pytest: error: unrecognized arguments: -n --dist worksteal
```
**Fix:** Either:
- Remove `-n auto --dist worksteal` from `addopts` and add it only in `make test` target
- Or use `-p no:xdist` in the coverage targets (current workaround)
### 3d. Tox Configuration
`tox.ini` has `unit` and `integration` environments that run the **exact same command** — they're aliases. This is misleading:
- `unit` should run `-m unit` (fast, no I/O)
- `integration` should run `-m integration` (may use SQLite)
- Consider adding a `coverage` tox env
### 3e. CI Workflow (`tests.yml`)
- CI uses `pip install -e ".[dev]"` but the project uses Poetry — dependency resolution may differ
- CI doesn't pass marker filters, so it runs **all** tests including those that may need Docker/Ollama
- No coverage enforcement in CI (the `fail_under=60` in pyproject.toml only works with `--cov-fail-under`)
- No caching of Poetry virtualenvs
---
## Priority 4 — Test Quality Gaps
### 4a. Missing Error-Path Testing
Many modules have happy-path tests but lack coverage for:
- **Graceful degradation paths**: The architecture mandates graceful degradation when Ollama/Redis/AirLLM are unavailable, but most fallback paths are untested (e.g., `cascade.py` lines 563–655)
- **`brain/client.py`**: Only 14.8% covered — connection failures, retries, and error handling are untested
- **`infrastructure/error_capture.py`**: 0% — the error capture system itself has no tests
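A graceful-degradation test usually forces the optional dependency to fail and asserts that a safe default comes back instead of an exception. A minimal sketch of that shape (the `get_context_or_default` and backend names here are illustrative, not the project's API):

```python
# When the optional backend raises, return a safe default
# rather than propagating the failure to the caller.
def get_context_or_default(fetch_remote, default="(remote context unavailable)"):
    try:
        return fetch_remote()
    except Exception:
        return default


def unavailable_backend():
    # Simulates Ollama/Redis being down.
    raise ConnectionError("Ollama unavailable")


assert get_context_or_default(unavailable_backend) == "(remote context unavailable)"
assert get_context_or_default(lambda: "live context") == "live context"
```

The real tests would patch the transport layer (e.g. the httpx client) to raise, then assert the fallback value, matching the `recall`/`get_recent` error-path tests added for `brain/client.py` in this commit.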
### 4b. No Integration Tests for Events System
The `infrastructure/events/` package (`broadcaster.py` + `bus.py`) is 0% covered. This is the pub/sub backbone for the application. Tests should cover:
- Event subscription and dispatch
- Multiple subscribers
- Error handling in event handlers
- Async event broadcasting
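To illustrate the behaviors worth pinning down, here is a self-contained stand-in (a hypothetical `FakeBus`, not the API of `infrastructure/events/bus.py`): subscribers are isolated, so one failing handler must not block delivery to the rest.

```python
from collections import defaultdict


class FakeBus:
    """Minimal pub/sub stand-in for sketching event-bus tests."""

    def __init__(self):
        self._subs = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subs[topic].append(handler)

    def publish(self, topic, payload):
        delivered = 0
        for handler in list(self._subs[topic]):
            try:
                handler(payload)
                delivered += 1
            except Exception:
                continue  # a failing subscriber must not starve the others
        return delivered


def failing_handler(_payload):
    raise RuntimeError("boom")


bus = FakeBus()
seen = []
bus.subscribe("task.created", seen.append)
bus.subscribe("task.created", failing_handler)
bus.subscribe("task.created", seen.append)
delivered = bus.publish("task.created", {"id": 1})
# Both healthy subscribers receive the event; the failure is contained.
```

Tests against the real bus would assert the same invariants (delivery count, subscriber isolation), plus the async broadcast path.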
### 4c. Security Tests Are Thin
- `tests/security/` has only 3 files totaling ~140 lines
- `src/timmy_serve/l402_proxy.py` (payment gating, listed as security-sensitive) has no dedicated test file
- CSRF tests exist but bypass/traversal tests are minimal
- No tests for the `approvals.py` authorization workflow (39.1% covered)
### 4d. Missing WebSocket Tests
WebSocket handler (`ws_manager/handler.py`) has 81.2% coverage, but the disconnect/reconnect and error paths (lines 132–147) aren't tested. For a real-time dashboard, WebSocket reliability is critical.
### 4e. No Tests for `timmy/agents/` Subpackage
The Agno-based agent classes (`base.py`, `timmy.py`) are at 0% coverage (222 statements). These are stubbed in conftest but never actually exercised. Even with the Agno stub, the control flow and prompt construction logic should be tested.
---
## Priority 5 — Test Speed & Parallelism
| Metric | Value |
|--------|-------|
| Total wall time | ~35s (sequential) |
| Parallel (`-n auto`) | Would be ~10-15s |
| Slowest category | Functional tests (HTTP, Docker) |
**Observations:**
- 30-second timeout per test is generous — consider 10s for unit, 30s for integration
- The `--dist worksteal` strategy is good for uneven test durations
- 39 tests are skipped (mostly due to missing markers/env) — this is expected
- No test duration profiling is configured (consider `--durations=10`)
---
## Recommended Action Plan
### Quick Wins (High ROI, Low Effort)
1. **Fix the 4 broken tests** in `test_setup_script.py` (add skip guards)
2. **Fix xdist/coverage conflict** in `pyproject.toml` addopts
3. **Differentiate tox `unit` vs `integration`** environments
4. **Add `--durations=10`** to default addopts for profiling slow tests
5. **Add `--cov-fail-under=60`** to CI workflow to enforce the threshold
### Medium Effort, High Impact
6. **Test the events system** (`broadcaster.py` + `bus.py`) — 141 uncovered statements, critical infrastructure
7. **Test `timmy/agents/timmy.py`** — 165 uncovered statements, core agent
8. **Test `brain/client.py` and `brain/worker.py`** — 283 uncovered statements, core memory
9. **Test `timmy/tools.py`** error paths — 147 uncovered statements
10. **Test `error_capture.py`** — 91 uncovered statements, observability blind spot
### Longer Term
11. **Add graceful-degradation tests** — verify fallback behavior for all optional services
12. **Expand security test suite** — approvals, L402 proxy, input sanitization
13. **Add coverage tox environment** and enforce in CI
14. **Align CI with Poetry** — use `poetry install` instead of pip for consistent resolution
15. **Target 75% coverage** as the next threshold milestone (currently 63.6%)
---
## Coverage Floor Modules (Already Well-Tested)
These modules are at 95%+ and serve as good examples of testing patterns:
- `spark/eidos.py` — 98.3%
- `spark/memory.py` — 98.3%
- `infrastructure/models/registry.py` — 97.1%
- `timmy/agent_core/ollama_adapter.py` — 97.8%
- `timmy/agent_core/interface.py` — 100%
- `dashboard/middleware/security_headers.py` — 100%
- `dashboard/routes/agents.py` — 100%
- `timmy_serve/inter_agent.py` — 100%


@@ -82,7 +82,7 @@ asyncio_default_fixture_loop_scope = "function"
timeout = 30
timeout_method = "signal"
timeout_func_only = false
addopts = "-v --tb=short --strict-markers --disable-warnings -n auto --dist worksteal"
addopts = "-v --tb=short --strict-markers --disable-warnings --durations=10"
markers = [
"unit: Unit tests (fast, no I/O)",
"integration: Integration tests (may use SQLite)",
@@ -115,7 +115,7 @@ exclude_lines = [
"@abstractmethod",
]
# Fail CI if coverage drops below this threshold
fail_under = 60
fail_under = 73
[tool.coverage.html]
directory = "htmlcov"


@@ -0,0 +1,282 @@
"""Tests for brain.client — BrainClient memory + task operations."""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from brain.client import BrainClient, DEFAULT_RQLITE_URL
class TestBrainClientInit:
"""Test BrainClient initialization."""
def test_default_url(self):
client = BrainClient()
assert client.rqlite_url == DEFAULT_RQLITE_URL
def test_custom_url(self):
client = BrainClient(rqlite_url="http://custom:4001")
assert client.rqlite_url == "http://custom:4001"
def test_node_id_generated(self):
client = BrainClient()
assert client.node_id # not empty
def test_custom_node_id(self):
client = BrainClient(node_id="my-node")
assert client.node_id == "my-node"
def test_source_detection(self):
client = BrainClient()
assert isinstance(client.source, str)
class TestBrainClientMemory:
"""Test memory operations (remember, recall, get_recent, get_context)."""
def _make_client(self):
return BrainClient(rqlite_url="http://test:4001", node_id="test-node")
async def test_remember_success(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"last_insert_id": 42}]
}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
with patch("brain.client.BrainClient._detect_source", return_value="test"):
with patch("brain.embeddings.get_embedder") as mock_emb:
mock_embedder = MagicMock()
mock_embedder.encode_single.return_value = b"\x00" * 16
mock_emb.return_value = mock_embedder
result = await client.remember("test memory", tags=["test"])
assert result["id"] == 42
assert result["status"] == "stored"
async def test_remember_failure_raises(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock(side_effect=Exception("connection refused"))
with patch("brain.embeddings.get_embedder") as mock_emb:
mock_embedder = MagicMock()
mock_embedder.encode_single.return_value = b"\x00" * 16
mock_emb.return_value = mock_embedder
with pytest.raises(Exception, match="connection refused"):
await client.remember("fail")
async def test_recall_success(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"rows": [
["memory content", "test", '{"key": "val"}', 0.1],
]}]
}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
with patch("brain.embeddings.get_embedder") as mock_emb:
mock_embedder = MagicMock()
mock_embedder.encode_single.return_value = b"\x00" * 16
mock_emb.return_value = mock_embedder
results = await client.recall("search query")
assert len(results) == 1
assert results[0]["content"] == "memory content"
assert results[0]["metadata"] == {"key": "val"}
async def test_recall_with_source_filter(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {"results": [{"rows": []}]}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
with patch("brain.embeddings.get_embedder") as mock_emb:
mock_embedder = MagicMock()
mock_embedder.encode_single.return_value = b"\x00" * 16
mock_emb.return_value = mock_embedder
results = await client.recall("test", sources=["timmy", "user"])
assert results == []
# Check that sources were passed in the SQL
call_args = client._client.post.call_args
sql_params = call_args[1]["json"]
assert "timmy" in sql_params[1] or "timmy" in str(sql_params)
async def test_recall_error_returns_empty(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock(side_effect=Exception("timeout"))
with patch("brain.embeddings.get_embedder") as mock_emb:
mock_embedder = MagicMock()
mock_embedder.encode_single.return_value = b"\x00" * 16
mock_emb.return_value = mock_embedder
results = await client.recall("test")
assert results == []
async def test_get_recent_success(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"rows": [
[1, "recent memory", "test", '["tag1"]', '{}', "2026-03-06T00:00:00"],
]}]
}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
memories = await client.get_recent(hours=24, limit=10)
assert len(memories) == 1
assert memories[0]["content"] == "recent memory"
assert memories[0]["tags"] == ["tag1"]
async def test_get_recent_error_returns_empty(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock(side_effect=Exception("db error"))
result = await client.get_recent()
assert result == []
async def test_get_context(self):
client = self._make_client()
client.get_recent = AsyncMock(return_value=[
{"content": "Recent item 1"},
{"content": "Recent item 2"},
])
client.recall = AsyncMock(return_value=[
{"content": "Relevant item 1"},
])
ctx = await client.get_context("test query")
assert "Recent activity:" in ctx
assert "Recent item 1" in ctx
assert "Relevant memories:" in ctx
assert "Relevant item 1" in ctx
class TestBrainClientTasks:
"""Test task queue operations."""
def _make_client(self):
return BrainClient(rqlite_url="http://test:4001", node_id="test-node")
async def test_submit_task(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"last_insert_id": 7}]
}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
result = await client.submit_task("do something", task_type="shell")
assert result["id"] == 7
assert result["status"] == "queued"
async def test_submit_task_failure_raises(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock(side_effect=Exception("network error"))
with pytest.raises(Exception, match="network error"):
await client.submit_task("fail task")
async def test_claim_task_found(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"rows": [
[1, "task content", "shell", 5, '{"key": "val"}']
]}]
}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
task = await client.claim_task(["shell", "general"])
assert task is not None
assert task["id"] == 1
assert task["content"] == "task content"
assert task["metadata"] == {"key": "val"}
async def test_claim_task_none_available(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {"results": [{"rows": []}]}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
task = await client.claim_task(["shell"])
assert task is None
async def test_claim_task_error_returns_none(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock(side_effect=Exception("raft error"))
task = await client.claim_task(["general"])
assert task is None
async def test_complete_task(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock()
# Should not raise
await client.complete_task(1, success=True, result="done")
client._client.post.assert_awaited_once()
async def test_complete_task_failure(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock()
await client.complete_task(1, success=False, error="oops")
client._client.post.assert_awaited_once()
async def test_get_pending_tasks(self):
client = self._make_client()
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"rows": [
[1, "task 1", "general", 0, '{}', "2026-03-06"],
[2, "task 2", "shell", 5, '{}', "2026-03-06"],
]}]
}
mock_response.raise_for_status = MagicMock()
client._client = MagicMock()
client._client.post = AsyncMock(return_value=mock_response)
tasks = await client.get_pending_tasks()
assert len(tasks) == 2
async def test_get_pending_tasks_error(self):
client = self._make_client()
client._client = MagicMock()
client._client.post = AsyncMock(side_effect=Exception("fail"))
result = await client.get_pending_tasks()
assert result == []
async def test_close(self):
client = self._make_client()
client._client = MagicMock()
client._client.aclose = AsyncMock()
await client.close()
client._client.aclose.assert_awaited_once()


@@ -0,0 +1,238 @@
"""Tests for brain.worker — DistributedWorker capability detection + task execution."""
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
from brain.worker import DistributedWorker
class TestWorkerInit:
"""Test worker initialization and capability detection."""
@patch("brain.worker.DistributedWorker._detect_capabilities")
def test_init_defaults(self, mock_caps):
mock_caps.return_value = ["general"]
worker = DistributedWorker()
assert worker.running is False
assert worker.node_id # non-empty
assert "general" in worker.capabilities
@patch("brain.worker.DistributedWorker._detect_capabilities")
def test_custom_brain_client(self, mock_caps):
mock_caps.return_value = ["general"]
mock_client = MagicMock()
worker = DistributedWorker(brain_client=mock_client)
assert worker.brain is mock_client
@patch("brain.worker.DistributedWorker._detect_capabilities")
def test_default_handlers_registered(self, mock_caps):
mock_caps.return_value = ["general"]
worker = DistributedWorker()
assert "shell" in worker._handlers
assert "creative" in worker._handlers
assert "code" in worker._handlers
assert "research" in worker._handlers
assert "general" in worker._handlers
class TestCapabilityDetection:
"""Test individual capability detection methods."""
@patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
def _make_worker(self, mock_caps):
return DistributedWorker()
@patch("brain.worker.subprocess.run")
def test_has_gpu_nvidia(self, mock_run):
worker = self._make_worker()
mock_run.return_value = MagicMock(returncode=0)
assert worker._has_gpu() is True
@patch("brain.worker.subprocess.run", side_effect=OSError("no nvidia-smi"))
@patch("brain.worker.os.path.exists", return_value=False)
@patch("brain.worker.os.uname")
def test_has_gpu_no_gpu(self, mock_uname, mock_exists, mock_run):
worker = self._make_worker()
mock_uname.return_value = MagicMock(sysname="Linux")
assert worker._has_gpu() is False
@patch("brain.worker.subprocess.run")
def test_has_internet_true(self, mock_run):
worker = self._make_worker()
mock_run.return_value = MagicMock(returncode=0)
assert worker._has_internet() is True
@patch("brain.worker.subprocess.run", side_effect=OSError("no curl"))
def test_has_internet_no_curl(self, mock_run):
worker = self._make_worker()
assert worker._has_internet() is False
@patch("brain.worker.subprocess.run")
def test_has_command_true(self, mock_run):
worker = self._make_worker()
mock_run.return_value = MagicMock(returncode=0)
assert worker._has_command("docker") is True
@patch("brain.worker.subprocess.run")
def test_has_command_false(self, mock_run):
worker = self._make_worker()
mock_run.return_value = MagicMock(returncode=1)
assert worker._has_command("nonexistent") is False
@patch("brain.worker.subprocess.run", side_effect=OSError)
def test_has_command_oserror(self, mock_run):
worker = self._make_worker()
assert worker._has_command("anything") is False
class TestRegisterHandler:
"""Test custom handler registration."""
@patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
def test_register_adds_handler_and_capability(self, mock_caps):
worker = DistributedWorker()
async def custom_handler(content):
return "custom result"
worker.register_handler("custom_type", custom_handler)
assert "custom_type" in worker._handlers
assert "custom_type" in worker.capabilities
class TestTaskHandlers:
"""Test individual task handlers."""
@patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
def _make_worker(self, mock_caps):
worker = DistributedWorker()
worker.brain = MagicMock()
worker.brain.remember = AsyncMock()
worker.brain.complete_task = AsyncMock()
return worker
async def test_handle_code(self):
worker = self._make_worker()
result = await worker._handle_code("write a function")
assert "write a function" in result
async def test_handle_research_no_internet(self):
worker = self._make_worker()
worker.capabilities = ["general"] # no "web"
with pytest.raises(Exception, match="Internet not available"):
await worker._handle_research("search query")
async def test_handle_creative_no_gpu(self):
worker = self._make_worker()
worker.capabilities = ["general"] # no "gpu"
with pytest.raises(Exception, match="GPU not available"):
await worker._handle_creative("make an image")
async def test_handle_general_no_ollama(self):
worker = self._make_worker()
worker.capabilities = ["general"] # but not "ollama"
# Remove "ollama" if present
if "ollama" in worker.capabilities:
worker.capabilities.remove("ollama")
with pytest.raises(Exception, match="Ollama not available"):
await worker._handle_general("answer this")
class TestExecuteTask:
"""Test execute_task orchestration."""
@patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
def _make_worker(self, mock_caps):
worker = DistributedWorker()
worker.brain = MagicMock()
worker.brain.complete_task = AsyncMock()
return worker
async def test_execute_task_success(self):
worker = self._make_worker()
async def fake_handler(content):
return "result"
worker._handlers["test_type"] = fake_handler
result = await worker.execute_task({
"id": 1,
"type": "test_type",
"content": "do it",
})
assert result["success"] is True
assert result["result"] == "result"
worker.brain.complete_task.assert_awaited_once_with(1, success=True, result="result")
async def test_execute_task_failure(self):
worker = self._make_worker()
async def failing_handler(content):
raise RuntimeError("oops")
worker._handlers["fail_type"] = failing_handler
result = await worker.execute_task({
"id": 2,
"type": "fail_type",
"content": "fail",
})
assert result["success"] is False
assert "oops" in result["error"]
worker.brain.complete_task.assert_awaited_once()
async def test_execute_task_falls_back_to_general(self):
worker = self._make_worker()
async def general_handler(content):
return "general result"
worker._handlers["general"] = general_handler
result = await worker.execute_task({
"id": 3,
"type": "unknown_type",
"content": "something",
})
assert result["success"] is True
assert result["result"] == "general result"
class TestRunOnce:
"""Test run_once loop iteration."""
@patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
def _make_worker(self, mock_caps):
worker = DistributedWorker()
worker.brain = MagicMock()
worker.brain.claim_task = AsyncMock()
worker.brain.complete_task = AsyncMock()
return worker
async def test_run_once_no_tasks(self):
worker = self._make_worker()
worker.brain.claim_task.return_value = None
had_work = await worker.run_once()
assert had_work is False
async def test_run_once_with_task(self):
worker = self._make_worker()
worker.brain.claim_task.return_value = {
"id": 1, "type": "code", "content": "write code"
}
had_work = await worker.run_once()
assert had_work is True
class TestStopWorker:
"""Test stop method."""
@patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
def test_stop_sets_running_false(self, mock_caps):
worker = DistributedWorker()
worker.running = True
worker.stop()
assert worker.running is False


@@ -10,6 +10,11 @@ PROD_PROJECT_DIR = Path("/home/ubuntu/prod-sovereign-stack")
PROD_VAULT_DIR = PROD_PROJECT_DIR / "TimmyVault"
SETUP_SCRIPT_PATH = Path("/home/ubuntu/setup_timmy.sh")
pytestmark = pytest.mark.skipif(
not SETUP_SCRIPT_PATH.exists(),
reason=f"Setup script not found at {SETUP_SCRIPT_PATH}",
)
@pytest.fixture(scope="module", autouse=True)
def setup_prod_env():
"""Ensure a clean environment and run the full installation."""


@@ -0,0 +1,125 @@
"""Tests for infrastructure.error_capture module."""
import pytest
from unittest.mock import patch, MagicMock
from datetime import datetime, timezone
from infrastructure.error_capture import (
_stack_hash,
_is_duplicate,
_get_git_context,
capture_error,
_dedup_cache,
)
def _make_exception():
"""Helper that always raises from the same line for stable hashing."""
raise ValueError("test error")
class TestStackHash:
"""Test _stack_hash produces stable hashes."""
def test_hash_is_deterministic_for_same_exception(self):
"""Same exception object always produces the same hash."""
try:
_make_exception()
except ValueError as e:
hash1 = _stack_hash(e)
hash2 = _stack_hash(e)
assert hash1 == hash2
def test_different_exception_types_differ(self):
try:
raise ValueError("x")
except ValueError as e1:
hash1 = _stack_hash(e1)
try:
raise TypeError("x")
except TypeError as e2:
hash2 = _stack_hash(e2)
assert hash1 != hash2
def test_hash_is_hex_string(self):
try:
raise RuntimeError("test")
except RuntimeError as e:
h = _stack_hash(e)
assert len(h) == 16
assert all(c in "0123456789abcdef" for c in h)
class TestIsDuplicate:
"""Test deduplication logic."""
def setup_method(self):
_dedup_cache.clear()
def test_first_occurrence_not_duplicate(self):
assert _is_duplicate("hash_abc") is False
def test_second_occurrence_is_duplicate(self):
_is_duplicate("hash_dup")
assert _is_duplicate("hash_dup") is True
def test_different_hashes_not_duplicates(self):
_is_duplicate("hash_1")
assert _is_duplicate("hash_2") is False
def teardown_method(self):
_dedup_cache.clear()
class TestGetGitContext:
"""Test _get_git_context."""
def test_returns_dict_with_branch_and_commit(self):
"""Git context always returns a dict with branch and commit keys."""
ctx = _get_git_context()
assert "branch" in ctx
assert "commit" in ctx
assert isinstance(ctx["branch"], str)
assert isinstance(ctx["commit"], str)
class TestCaptureError:
"""Test the main capture_error function."""
def setup_method(self):
_dedup_cache.clear()
def test_duplicate_returns_none(self):
"""Second call with same exception is deduplicated."""
try:
_make_exception()
except ValueError as e:
# First call
capture_error(e, source="test")
# Second call — same hash, within dedup window
result = capture_error(e, source="test")
assert result is None
def test_capture_does_not_crash_on_missing_deps(self):
"""capture_error should never crash even if optional deps are missing."""
_dedup_cache.clear()
try:
raise IOError("graceful test")
except IOError as e:
# Should not raise even though swarm/event_log don't exist
capture_error(e, source="graceful")
def test_capture_with_context_does_not_crash(self):
"""capture_error with context dict should not crash."""
_dedup_cache.clear()
try:
raise RuntimeError("context test")
except RuntimeError as e:
capture_error(e, source="test_module", context={"path": "/api/foo"})
def teardown_method(self):
_dedup_cache.clear()


@@ -0,0 +1,193 @@
"""Tests for the event broadcaster (infrastructure.events.broadcaster)."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from dataclasses import dataclass
from enum import Enum
from infrastructure.events.broadcaster import (
EventBroadcaster,
event_broadcaster,
get_event_icon,
get_event_label,
format_event_for_display,
EVENT_ICONS,
EVENT_LABELS,
)
# ── Fake EventLogEntry for testing ──────────────────────────────────────────
class FakeEventType(Enum):
TASK_CREATED = "task.created"
TASK_ASSIGNED = "task.assigned"
BID_SUBMITTED = "bid.submitted"
AGENT_JOINED = "agent.joined"
SYSTEM_INFO = "system.info"
@dataclass
class FakeEventLogEntry:
id: str = "evt-abc123"
event_type: FakeEventType = FakeEventType.TASK_CREATED
source: str = "test"
task_id: str = "task-1"
agent_id: str = "agent-1"
timestamp: str = "2026-03-06T12:00:00Z"
data: dict = None
def __post_init__(self):
if self.data is None:
self.data = {}
class TestEventBroadcaster:
"""Test EventBroadcaster class."""
def test_init(self):
b = EventBroadcaster()
assert b._ws_manager is None
async def test_broadcast_no_ws_manager(self):
b = EventBroadcaster()
# _get_ws_manager returns None => returns 0
count = await b.broadcast(FakeEventLogEntry())
assert count == 0
async def test_broadcast_with_ws_manager(self):
b = EventBroadcaster()
mock_ws = MagicMock()
mock_ws.broadcast_json = AsyncMock(return_value=3)
b._ws_manager = mock_ws
event = FakeEventLogEntry()
count = await b.broadcast(event)
assert count == 3
mock_ws.broadcast_json.assert_awaited_once()
# Verify payload structure
payload = mock_ws.broadcast_json.call_args[0][0]
assert payload["type"] == "event"
assert payload["payload"]["id"] == "evt-abc123"
assert payload["payload"]["event_type"] == "task.created"
async def test_broadcast_ws_error_returns_zero(self):
b = EventBroadcaster()
mock_ws = MagicMock()
mock_ws.broadcast_json = AsyncMock(side_effect=RuntimeError("ws down"))
b._ws_manager = mock_ws
count = await b.broadcast(FakeEventLogEntry())
assert count == 0
def test_broadcast_sync_no_loop(self):
"""broadcast_sync should not crash when no event loop is running."""
b = EventBroadcaster()
# This should silently pass (no event loop)
b.broadcast_sync(FakeEventLogEntry())
class TestEventIcons:
"""Test icon/label lookup functions."""
def test_known_icon(self):
assert get_event_icon("task.created") == "📝"
assert get_event_icon("agent.joined") == "🟢"
def test_unknown_icon_returns_bullet(self):
        assert get_event_icon("nonexistent") == "•"
def test_known_label(self):
assert get_event_label("task.created") == "New task"
assert get_event_label("task.failed") == "Task failed"
def test_unknown_label_returns_type(self):
assert get_event_label("custom.event") == "custom.event"
def test_all_icons_have_labels(self):
"""Every icon key should also have a label."""
for key in EVENT_ICONS:
assert key in EVENT_LABELS, f"Missing label for icon key: {key}"
class TestFormatEventForDisplay:
"""Test format_event_for_display helper."""
def test_task_created_truncates_description(self):
event = FakeEventLogEntry(
event_type=FakeEventType.TASK_CREATED,
data={"description": "A" * 100},
)
result = format_event_for_display(event)
assert result["description"].endswith("...")
assert len(result["description"]) <= 63
def test_task_created_short_description(self):
event = FakeEventLogEntry(
event_type=FakeEventType.TASK_CREATED,
data={"description": "Short task"},
)
result = format_event_for_display(event)
assert result["description"] == "Short task"
def test_task_assigned(self):
event = FakeEventLogEntry(
event_type=FakeEventType.TASK_ASSIGNED,
agent_id="agent-12345678-long",
data={"bid_sats": 500},
)
result = format_event_for_display(event)
assert "agent-12" in result["description"]
assert "500 sats" in result["description"]
def test_bid_submitted(self):
event = FakeEventLogEntry(
event_type=FakeEventType.BID_SUBMITTED,
data={"bid_sats": 250},
)
result = format_event_for_display(event)
assert "250 sats" in result["description"]
def test_agent_joined_with_persona(self):
event = FakeEventLogEntry(
event_type=FakeEventType.AGENT_JOINED,
data={"persona_id": "forge"},
)
result = format_event_for_display(event)
assert "forge" in result["description"]
def test_agent_joined_no_persona(self):
event = FakeEventLogEntry(
event_type=FakeEventType.AGENT_JOINED,
data={},
)
result = format_event_for_display(event)
assert result["description"] == "New agent"
def test_generic_event_with_message(self):
event = FakeEventLogEntry(
event_type=FakeEventType.SYSTEM_INFO,
data={"message": "All systems go"},
)
result = format_event_for_display(event)
assert result["description"] == "All systems go"
def test_generic_event_no_data(self):
event = FakeEventLogEntry(
event_type=FakeEventType.SYSTEM_INFO,
data={},
)
result = format_event_for_display(event)
assert result["description"] == ""
def test_output_structure(self):
event = FakeEventLogEntry()
result = format_event_for_display(event)
assert "id" in result
assert "icon" in result
assert "label" in result
assert "type" in result
assert "source" in result
assert "timestamp" in result
assert "time_short" in result
assert result["time_short"] == "12:00:00"

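The payload assertions in `test_broadcast_with_ws_manager` imply an envelope of the form `{"type": "event", "payload": {...}}` with the entry flattened inside. A minimal sketch of that serialization, assuming the field names the assertions check (the helper name `event_to_payload` is an assumption):

```python
# Hypothetical sketch of the broadcast envelope the tests assert on.
def event_to_payload(entry) -> dict:
    return {
        "type": "event",
        "payload": {
            "id": entry.id,
            # Enum members carry the wire string in .value
            "event_type": entry.event_type.value,
            "source": entry.source,
            "task_id": entry.task_id,
            "agent_id": entry.agent_id,
            "timestamp": entry.timestamp,
            "data": entry.data,
        },
    }
```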
View File

@@ -0,0 +1,231 @@
"""Tests for the async event bus (infrastructure.events.bus)."""
import asyncio
import pytest
from infrastructure.events.bus import EventBus, Event, emit, on, event_bus
class TestEvent:
"""Test Event dataclass."""
def test_event_defaults(self):
e = Event(type="test.event", source="unit_test")
assert e.type == "test.event"
assert e.source == "unit_test"
assert e.data == {}
assert e.timestamp # auto-generated
assert e.id.startswith("evt_")
def test_event_custom_data(self):
e = Event(type="a.b", source="s", data={"key": "val"}, id="custom-id")
assert e.data == {"key": "val"}
assert e.id == "custom-id"
class TestEventBus:
"""Test EventBus subscribe/publish/history."""
def _fresh_bus(self) -> EventBus:
return EventBus()
# ── subscribe + publish ──────────────────────────────────────────────
async def test_exact_match_subscribe(self):
bus = self._fresh_bus()
received = []
@bus.subscribe("task.created")
async def handler(event: Event):
received.append(event)
count = await bus.publish(Event(type="task.created", source="test"))
assert count == 1
assert len(received) == 1
assert received[0].type == "task.created"
async def test_wildcard_subscribe(self):
bus = self._fresh_bus()
received = []
@bus.subscribe("agent.*")
async def handler(event: Event):
received.append(event)
await bus.publish(Event(type="agent.joined", source="test"))
await bus.publish(Event(type="agent.left", source="test"))
await bus.publish(Event(type="task.created", source="test")) # should NOT match
assert len(received) == 2
async def test_star_subscribes_to_all(self):
bus = self._fresh_bus()
received = []
@bus.subscribe("*")
async def handler(event: Event):
received.append(event)
await bus.publish(Event(type="anything.here", source="test"))
await bus.publish(Event(type="x", source="test"))
assert len(received) == 2
async def test_no_subscribers_returns_zero(self):
bus = self._fresh_bus()
count = await bus.publish(Event(type="orphan.event", source="test"))
assert count == 0
async def test_multiple_handlers_same_pattern(self):
bus = self._fresh_bus()
calls = {"a": 0, "b": 0}
@bus.subscribe("foo.bar")
async def handler_a(event):
calls["a"] += 1
@bus.subscribe("foo.bar")
async def handler_b(event):
calls["b"] += 1
await bus.publish(Event(type="foo.bar", source="test"))
assert calls["a"] == 1
assert calls["b"] == 1
# ── unsubscribe ──────────────────────────────────────────────────────
async def test_unsubscribe(self):
bus = self._fresh_bus()
received = []
@bus.subscribe("x.y")
async def handler(event):
received.append(event)
ok = bus.unsubscribe("x.y", handler)
assert ok is True
await bus.publish(Event(type="x.y", source="test"))
assert len(received) == 0
async def test_unsubscribe_nonexistent_pattern(self):
bus = self._fresh_bus()
async def dummy(event):
pass
assert bus.unsubscribe("nope", dummy) is False
async def test_unsubscribe_wrong_handler(self):
bus = self._fresh_bus()
@bus.subscribe("a.b")
async def handler_a(event):
pass
async def handler_b(event):
pass
assert bus.unsubscribe("a.b", handler_b) is False
# ── error handling ───────────────────────────────────────────────────
async def test_handler_error_does_not_break_other_handlers(self):
bus = self._fresh_bus()
received = []
@bus.subscribe("err.test")
async def bad_handler(event):
raise ValueError("boom")
@bus.subscribe("err.test")
async def good_handler(event):
received.append(event)
count = await bus.publish(Event(type="err.test", source="test"))
assert count == 2 # both were invoked
assert len(received) == 1 # good_handler still ran
# ── history ──────────────────────────────────────────────────────────
async def test_history_stores_events(self):
bus = self._fresh_bus()
await bus.publish(Event(type="h.a", source="s"))
await bus.publish(Event(type="h.b", source="s"))
history = bus.get_history()
assert len(history) == 2
async def test_history_filter_by_type(self):
bus = self._fresh_bus()
await bus.publish(Event(type="h.a", source="s"))
await bus.publish(Event(type="h.b", source="s"))
assert len(bus.get_history(event_type="h.a")) == 1
async def test_history_filter_by_source(self):
bus = self._fresh_bus()
await bus.publish(Event(type="h.a", source="x"))
await bus.publish(Event(type="h.b", source="y"))
assert len(bus.get_history(source="x")) == 1
async def test_history_limit(self):
bus = self._fresh_bus()
for i in range(5):
await bus.publish(Event(type="h.x", source="s"))
assert len(bus.get_history(limit=3)) == 3
async def test_history_max_cap(self):
bus = self._fresh_bus()
bus._max_history = 10
for i in range(15):
await bus.publish(Event(type="cap", source="s"))
assert len(bus._history) == 10
async def test_clear_history(self):
bus = self._fresh_bus()
await bus.publish(Event(type="x", source="s"))
bus.clear_history()
assert len(bus.get_history()) == 0
# ── pattern matching ─────────────────────────────────────────────────
def test_match_exact(self):
bus = self._fresh_bus()
assert bus._match_pattern("a.b.c", "a.b.c") is True
assert bus._match_pattern("a.b.c", "a.b.d") is False
def test_match_wildcard(self):
bus = self._fresh_bus()
assert bus._match_pattern("agent.joined", "agent.*") is True
assert bus._match_pattern("agent.left", "agent.*") is True
assert bus._match_pattern("task.created", "agent.*") is False
def test_match_star(self):
bus = self._fresh_bus()
assert bus._match_pattern("anything", "*") is True
class TestConvenienceFunctions:
"""Test module-level emit() and on() helpers."""
async def test_emit(self):
# Clear singleton history first
event_bus.clear_history()
event_bus._subscribers.clear()
received = []
@on("conv.test")
async def handler(event):
received.append(event)
count = await emit("conv.test", "unit", {"foo": "bar"})
assert count == 1
assert received[0].data == {"foo": "bar"}
# Cleanup
event_bus._subscribers.clear()
event_bus.clear_history()

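The pattern-matching tests above (exact types, `"agent.*"` family patterns, bare `"*"` catch-all) describe glob-style matching. One plausible implementation is stdlib `fnmatch`; the real `_match_pattern` may differ:

```python
from fnmatch import fnmatch

# Sketch only: glob-style event pattern matching as the bus tests assume it.
def match_pattern(event_type: str, pattern: str) -> bool:
    # fnmatch treats "*" as "match anything", including dots,
    # so "agent.*" matches "agent.joined" and "*" matches everything.
    return fnmatch(event_type, pattern)
```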
View File

@@ -9,6 +9,11 @@ TEST_PROJECT_DIR = Path("/home/ubuntu/test-sovereign-stack")
TEST_VAULT_DIR = TEST_PROJECT_DIR / "TimmyVault"
SETUP_SCRIPT_PATH = Path("/home/ubuntu/setup_timmy.sh")
pytestmark = pytest.mark.skipif(
not SETUP_SCRIPT_PATH.exists(),
reason=f"Setup script not found at {SETUP_SCRIPT_PATH}",
)
@pytest.fixture(scope="module", autouse=True)
def cleanup_test_env():
"""Ensure a clean environment before and after tests."""

View File

@@ -0,0 +1,256 @@
"""Tests for timmy.agents.timmy — orchestrator, personas, context building."""
import sys
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
from pathlib import Path
# Ensure mcp.registry stub with tool_registry exists before importing agents
if "mcp" not in sys.modules:
_mock_mcp = MagicMock()
_mock_registry_mod = MagicMock()
_mock_tool_reg = MagicMock()
_mock_tool_reg.get_handler.return_value = None
_mock_registry_mod.tool_registry = _mock_tool_reg
sys.modules["mcp"] = _mock_mcp
sys.modules["mcp.registry"] = _mock_registry_mod
from timmy.agents.timmy import (
_load_hands_async,
build_timmy_context_sync,
build_timmy_context_async,
format_timmy_prompt,
TimmyOrchestrator,
create_timmy_swarm,
_PERSONAS,
ORCHESTRATOR_PROMPT_BASE,
)
class TestLoadHandsAsync:
"""Test _load_hands_async."""
async def test_returns_empty_list(self):
result = await _load_hands_async()
assert result == []
class TestBuildContext:
"""Test context building functions."""
@patch("timmy.agents.timmy.settings")
def test_build_context_sync_graceful_failures(self, mock_settings):
mock_settings.repo_root = "/nonexistent"
ctx = build_timmy_context_sync()
assert "timestamp" in ctx
assert isinstance(ctx["agents"], list)
assert isinstance(ctx["hands"], list)
# Git log should fall back gracefully
assert isinstance(ctx["git_log"], str)
# Memory should fall back gracefully
assert isinstance(ctx["memory"], str)
@patch("timmy.agents.timmy.settings")
async def test_build_context_async(self, mock_settings):
mock_settings.repo_root = "/nonexistent"
ctx = await build_timmy_context_async()
assert ctx["hands"] == []
@patch("timmy.agents.timmy.settings")
def test_build_context_reads_memory_file(self, mock_settings, tmp_path):
memory_file = tmp_path / "MEMORY.md"
memory_file.write_text("# Important memories\nRemember this.")
mock_settings.repo_root = str(tmp_path)
ctx = build_timmy_context_sync()
assert "Important memories" in ctx["memory"]
class TestFormatPrompt:
"""Test format_timmy_prompt."""
def test_inserts_context_block(self):
base = "Line one.\nLine two."
ctx = {
"timestamp": "2026-03-06T00:00:00Z",
"repo_root": "/home/user/project",
"git_log": "abc123 initial commit",
"agents": [],
"hands": [],
"memory": "some memory",
}
result = format_timmy_prompt(base, ctx)
assert "Line one." in result
assert "Line two." in result
assert "abc123 initial commit" in result
assert "some memory" in result
def test_agents_list_formatted(self):
ctx = {
"timestamp": "now",
"repo_root": "/tmp",
"git_log": "",
"agents": [
{"name": "Forge", "capabilities": "code", "status": "ready"},
{"name": "Seer", "capabilities": "research", "status": "ready"},
],
"hands": [],
"memory": "",
}
result = format_timmy_prompt("Base.", ctx)
assert "Forge" in result
assert "Seer" in result
def test_hands_list_formatted(self):
ctx = {
"timestamp": "now",
"repo_root": "/tmp",
"git_log": "",
"agents": [],
"hands": [
{"name": "backup", "schedule": "daily", "enabled": True},
],
"memory": "",
}
result = format_timmy_prompt("Base.", ctx)
assert "backup" in result
assert "enabled" in result
def test_repo_root_placeholder_replaced(self):
ctx = {
"timestamp": "now",
"repo_root": "/my/repo",
"git_log": "",
"agents": [],
"hands": [],
"memory": "",
}
result = format_timmy_prompt("Root is {REPO_ROOT}.", ctx)
assert "/my/repo" in result
assert "{REPO_ROOT}" not in result
class TestExtractAgent:
"""Test TimmyOrchestrator._extract_agent static method."""
def test_extracts_known_agents(self):
assert TimmyOrchestrator._extract_agent("Primary Agent: Seer") == "seer"
assert TimmyOrchestrator._extract_agent("Use Forge for this") == "forge"
assert TimmyOrchestrator._extract_agent("Route to quill") == "quill"
assert TimmyOrchestrator._extract_agent("echo can recall") == "echo"
assert TimmyOrchestrator._extract_agent("helm decides") == "helm"
def test_defaults_to_orchestrator(self):
assert TimmyOrchestrator._extract_agent("no agent mentioned") == "orchestrator"
def test_case_insensitive(self):
assert TimmyOrchestrator._extract_agent("Use FORGE") == "forge"
class TestTimmyOrchestrator:
"""Test TimmyOrchestrator init and methods."""
@patch("timmy.agents.timmy.settings")
def test_init(self, mock_settings):
mock_settings.repo_root = "/tmp"
mock_settings.ollama_model = "test"
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.telemetry_enabled = False
orch = TimmyOrchestrator()
assert orch.agent_id == "orchestrator"
assert orch.name == "Orchestrator"
assert orch.sub_agents == {}
assert orch._session_initialized is False
@patch("timmy.agents.timmy.settings")
def test_register_sub_agent(self, mock_settings):
mock_settings.repo_root = "/tmp"
mock_settings.ollama_model = "test"
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.telemetry_enabled = False
orch = TimmyOrchestrator()
from timmy.agents.base import SubAgent
agent = SubAgent(
agent_id="test-agent",
name="Test",
role="test",
system_prompt="You are a test agent.",
)
orch.register_sub_agent(agent)
assert "test-agent" in orch.sub_agents
@patch("timmy.agents.timmy.settings")
def test_get_swarm_status(self, mock_settings):
mock_settings.repo_root = "/tmp"
mock_settings.ollama_model = "test"
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.telemetry_enabled = False
orch = TimmyOrchestrator()
status = orch.get_swarm_status()
assert "orchestrator" in status
assert status["total_agents"] == 1
@patch("timmy.agents.timmy.settings")
def test_get_enhanced_system_prompt_with_attr(self, mock_settings):
mock_settings.repo_root = "/tmp"
mock_settings.ollama_model = "test"
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.telemetry_enabled = False
orch = TimmyOrchestrator()
# BaseAgent doesn't store system_prompt as attr; set it manually
orch.system_prompt = "Test prompt.\nWith context."
prompt = orch._get_enhanced_system_prompt()
assert isinstance(prompt, str)
assert "Test prompt." in prompt
class TestCreateTimmySwarm:
"""Test create_timmy_swarm factory."""
@patch("timmy.agents.timmy.settings")
def test_creates_all_personas(self, mock_settings):
mock_settings.repo_root = "/tmp"
mock_settings.ollama_model = "test"
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.telemetry_enabled = False
swarm = create_timmy_swarm()
assert len(swarm.sub_agents) == len(_PERSONAS)
assert "seer" in swarm.sub_agents
assert "forge" in swarm.sub_agents
assert "quill" in swarm.sub_agents
assert "echo" in swarm.sub_agents
assert "helm" in swarm.sub_agents
class TestPersonas:
"""Test persona definitions."""
def test_all_personas_have_required_fields(self):
required = {"agent_id", "name", "role", "system_prompt"}
for persona in _PERSONAS:
assert required.issubset(persona.keys()), f"Missing fields in {persona['name']}"
def test_persona_ids_unique(self):
ids = [p["agent_id"] for p in _PERSONAS]
assert len(ids) == len(set(ids))
def test_five_personas(self):
assert len(_PERSONAS) == 5
class TestOrchestratorPrompt:
"""Test the ORCHESTRATOR_PROMPT_BASE constant."""
def test_contains_hard_rules(self):
assert "NEVER fabricate" in ORCHESTRATOR_PROMPT_BASE
assert "do not know" in ORCHESTRATOR_PROMPT_BASE.lower()
def test_contains_repo_root_placeholder(self):
assert "{REPO_ROOT}" in ORCHESTRATOR_PROMPT_BASE

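`test_repo_root_placeholder_replaced` pins down the placeholder substitution contract: `{REPO_ROOT}` in the base prompt is replaced with the context's `repo_root`. A minimal sketch of just that step (the real `format_timmy_prompt` also injects agents, hands, git log, and memory; `render_prompt` is a hypothetical name):

```python
# Sketch of the {REPO_ROOT} substitution only, not the full prompt builder.
def render_prompt(base: str, ctx: dict) -> str:
    return base.replace("{REPO_ROOT}", ctx.get("repo_root", ""))
```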
View File

@@ -0,0 +1,182 @@
"""Tests for timmy.approvals — approval workflow and Golden Timmy rule."""
import pytest
from pathlib import Path
from datetime import datetime, timedelta, timezone
from timmy.approvals import (
GOLDEN_TIMMY,
ApprovalItem,
create_item,
list_pending,
list_all,
get_item,
approve,
reject,
expire_old,
_get_conn,
)
@pytest.fixture
def db_path(tmp_path):
"""Fresh SQLite DB for each test."""
return tmp_path / "test_approvals.db"
class TestGoldenTimmy:
"""Test the Golden Timmy constant."""
def test_golden_timmy_default_true(self):
assert GOLDEN_TIMMY is True
class TestApprovalItem:
"""Test ApprovalItem dataclass."""
def test_create(self):
item = ApprovalItem(
id="test-id",
title="Deploy update",
description="Deploy v2.0 to production",
proposed_action="git push && deploy",
impact="high",
created_at=datetime.now(timezone.utc),
status="pending",
)
assert item.id == "test-id"
assert item.status == "pending"
assert item.impact == "high"
class TestCreateItem:
"""Test create_item persistence."""
def test_create_and_retrieve(self, db_path):
item = create_item(
title="Test approval",
description="A test action",
proposed_action="run tests",
impact="low",
db_path=db_path,
)
assert item.id # UUID generated
assert item.status == "pending"
assert item.title == "Test approval"
# Retrieve it
retrieved = get_item(item.id, db_path)
assert retrieved is not None
assert retrieved.id == item.id
assert retrieved.title == "Test approval"
def test_create_default_impact(self, db_path):
item = create_item(
title="T",
description="D",
proposed_action="A",
db_path=db_path,
)
assert item.impact == "low"
class TestListPending:
"""Test list_pending."""
def test_empty_db(self, db_path):
items = list_pending(db_path)
assert items == []
def test_only_pending(self, db_path):
item1 = create_item("A", "D", "A", db_path=db_path)
item2 = create_item("B", "D", "A", db_path=db_path)
approve(item1.id, db_path)
pending = list_pending(db_path)
assert len(pending) == 1
assert pending[0].id == item2.id
def test_ordered_newest_first(self, db_path):
item1 = create_item("First", "D", "A", db_path=db_path)
item2 = create_item("Second", "D", "A", db_path=db_path)
pending = list_pending(db_path)
assert pending[0].title == "Second"
class TestListAll:
"""Test list_all."""
def test_includes_all_statuses(self, db_path):
item1 = create_item("A", "D", "A", db_path=db_path)
item2 = create_item("B", "D", "A", db_path=db_path)
approve(item1.id, db_path)
reject(item2.id, db_path)
all_items = list_all(db_path)
assert len(all_items) == 2
class TestApproveReject:
"""Test approve and reject operations."""
def test_approve_item(self, db_path):
item = create_item("T", "D", "A", db_path=db_path)
result = approve(item.id, db_path)
assert result.status == "approved"
def test_reject_item(self, db_path):
item = create_item("T", "D", "A", db_path=db_path)
result = reject(item.id, db_path)
assert result.status == "rejected"
def test_get_nonexistent_returns_none(self, db_path):
result = get_item("nonexistent-id", db_path)
assert result is None
class TestExpireOld:
"""Test expire_old cleanup."""
def test_expire_removes_old_pending(self, db_path):
# Create item and manually backdate it
item = create_item("Old", "D", "A", db_path=db_path)
conn = _get_conn(db_path)
old_date = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
conn.execute(
"UPDATE approval_items SET created_at = ? WHERE id = ?",
(old_date, item.id),
)
conn.commit()
conn.close()
count = expire_old(db_path)
assert count == 1
# Should be gone
assert get_item(item.id, db_path) is None
def test_expire_keeps_recent(self, db_path):
create_item("Recent", "D", "A", db_path=db_path)
count = expire_old(db_path)
assert count == 0
assert len(list_pending(db_path)) == 1
def test_expire_keeps_approved(self, db_path):
item = create_item("Approved", "D", "A", db_path=db_path)
approve(item.id, db_path)
# Backdate it
conn = _get_conn(db_path)
old_date = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
conn.execute(
"UPDATE approval_items SET created_at = ? WHERE id = ?",
(old_date, item.id),
)
conn.commit()
conn.close()
count = expire_old(db_path)
assert count == 0 # approved items not expired

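The `TestExpireOld` cases encode the expiry rule: only *pending* items older than the cutoff are deleted; approved items survive any age. A sketch of that rule against the `approval_items` table the tests' SQL references (the 7-day retention window here is an assumption):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# Sketch: delete stale pending rows only; approved/rejected rows are kept.
def expire_old(conn: sqlite3.Connection, max_age_days: int = 7) -> int:
    cutoff = (datetime.now(timezone.utc) - timedelta(days=max_age_days)).isoformat()
    cur = conn.execute(
        "DELETE FROM approval_items WHERE status = 'pending' AND created_at < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount  # number of expired items
```

ISO-8601 timestamps in a uniform format compare correctly as strings, which is why the `created_at < ?` comparison works without parsing dates in SQL.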
View File

@@ -0,0 +1,135 @@
"""Tests for timmy.conversation — conversation context and tool routing."""
import pytest
from timmy.conversation import ConversationContext, ConversationManager
class TestConversationContext:
"""Test ConversationContext dataclass."""
def test_defaults(self):
ctx = ConversationContext()
assert ctx.user_name is None
assert ctx.current_topic is None
assert ctx.turn_count == 0
def test_update_topic(self):
ctx = ConversationContext()
ctx.update_topic("Bitcoin price")
assert ctx.current_topic == "Bitcoin price"
assert ctx.turn_count == 1
def test_set_user_name(self):
ctx = ConversationContext()
ctx.set_user_name("Alice")
assert ctx.user_name == "Alice"
def test_context_summary_empty(self):
ctx = ConversationContext()
assert ctx.get_context_summary() == ""
def test_context_summary_full(self):
ctx = ConversationContext()
ctx.set_user_name("Bob")
ctx.update_topic("coding")
summary = ctx.get_context_summary()
assert "Bob" in summary
assert "coding" in summary
assert "1" in summary # turn count
class TestConversationManager:
"""Test ConversationManager."""
def test_get_context_creates_new(self):
mgr = ConversationManager()
ctx = mgr.get_context("session-1")
assert isinstance(ctx, ConversationContext)
def test_get_context_returns_same(self):
mgr = ConversationManager()
ctx1 = mgr.get_context("s1")
ctx2 = mgr.get_context("s1")
assert ctx1 is ctx2
def test_clear_context(self):
mgr = ConversationManager()
mgr.get_context("s1")
mgr.clear_context("s1")
# New context should be fresh
ctx = mgr.get_context("s1")
assert ctx.turn_count == 0
def test_clear_nonexistent(self):
mgr = ConversationManager()
mgr.clear_context("nope") # Should not raise
class TestExtractUserName:
"""Test name extraction from messages."""
def test_my_name_is(self):
mgr = ConversationManager()
assert mgr.extract_user_name("My name is Alice") == "Alice"
def test_i_am(self):
mgr = ConversationManager()
assert mgr.extract_user_name("I am Bob") == "Bob"
def test_call_me(self):
mgr = ConversationManager()
assert mgr.extract_user_name("Call me Charlie") == "Charlie"
def test_im(self):
mgr = ConversationManager()
assert mgr.extract_user_name("I'm Dave") == "Dave"
def test_no_name(self):
mgr = ConversationManager()
assert mgr.extract_user_name("What is the weather?") is None
def test_strips_punctuation(self):
mgr = ConversationManager()
assert mgr.extract_user_name("My name is Eve.") == "Eve"
class TestShouldUseTools:
"""Test tool usage detection."""
def _check(self, message, expected):
mgr = ConversationManager()
ctx = ConversationContext()
assert mgr.should_use_tools(message, ctx) is expected
def test_search_needs_tools(self):
self._check("search for Python tutorials", True)
def test_calculate_needs_tools(self):
self._check("calculate 2 + 2", True)
def test_run_command_needs_tools(self):
self._check("run ls -la", True)
def test_hello_no_tools(self):
self._check("hello", False)
def test_who_are_you_no_tools(self):
self._check("who are you?", False)
def test_thanks_no_tools(self):
self._check("thanks!", False)
def test_simple_question_no_tools(self):
self._check("what is Python?", False)
def test_current_info_needs_tools(self):
self._check("what is the current price of Bitcoin today?", True)
def test_ambiguous_defaults_false(self):
self._check("tell me something interesting", False)
def test_latest_news_needs_tools(self):
self._check("what are the latest updates?", True)
def test_weather_needs_tools(self):
self._check("weather forecast please", True)

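The `TestExtractUserName` cases above imply a small set of introduction phrases ("my name is", "I am", "I'm", "call me") with the name captured as a single word, which also explains why trailing punctuation is stripped. A minimal regex sketch under those assumptions (the pattern list is illustrative, not `ConversationManager`'s actual code):

```python
import re

# Hypothetical sketch of name extraction; `\w+` captures one word,
# so trailing punctuation like "Eve." is excluded automatically.
_NAME_PATTERNS = [
    r"my name is (\w+)",
    r"i am (\w+)",
    r"i'm (\w+)",
    r"call me (\w+)",
]

def extract_user_name(message: str):
    for pat in _NAME_PATTERNS:
        m = re.search(pat, message, re.IGNORECASE)
        if m:
            return m.group(1)
    return None
```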
View File

@@ -0,0 +1,247 @@
"""Tests for timmy.semantic_memory — semantic search, chunking, indexing."""
import pytest
from pathlib import Path
from unittest.mock import patch
from timmy.semantic_memory import (
_simple_hash_embedding,
embed_text,
cosine_similarity,
SemanticMemory,
MemorySearcher,
MemoryChunk,
memory_search,
_get_embedding_model,
)
class TestSimpleHashEmbedding:
"""Test the fallback hash-based embedding."""
def test_returns_list_of_floats(self):
vec = _simple_hash_embedding("hello world")
assert isinstance(vec, list)
assert len(vec) == 128
assert all(isinstance(x, float) for x in vec)
def test_deterministic(self):
a = _simple_hash_embedding("same text")
b = _simple_hash_embedding("same text")
assert a == b
def test_different_texts_differ(self):
a = _simple_hash_embedding("hello world")
b = _simple_hash_embedding("goodbye universe")
assert a != b
def test_normalized(self):
import math
vec = _simple_hash_embedding("test normalization")
magnitude = math.sqrt(sum(x * x for x in vec))
assert abs(magnitude - 1.0) < 0.01
class TestEmbedText:
"""Test embed_text with fallback."""
def test_returns_embedding(self):
# TIMMY_SKIP_EMBEDDINGS=1 in conftest, so uses fallback
vec = embed_text("test text")
assert isinstance(vec, list)
assert len(vec) > 0
class TestCosineSimilarity:
"""Test cosine_similarity function."""
def test_identical_vectors(self):
v = [1.0, 0.0, 0.0]
assert cosine_similarity(v, v) == pytest.approx(1.0)
def test_orthogonal_vectors(self):
a = [1.0, 0.0]
b = [0.0, 1.0]
assert cosine_similarity(a, b) == pytest.approx(0.0)
def test_opposite_vectors(self):
a = [1.0, 0.0]
b = [-1.0, 0.0]
assert cosine_similarity(a, b) == pytest.approx(-1.0)
def test_zero_vector(self):
a = [0.0, 0.0]
b = [1.0, 0.0]
assert cosine_similarity(a, b) == 0.0
class TestSemanticMemory:
"""Test SemanticMemory class."""
@pytest.fixture
def mem(self, tmp_path):
sm = SemanticMemory()
sm.db_path = tmp_path / "test_semantic.db"
sm.vault_path = tmp_path / "vault"
sm.vault_path.mkdir()
sm._init_db()
return sm
def test_init_creates_db(self, mem):
assert mem.db_path.exists()
def test_split_into_chunks_short(self, mem):
text = "Short paragraph."
chunks = mem._split_into_chunks(text)
assert len(chunks) == 1
assert chunks[0] == "Short paragraph."
def test_split_into_chunks_multiple_paragraphs(self, mem):
text = "First paragraph.\n\nSecond paragraph.\n\nThird paragraph."
chunks = mem._split_into_chunks(text)
assert len(chunks) == 3
def test_split_into_chunks_long_paragraph(self, mem):
text = ". ".join([f"Sentence {i}" for i in range(50)])
chunks = mem._split_into_chunks(text, max_chunk_size=100)
assert len(chunks) > 1
def test_split_empty_text(self, mem):
assert mem._split_into_chunks("") == []
def test_index_file(self, mem):
md_file = mem.vault_path / "test.md"
md_file.write_text("# Title\n\nThis is a test document with enough content to index properly.\n\nAnother paragraph with more content here.")
count = mem.index_file(md_file)
assert count > 0
def test_index_nonexistent_file(self, mem):
count = mem.index_file(Path("/nonexistent/file.md"))
assert count == 0
def test_index_file_skips_already_indexed(self, mem):
md_file = mem.vault_path / "cached.md"
md_file.write_text("# Cached\n\nContent that should only be indexed once if unchanged.")
count1 = mem.index_file(md_file)
count2 = mem.index_file(md_file)
assert count1 > 0
assert count2 == 0 # Already indexed, same hash
def test_index_vault(self, mem):
(mem.vault_path / "a.md").write_text("# File A\n\nContent of file A with some meaningful text here.")
(mem.vault_path / "b.md").write_text("# File B\n\nContent of file B with different meaningful text.")
total = mem.index_vault()
assert total >= 2
def test_index_vault_skips_handoff(self, mem):
"""Verify handoff files are excluded from indexing."""
handoff = mem.vault_path / "last-session-handoff.md"
handoff.write_text("# Handoff\n\nThis should be skipped completely from indexing.")
real = mem.vault_path / "real.md"
real.write_text("# Real\n\nThis should be indexed with enough meaningful content.")
# index_file on the handoff file should NOT skip it
# (that's only index_vault logic), so test the vault logic directly
count = mem.index_file(handoff)
assert count > 0 # index_file indexes everything
# Wipe and re-test via index_vault
import sqlite3
conn = sqlite3.connect(str(mem.db_path))
conn.execute("DELETE FROM chunks")
conn.commit()
conn.close()
mem.index_vault()
conn = sqlite3.connect(str(mem.db_path))
rows = conn.execute("SELECT DISTINCT source FROM chunks").fetchall()
conn.close()
sources = [r[0] for r in rows]
# Only the real file should be indexed, not the handoff
assert any("real" in s for s in sources)
assert not any("last-session-handoff" in s for s in sources)
def test_search_returns_results(self, mem):
md = mem.vault_path / "searchable.md"
md.write_text("# Python\n\nPython is a programming language used for web development and data science.")
mem.index_file(md)
results = mem.search("programming language")
assert len(results) > 0
# Each result is (content, score)
assert isinstance(results[0], tuple)
assert len(results[0]) == 2
def test_search_empty_db(self, mem):
results = mem.search("anything")
assert results == []
def test_get_relevant_context(self, mem):
md = mem.vault_path / "context.md"
md.write_text("# Important\n\nThis is very important information about the system architecture.")
mem.index_file(md)
ctx = mem.get_relevant_context("architecture")
# May or may not match depending on hash-based similarity
assert isinstance(ctx, str)
def test_get_relevant_context_empty(self, mem):
assert mem.get_relevant_context("anything") == ""
def test_stats(self, mem):
stats = mem.stats()
assert "total_chunks" in stats
assert "total_files" in stats
assert stats["total_chunks"] == 0
class TestMemorySearcher:
"""Test MemorySearcher high-level interface."""
@pytest.fixture
def searcher(self, tmp_path):
ms = MemorySearcher()
ms.semantic.db_path = tmp_path / "searcher.db"
ms.semantic.vault_path = tmp_path / "vault"
ms.semantic.vault_path.mkdir()
ms.semantic._init_db()
return ms
def test_search_semantic_tier(self, searcher):
results = searcher.search("test query", tiers=["semantic"])
assert "semantic" in results
def test_search_defaults_to_semantic(self, searcher):
results = searcher.search("test")
assert "semantic" in results
def test_get_context_for_query_empty(self, searcher):
ctx = searcher.get_context_for_query("test")
assert ctx == "" # Empty DB
class TestMemorySearch:
"""Test module-level memory_search function."""
def test_no_results(self):
result = memory_search("something obscure that won't match anything")
assert isinstance(result, str)
def test_none_top_k_handled(self):
result = memory_search("test", top_k=None)
assert isinstance(result, str)

class TestMemoryChunk:
"""Test MemoryChunk dataclass."""
def test_create(self):
chunk = MemoryChunk(
id="c1",
source="/path/to/file.md",
content="chunk text",
embedding=[0.1, 0.2],
created_at="2026-03-06",
)
assert chunk.id == "c1"
assert chunk.content == "chunk text"
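The "hash-based similarity" hedged against in `test_get_relevant_context` can be pictured with a small stand-in (illustrative only — the project's actual SemanticMemory embedding is assumed, not shown here):

```python
import hashlib
import math

def hash_embed(text: str, dims: int = 64) -> list[float]:
    """Bucket each token by its MD5 hash -- a cheap, model-free embedding."""
    vec = [0.0] * dims
    for token in text.lower().split():
        bucket = int(hashlib.md5(token.encode()).hexdigest(), 16) % dims
        vec[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]

def cosine(a: list[float], b: list[float]) -> float:
    """Dot product of two unit vectors == cosine similarity."""
    return sum(x * y for x, y in zip(a, b))
```

Identical texts always score 1.0, but unrelated texts can collide into the same buckets, which is why the test only asserts `isinstance(ctx, str)` rather than an exact match.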

View File

@@ -0,0 +1,193 @@
"""Extended tests for timmy.tools — covers tool tracking, stats, and create_* functions."""
import pytest
from unittest.mock import patch, MagicMock
from timmy.tools import (
_track_tool_usage,
get_tool_stats,
calculator,
_TOOL_USAGE,
ToolStats,
AgentTools,
PersonaTools,
create_aider_tool,
)

class TestToolTracking:
"""Test _track_tool_usage and get_tool_stats."""
def setup_method(self):
_TOOL_USAGE.clear()
def test_track_tool_usage(self):
_track_tool_usage("agent-1", "web_search")
assert "agent-1" in _TOOL_USAGE
assert len(_TOOL_USAGE["agent-1"]) == 1
assert _TOOL_USAGE["agent-1"][0]["tool"] == "web_search"
assert _TOOL_USAGE["agent-1"][0]["success"] is True
def test_track_multiple_calls(self):
_track_tool_usage("agent-1", "tool_a")
_track_tool_usage("agent-1", "tool_b")
_track_tool_usage("agent-1", "tool_a", success=False)
assert len(_TOOL_USAGE["agent-1"]) == 3
def test_get_tool_stats_specific_agent(self):
_track_tool_usage("agent-x", "read_file")
_track_tool_usage("agent-x", "write_file")
stats = get_tool_stats("agent-x")
assert stats["agent_id"] == "agent-x"
assert stats["total_calls"] == 2
assert set(stats["tools_used"]) == {"read_file", "write_file"}
def test_get_tool_stats_no_data(self):
stats = get_tool_stats("nonexistent")
assert stats["total_calls"] == 0
assert stats["tools_used"] == []
def test_get_tool_stats_all_agents(self):
_track_tool_usage("a1", "t1")
_track_tool_usage("a2", "t2")
_track_tool_usage("a2", "t3")
stats = get_tool_stats()
assert "a1" in stats
assert stats["a1"]["total_calls"] == 1
assert stats["a2"]["total_calls"] == 2
def test_recent_calls_capped_at_10(self):
for i in range(15):
_track_tool_usage("agent-y", f"tool_{i}")
stats = get_tool_stats("agent-y")
assert len(stats["recent_calls"]) == 10
def teardown_method(self):
_TOOL_USAGE.clear()

class TestToolStats:
"""Test ToolStats dataclass."""
def test_defaults(self):
ts = ToolStats(tool_name="calc")
assert ts.call_count == 0
assert ts.last_used is None
assert ts.errors == 0

class TestAgentTools:
"""Test AgentTools dataclass and backward compat alias."""
def test_persona_tools_alias(self):
assert PersonaTools is AgentTools

class TestCalculatorExtended:
"""Extended tests for the calculator tool."""
def test_division(self):
assert calculator("10 / 3") == str(10 / 3)
def test_exponents(self):
assert calculator("2**10") == "1024"
def test_math_functions(self):
import math
assert calculator("math.sqrt(144)") == "12.0"
assert calculator("math.pi") == str(math.pi)
assert calculator("math.log(100, 10)") == str(math.log(100, 10))
def test_builtins_blocked(self):
result = calculator("__import__('os').system('ls')")
assert "Error" in result
def test_abs_allowed(self):
assert calculator("abs(-5)") == "5"
def test_round_allowed(self):
assert calculator("round(3.14159, 2)") == "3.14"
def test_min_max_allowed(self):
assert calculator("min(1, 2, 3)") == "1"
assert calculator("max(1, 2, 3)") == "3"
def test_invalid_expression(self):
result = calculator("not valid python")
assert "Error" in result
def test_division_by_zero(self):
result = calculator("1/0")
assert "Error" in result
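These calculator cases all follow from one pattern: evaluate with builtins stripped and only a small allowlist exposed. A minimal sketch, assuming the real tool differs in detail:

```python
import math

# Names the expression is allowed to touch; everything else is invisible.
_ALLOWED = {"abs": abs, "round": round, "min": min, "max": max, "math": math}

def calculator(expression: str) -> str:
    try:
        # Empty __builtins__ blocks __import__, open, exec, etc.
        return str(eval(expression, {"__builtins__": {}}, dict(_ALLOWED)))
    except Exception as exc:
        return f"Error: {exc}"
```

`eval` with an empty `__builtins__` is not a hardened sandbox (it does not stop attribute-walking attacks, for instance), but it is enough to make every case above pass.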

class TestCreateToolFunctions:
"""Test that create_*_tools functions check availability."""
def test_create_research_tools_no_agno(self):
with patch("timmy.tools._AGNO_TOOLS_AVAILABLE", False):
with patch("timmy.tools._ImportError", ImportError("no agno")):
with pytest.raises(ImportError):
from timmy.tools import create_research_tools
create_research_tools()
def test_create_code_tools_no_agno(self):
with patch("timmy.tools._AGNO_TOOLS_AVAILABLE", False):
with patch("timmy.tools._ImportError", ImportError("no agno")):
with pytest.raises(ImportError):
from timmy.tools import create_code_tools
create_code_tools()
def test_create_data_tools_no_agno(self):
with patch("timmy.tools._AGNO_TOOLS_AVAILABLE", False):
with patch("timmy.tools._ImportError", ImportError("no agno")):
with pytest.raises(ImportError):
from timmy.tools import create_data_tools
create_data_tools()
def test_create_writing_tools_no_agno(self):
with patch("timmy.tools._AGNO_TOOLS_AVAILABLE", False):
with patch("timmy.tools._ImportError", ImportError("no agno")):
with pytest.raises(ImportError):
from timmy.tools import create_writing_tools
create_writing_tools()

class TestAiderTool:
"""Test AiderTool created by create_aider_tool."""
def test_create_aider_tool(self, tmp_path):
tool = create_aider_tool(tmp_path)
assert hasattr(tool, "run_aider")
assert tool.base_dir == tmp_path
@patch("subprocess.run")
def test_aider_success(self, mock_run, tmp_path):
tool = create_aider_tool(tmp_path)
mock_run.return_value = MagicMock(returncode=0, stdout="Changes applied")
result = tool.run_aider("add fibonacci function")
assert "Changes applied" in result
@patch("subprocess.run")
def test_aider_error(self, mock_run, tmp_path):
tool = create_aider_tool(tmp_path)
mock_run.return_value = MagicMock(returncode=1, stderr="something broke")
result = tool.run_aider("bad prompt")
assert "error" in result.lower()
@patch("subprocess.run", side_effect=FileNotFoundError)
def test_aider_not_installed(self, mock_run, tmp_path):
tool = create_aider_tool(tmp_path)
result = tool.run_aider("test")
assert "not installed" in result.lower()
@patch("subprocess.run")
def test_aider_timeout(self, mock_run, tmp_path):
import subprocess
mock_run.side_effect = subprocess.TimeoutExpired(cmd="aider", timeout=120)
tool = create_aider_tool(tmp_path)
result = tool.run_aider("slow task")
assert "timed out" in result.lower()
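The aider tests exercise four exit paths of a subprocess wrapper. A hedged sketch of what `create_aider_tool` could return (the `--message`/`--yes` flags and the exact message wording are assumptions, not the project's confirmed implementation):

```python
import subprocess
from dataclasses import dataclass
from pathlib import Path

@dataclass
class AiderTool:
    base_dir: Path

    def run_aider(self, prompt: str, timeout: int = 120) -> str:
        try:
            proc = subprocess.run(
                ["aider", "--yes", "--message", prompt],
                cwd=self.base_dir,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except FileNotFoundError:
            return "aider is not installed"
        except subprocess.TimeoutExpired:
            return f"aider timed out after {timeout}s"
        if proc.returncode != 0:
            return f"aider error: {proc.stderr}"
        return proc.stdout

def create_aider_tool(base_dir: Path) -> AiderTool:
    return AiderTool(base_dir=base_dir)
```

Because each failure mode collapses to a lowercase-matchable string, the tests can assert on substrings ("error", "not installed", "timed out") without mocking exact messages.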

tox.ini
View File

@@ -7,13 +7,17 @@ allowlist_externals = poetry
commands_pre = poetry install --with dev --quiet
[testenv:unit]
description = Fast unit + integration tests (no Ollama, no external services)
commands = poetry run pytest tests/ -q --tb=short -m "not ollama and not docker and not selenium and not external_api"
description = Fast unit tests (no I/O, no external services)
commands = poetry run pytest tests/ -q --tb=short -m "unit and not ollama and not docker and not selenium and not external_api"
[testenv:integration]
description = Same as unit — alias for CI pipelines
commands = poetry run pytest tests/ -q --tb=short -m "not ollama and not docker and not selenium and not external_api"
description = Integration tests (may use SQLite, but no external services)
commands = poetry run pytest tests/ -q --tb=short -m "integration and not ollama and not docker and not selenium and not external_api"
[testenv:ollama]
description = Live LLM tests via Ollama (requires Ollama running with a tiny model)
commands = poetry run pytest tests/ -q --tb=short -m ollama --timeout=120
[testenv:coverage]
description = Run all tests with coverage reporting
commands = poetry run pytest tests/ -q --tb=short --cov=src --cov-report=term-missing --cov-fail-under=73 -p no:xdist -m "not ollama and not docker and not selenium and not external_api"
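
One detail worth noting: every environment above selects on custom markers (`unit`, `integration`, `ollama`, `docker`, `selenium`, `external_api`), and unregistered markers trigger pytest warnings under `--strict-markers`. If they are not already declared in the project's config, a `conftest.py` hook like this registers them (the help texts here are illustrative):

```python
# conftest.py -- register the custom markers the tox environments filter on
_MARKERS = {
    "unit": "fast, isolated tests with no I/O",
    "integration": "tests that may touch SQLite but no external services",
    "ollama": "live LLM tests requiring a running Ollama server",
    "docker": "tests requiring Docker",
    "selenium": "browser tests requiring a Selenium driver",
    "external_api": "tests that call real external network APIs",
}

def pytest_configure(config):
    for name, help_text in _MARKERS.items():
        config.addinivalue_line("markers", f"{name}: {help_text}")
```

The same declarations could instead live under `markers =` in `pytest.ini` or `[tool.pytest.ini_options]` in `pyproject.toml`; the hook form just keeps them next to the fixtures.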