WYL-42: Python skeleton + in_review watcher loop

Minimum viable structure for the Tool-MAD coordinator:
- coordinator.config: env-loaded Config dataclass, writes state to ~/.coordinator/
- coordinator.multica_client: thin requests wrapper for issues/comments/agents
- coordinator.state: flat-json SeenState tracking issue_id -> last_seen_updated_at
- coordinator.__main__: watch_loop() that polls in_review and logs candidates
- README.md: why this exists + how to run

v0 only detects in_review transitions; convening debate rounds is WYL-45.
Dependencies: stdlib + requests (nothing else until a working v1 ships).
This commit is contained in:
2026-04-15 23:04:06 +02:00
parent 3935102d96
commit 6da434039c
8 changed files with 361 additions and 1 deletions
+8
View File
@@ -0,0 +1,8 @@
__pycache__/
*.py[cod]
*.egg-info/
.venv/
venv/
build/
dist/
.coordinator/
+46 -1
View File
@@ -1,3 +1,48 @@
# coordinator # coordinator
Tool-MAD middle-management layer for multica: watches in_review transitions, convenes debate rounds, actions verdicts. Tool-MAD middle-management layer for multica.
## Why this exists
Multica's agents produce work and self-set issue status to `in_review`. Nothing in multica ever looks at that state — it's a dead end. Reviewers, when mentioned manually, tend to catch surface issues (length, structure) and miss semantic ones (scope drift, fabricated experiments, answering the wrong question).
This daemon is the missing middle-management layer. When an issue transitions to `in_review`, it:
1. Convenes a **debate round**: posts one comment on the issue mentioning a fixed set of debaters, each given a role-specific evidence-gathering prompt (e.g. Senior Developer greps the committed code for LLM API calls; Code Reviewer diffs the described method against the source; Project Manager Senior checks scope satisfaction against the original description).
2. **Waits** for every debater to reply (up to a timeout).
3. Posts a second comment mentioning the **judge** (Reality Checker), with the assembled debater transcript + the original issue body, and a hardcoded decision rule.
4. **Parses** the judge's structured verdict (`VERDICT: ACCEPT` or `VERDICT: REJECT\n- R<n>: <failure>`) and acts:
- `ACCEPT` → PUT status `done`, post acceptance summary
- `REJECT` → PUT status `in_progress`, post rejection listing every failure, re-trigger the original assignee
The pattern is **Tool-MAD (Multi-Agent Debate with heterogeneous tool augmentation)**. Reference: [arxiv 2601.04742](https://arxiv.org/html/2601.04742v1).
## Why debaters catch what a single judge misses
Each debater runs on a different agent (different runtime, different tool access, different role prompt) and is forced to ground its argument in a specific tool's output — not in its own recollection of the text. Example: if the question is "does this paper describe real LLM agent experiments", Senior Developer's grep for `anthropic|openai|ollama|model=` in the committed code either returns hits or it doesn't — that's a hard fact, not an opinion. The judge reads 4 grounded arguments and applies the decision rule "any debater reporting evidence of scope drift ⇒ REJECT."
A naive LLM-as-a-judge reviewer reads the paper and scores it on surface dimensions. An agent-as-judge driven by debaters catches the underlying substitution. See WYL-41 for the live failure case that motivated this build.
## Run
```bash
pip install -e .
cat > ~/.coordinator/env <<'EOF'
COORDINATOR_SERVER_URL=http://localhost:8089
COORDINATOR_WORKSPACE_ID=<wid>
COORDINATOR_TOKEN=<coordinator-member-pat>
EOF
chmod 600 ~/.coordinator/env
coordinator
```
Logs go to stderr and to `~/.coordinator/coordinator.log`. State file is `~/.coordinator/seen.json`.
## Status
- [x] WYL-42 skeleton + watcher (this commit)
- [ ] WYL-43 dedicated admin PAT
- [ ] WYL-44 hook watcher to real round trigger
- [ ] WYL-45 debate round orchestration
- [ ] WYL-46 verdict parser + action executor
- [ ] WYL-47 dry-run against WYL-41
+21
View File
@@ -0,0 +1,21 @@
[project]
name = "coordinator"
version = "0.1.0"
description = "Tool-MAD middle-management layer for multica. Watches for in_review transitions, convenes debate rounds, and actions judge verdicts."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
# v1 intentionally depends only on stdlib + requests. No async, no frameworks.
# If this list grows past 3 items before a working v1 is shipped, something is wrong.
"requests>=2.32",
]
[project.scripts]
coordinator = "coordinator.__main__:main"
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
+3
View File
@@ -0,0 +1,3 @@
"""Tool-MAD middle-management layer for multica."""
__version__ = "0.1.0"
+74
View File
@@ -0,0 +1,74 @@
"""Coordinator daemon entrypoint.
For v0 this only does the watcher loop — debate orchestration and verdict handling
land in WYL-45 and WYL-46. Running it now will just tail the `in_review` list and
log what would have triggered a round.
"""
from __future__ import annotations
import logging
import sys
import time
from coordinator.config import Config
from coordinator.multica_client import MulticaClient
from coordinator.state import SeenState
def _setup_logging(log_file) -> logging.Logger:
logger = logging.getLogger("coordinator")
logger.setLevel(logging.INFO)
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s", "%Y-%m-%dT%H:%M:%S")
stderr = logging.StreamHandler(sys.stderr)
stderr.setFormatter(fmt)
logger.addHandler(stderr)
try:
fh = logging.FileHandler(log_file)
fh.setFormatter(fmt)
logger.addHandler(fh)
except OSError as e:
logger.warning("could not open log file %s: %s", log_file, e)
return logger
def watch_loop(cfg: Config, client: MulticaClient, state: SeenState, logger: logging.Logger) -> None:
logger.info(
"coordinator starting server=%s workspace=%s interval=%ds",
cfg.server_url, cfg.workspace_id, cfg.poll_interval_s,
)
while True:
try:
issues = client.list_issues_by_status("in_review")
except Exception as e:
logger.error("poll failed: %s", e)
time.sleep(cfg.poll_interval_s)
continue
for issue in issues:
iid = issue["id"]
updated = issue.get("updated_at", "")
if state.last_seen(iid) == updated:
continue
logger.info(
"candidate issue %s status=in_review updated_at=%s title=%r",
issue.get("identifier", iid), updated, issue.get("title", ""),
)
# v0: only mark-and-log. Round convening is WYL-45.
state.mark_seen(iid, updated)
time.sleep(cfg.poll_interval_s)
def main() -> None:
cfg = Config.from_env()
logger = _setup_logging(cfg.log_file)
client = MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token)
state = SeenState.load(cfg.seen_file)
try:
watch_loop(cfg, client, state, logger)
except KeyboardInterrupt:
logger.info("shutting down")
if __name__ == "__main__":
main()
+65
View File
@@ -0,0 +1,65 @@
"""Configuration loading for the coordinator daemon.
Everything comes from environment variables (loaded from ~/.coordinator/env at
startup if present). No flags, no config files. Keep it boring.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
ENV_FILE = Path.home() / ".coordinator" / "env"
STATE_DIR = Path.home() / ".coordinator"
def load_env_file(path: Path = ENV_FILE) -> None:
"""Populate os.environ from a simple KEY=VALUE file. Silent if missing."""
if not path.is_file():
return
for line in path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
@dataclass(frozen=True)
class Config:
server_url: str
workspace_id: str
token: str
poll_interval_s: int
round_timeout_s: int
max_concurrent_rounds: int
seen_file: Path
log_file: Path
@classmethod
def from_env(cls) -> "Config":
load_env_file()
required = ("COORDINATOR_SERVER_URL", "COORDINATOR_WORKSPACE_ID", "COORDINATOR_TOKEN")
missing = [k for k in required if not os.environ.get(k)]
if missing:
raise SystemExit(
f"coordinator: missing required env vars: {', '.join(missing)}\n"
f"put them in {ENV_FILE} or export them before running"
)
STATE_DIR.mkdir(parents=True, exist_ok=True)
return cls(
server_url=os.environ["COORDINATOR_SERVER_URL"].rstrip("/"),
workspace_id=os.environ["COORDINATOR_WORKSPACE_ID"],
token=os.environ["COORDINATOR_TOKEN"],
poll_interval_s=int(os.environ.get("COORDINATOR_POLL_INTERVAL_S", "30")),
round_timeout_s=int(os.environ.get("COORDINATOR_ROUND_TIMEOUT_S", "600")),
max_concurrent_rounds=int(os.environ.get("COORDINATOR_MAX_CONCURRENT_ROUNDS", "3")),
seen_file=STATE_DIR / "seen.json",
log_file=STATE_DIR / "coordinator.log",
)
+100
View File
@@ -0,0 +1,100 @@
"""Thin REST wrapper for multica's API. Only the endpoints the coordinator needs."""
from __future__ import annotations
from typing import Any, Iterable
import requests
class MulticaClient:
def __init__(self, server_url: str, workspace_id: str, token: str, timeout: float = 15.0):
self.server_url = server_url.rstrip("/")
self.workspace_id = workspace_id
self.timeout = timeout
self._session = requests.Session()
self._session.headers.update(
{
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
)
# ---- issues ------------------------------------------------------------
def list_issues_by_status(self, status: str, limit: int = 200) -> list[dict[str, Any]]:
"""Return all issues in the workspace matching a given status."""
params = {"workspace_id": self.workspace_id, "status": status, "limit": str(limit)}
r = self._session.get(f"{self.server_url}/api/issues", params=params, timeout=self.timeout)
r.raise_for_status()
payload = r.json()
if isinstance(payload, dict):
return payload.get("issues", [])
return payload # some versions return a bare list
def get_issue(self, issue_id: str) -> dict[str, Any]:
params = {"workspace_id": self.workspace_id}
r = self._session.get(
f"{self.server_url}/api/issues/{issue_id}", params=params, timeout=self.timeout
)
r.raise_for_status()
return r.json()
def update_issue(self, issue_id: str, **fields: Any) -> dict[str, Any]:
params = {"workspace_id": self.workspace_id}
r = self._session.put(
f"{self.server_url}/api/issues/{issue_id}",
params=params,
json=fields,
timeout=self.timeout,
)
r.raise_for_status()
return r.json()
# ---- comments ----------------------------------------------------------
def list_comments(self, issue_id: str) -> list[dict[str, Any]]:
params = {"workspace_id": self.workspace_id}
r = self._session.get(
f"{self.server_url}/api/issues/{issue_id}/comments",
params=params,
timeout=self.timeout,
)
r.raise_for_status()
payload = r.json()
if isinstance(payload, list):
return payload
return payload.get("comments", [])
def post_comment(self, issue_id: str, content: str) -> dict[str, Any]:
params = {"workspace_id": self.workspace_id}
r = self._session.post(
f"{self.server_url}/api/issues/{issue_id}/comments",
params=params,
json={"content": content},
timeout=self.timeout,
)
r.raise_for_status()
return r.json()
# ---- agents ------------------------------------------------------------
def list_agents(self) -> list[dict[str, Any]]:
params = {"workspace_id": self.workspace_id}
r = self._session.get(
f"{self.server_url}/api/agents", params=params, timeout=self.timeout
)
r.raise_for_status()
payload = r.json()
if isinstance(payload, list):
return payload
return payload.get("agents", [])
def find_agents_by_name(self, names: Iterable[str]) -> dict[str, str]:
"""Return {name: agent_id} for every agent in `names` present in the workspace."""
wanted = set(names)
found: dict[str, str] = {}
for agent in self.list_agents():
if agent.get("name") in wanted:
found[agent["name"]] = agent["id"]
return found
+44
View File
@@ -0,0 +1,44 @@
"""Flat-file state tracking. Dumb JSON, human-readable, debuggable."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@dataclass
class SeenState:
"""Tracks which issues the coordinator has already acted on.
Schema:
issues: { issue_id: { last_seen_updated_at: iso8601, round_id: str|null } }
"""
path: Path
issues: dict[str, dict[str, Any]] = field(default_factory=dict)
@classmethod
def load(cls, path: Path) -> "SeenState":
if not path.exists():
return cls(path=path)
try:
data = json.loads(path.read_text())
except (json.JSONDecodeError, OSError):
return cls(path=path)
return cls(path=path, issues=data.get("issues", {}))
def save(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
self.path.write_text(json.dumps({"issues": self.issues}, indent=2, sort_keys=True))
def last_seen(self, issue_id: str) -> str | None:
entry = self.issues.get(issue_id)
return entry.get("last_seen_updated_at") if entry else None
def mark_seen(self, issue_id: str, updated_at: str, round_id: str | None = None) -> None:
self.issues[issue_id] = {
"last_seen_updated_at": updated_at,
"round_id": round_id,
}
self.save()