Implement debate round orchestration (WYL-45)

New module: src/coordinator/orchestrator.py
- DEBATER_NAMES, JUDGE_NAME, DEBATER_PROMPTS, JUDGE_PROMPT_TEMPLATE hardcoded for v1
- Per-debater prompts tell each debater exactly which tool output to ground evidence in
- orchestrate_pending() is the main entry point called from watch_loop
- _start_round(): pending→running, posts debater mention comment, phase→awaiting_debaters
- _advance_awaiting_debaters(): polls for replies, handles timeout with partial evidence,
  posts judge comment, phase→awaiting_judge
- _advance_awaiting_judge(): polls for verdict; RACE FIX — update_issue_status() called
  BEFORE queue.update_status("done") so poll_once can never double-enqueue
- Detection: primary=author_id match, fallback=[{name} response]: content marker (enables tests)
- Restart-safe: phase field persisted on every mutation; in-flight rounds resume correctly

Extended src/coordinator/queue.py:
- Round gains phase, phase_entered_at, coordinator_comment_id, judge_comment_id fields
- DebateQueue.update_phase() and running() added
- All new fields default-empty so existing queue.json files load cleanly

Extended src/coordinator/multica_client.py:
- update_issue_status() convenience wrapper
- create_issue() for integration / smoke tests

Updated src/coordinator/__main__.py:
- _orchestrate_pending stub replaced with real import from orchestrator

Tests:
- tests/test_orchestrator.py: 32 new unit tests covering phase transitions, timeouts,
  race fix ordering, restart resume, full lifecycle
- tests/test_integration.py: @pytest.mark.integration test against real API
- smoke_test.py: standalone end-to-end script; ran against real API, verdict OK

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-15 21:43:17 +00:00
parent 8c9a174ddc
commit 0e44846032
8 changed files with 1809 additions and 15 deletions
+5
View File
@@ -19,3 +19,8 @@ build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
markers = [
"integration: mark test as integration test (requires real API credentials)",
]
+205
View File
@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""Standalone smoke test for debate round orchestration (WYL-45).
Runs one full round end-to-end against the real multica API, cleans up,
and prints verdict OK or FAIL.
Usage:
python smoke_test.py
Requires env vars (or ~/.coordinator/env):
COORDINATOR_SERVER_URL
COORDINATOR_WORKSPACE_ID
COORDINATOR_TOKEN
"""
from __future__ import annotations
import logging
import os
import sys
import tempfile
from pathlib import Path
def main() -> int:
    """Run one debate round end-to-end against the real multica API.

    The script creates a scratch issue, moves it to ``in_review``, lets
    ``poll_once`` enqueue a round, drives the three orchestrator phase
    handlers by hand (posting fake debater replies and a fake judge verdict
    in between), and finally checks that a second ``poll_once`` does not
    enqueue the same issue again.

    Returns:
        0 when every check passed ("verdict OK" is printed), 1 when the
        coordinator package cannot be imported, configuration is missing,
        or at least one check failed (a FAIL summary is printed).
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        stream=sys.stderr,
    )
    log = logging.getLogger("smoke_test")

    # ---- load coordinator package ------------------------------------------
    # Prefer the in-tree sources when the script is run from a checkout.
    src = Path(__file__).parent / "src"
    if src.is_dir():
        sys.path.insert(0, str(src))
    try:
        from coordinator.config import Config, load_env_file
        from coordinator.multica_client import MulticaClient
        from coordinator.queue import DebateQueue
        from coordinator.state import SeenState
        from coordinator.__main__ import poll_once
        from coordinator.orchestrator import (
            DEBATER_NAMES,
            _advance_awaiting_debaters,
            _advance_awaiting_judge,
            _start_round,
        )
    except ImportError as exc:
        print(f"FAIL: cannot import coordinator package: {exc}", file=sys.stderr)
        # Keep the hint on the same stream as the error it belongs to.
        print(" Run 'pip install -e .' in the coordinator directory first.", file=sys.stderr)
        return 1

    load_env_file()
    required = ["COORDINATOR_SERVER_URL", "COORDINATOR_WORKSPACE_ID", "COORDINATOR_TOKEN"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        print(f"FAIL: missing env vars: {', '.join(missing)}", file=sys.stderr)
        return 1
    try:
        cfg = Config.from_env()
    except SystemExit as exc:
        print(f"FAIL: {exc}", file=sys.stderr)
        return 1

    client = MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token)
    issue_id: str | None = None
    failures: list[str] = []

    def check(cond: bool, msg: str) -> None:
        """Record *msg* as a failure (and log it) when *cond* is false."""
        if not cond:
            failures.append(msg)
            log.error("ASSERTION FAILED: %s", msg)

    def run_round(state, queue) -> None:
        """Drive the scratch issue through one full round, recording failures.

        Returns early (after recording the failure) when no round was
        enqueued, so the caller still performs cleanup and prints the summary.
        """
        nonlocal issue_id

        # 1. Create scratch issue
        log.info("creating scratch issue...")
        issue = client.create_issue(
            title="[DEBATE-SMOKE-TEST] WYL-45 smoke — auto-delete",
            description=(
                "Smoke test issue for WYL-45. Auto-deleted after test.\n\n"
                "Commit: https://git.wylab.me/multica/coordinator/commit/smoke000"
            ),
        )
        issue_id = issue["id"]
        log.info("created issue %s", issue_id)

        # 2. Set in_review and enqueue via poll_once
        client.update_issue_status(issue_id, "in_review")
        poll_once(client, state, queue, log)
        pending = queue.pending()
        check(bool(pending), "round not enqueued after poll_once")
        if not pending:
            return  # nothing to drive; the failure above is reported in the summary
        round_ = pending[0]
        check(round_.issue_id == issue_id, "wrong issue_id in round")

        # 3. Start the round
        log.info("starting round...")
        _start_round(round_, client, queue, cfg, log)
        check(round_.status == "running", f"status={round_.status!r} (want 'running')")
        check(round_.phase == "awaiting_debaters",
              f"phase={round_.phase!r} (want 'awaiting_debaters')")
        check(bool(round_.coordinator_comment_id), "coordinator_comment_id not set")

        # Verify comment on real issue
        real_comments = client.list_comments(issue_id)
        coord_comment = next(
            (c for c in real_comments if c.get("id") == round_.coordinator_comment_id),
            None,
        )
        check(coord_comment is not None, "coordinator comment not found in issue")
        if coord_comment:
            for name in DEBATER_NAMES:
                check(name in coord_comment["content"],
                      f"debater '{name}' not mentioned in coordinator comment")

        # 4. Simulate debater responses (content-marker fallback)
        log.info("posting fake debater responses...")
        for name in DEBATER_NAMES:
            client.post_comment(
                issue_id,
                f"[{name} response]: Smoke-test evidence. "
                f"This simulates {name}'s analysis.",
            )

        # 5. Advance to awaiting_judge
        log.info("advancing past debaters...")
        _advance_awaiting_debaters(round_, client, queue, cfg, log)
        check(round_.phase == "awaiting_judge",
              f"phase={round_.phase!r} (want 'awaiting_judge')")
        check(bool(round_.judge_comment_id), "judge_comment_id not set")
        real_comments = client.list_comments(issue_id)
        judge_comment = next(
            (c for c in real_comments if c.get("id") == round_.judge_comment_id),
            None,
        )
        check(judge_comment is not None, "judge comment not found in issue")
        if judge_comment:
            check("Verdict requested" in judge_comment["content"],
                  "judge comment missing 'Verdict requested'")

        # 6. Simulate judge verdict
        log.info("posting fake judge verdict...")
        client.post_comment(
            issue_id,
            "VERDICT: ACCEPT\n\nSmoke test passed. Implementation is complete.",
        )

        # 7. Advance past judge
        log.info("processing verdict...")
        _advance_awaiting_judge(round_, client, queue, cfg, log)
        check(round_.status == "done", f"status={round_.status!r} (want 'done')")
        check(round_.phase == "accepted", f"phase={round_.phase!r} (want 'accepted')")
        refreshed = client.get_issue(issue_id)
        check(refreshed.get("status") == "done",
              f"issue status={refreshed.get('status')!r} (want 'done')")

        # 8. Race-fix check: poll_once must not double-enqueue OUR issue.
        # (Other in_review issues may also be enqueued — that's expected.)
        poll_once(client, state, queue, log)
        our_rounds_after = [
            r for r in queue.rounds if r.issue_id == issue_id
        ]
        check(
            len(our_rounds_after) == 1,
            f"double-enqueue: issue {issue_id} appears {len(our_rounds_after)} time(s) "
            f"in queue (want 1)",
        )

    # State and queue live in a throwaway directory so the run never touches
    # the real ~/.coordinator files.
    with tempfile.TemporaryDirectory() as td:
        tmp = Path(td)
        state = SeenState.load(tmp / "seen.json")
        queue = DebateQueue.load(tmp / "queue.json")
        try:
            run_round(state, queue)
        except Exception as exc:
            log.exception("unexpected error: %s", exc)
            failures.append(f"unexpected exception: {exc}")
        finally:
            # Best-effort cleanup: park the scratch issue in "done" so the
            # watcher does not pick it up again.
            if issue_id:
                try:
                    client.update_issue_status(issue_id, "done")
                    log.info("cleanup: issue %s → done", issue_id)
                except Exception as exc:
                    log.warning("cleanup failed: %s", exc)

    if failures:
        print(f"\nFAIL ({len(failures)} assertion(s)):")
        for failure in failures:
            print(f" - {failure}")
        return 1
    print("\nverdict OK")
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
+8 -7
View File
@@ -3,8 +3,10 @@
Watcher loop (WYL-44): polls in_review issues every 30 s, detects transitions,
enqueues debate rounds into ~/.coordinator/queue.json.
Debate orchestration (WYL-45) and verdict handling (WYL-46) are stubs here —
they will be filled in by those tasks.
Debate orchestration (WYL-45): orchestrate_pending() handles the full round
lifecycle — debater mentions, evidence polling, judge verdict, issue status update.
Verdict handling (WYL-46) will consume the accepted/rejected phase information.
"""
from __future__ import annotations
@@ -14,6 +16,7 @@ import time
from coordinator.config import Config
from coordinator.multica_client import MulticaClient
from coordinator.orchestrator import orchestrate_pending
from coordinator.queue import DebateQueue
from coordinator.state import SeenState
@@ -35,14 +38,12 @@ def _setup_logging(log_file) -> logging.Logger:
# ---------------------------------------------------------------------------
# Stub hooks for WYL-45 / WYL-46
# Orchestration hook (WYL-45)
# ---------------------------------------------------------------------------
def _orchestrate_pending(queue: DebateQueue, cfg: Config, logger: logging.Logger) -> None:
"""Process pending debate rounds. Implemented in WYL-45."""
pending = queue.pending()
if pending:
logger.debug("orchestrator stub: %d pending round(s) (WYL-45 not yet implemented)", len(pending))
"""Process pending and in-flight debate rounds."""
orchestrate_pending(queue, cfg, logger)
# ---------------------------------------------------------------------------
+20
View File
@@ -51,6 +51,26 @@ class MulticaClient:
r.raise_for_status()
return r.json()
def update_issue_status(self, issue_id: str, status: str) -> dict[str, Any]:
    """Set only the ``status`` field of an issue.

    Thin wrapper that forwards to ``update_issue`` with ``status`` as the
    single keyword field and returns whatever that call returns.
    """
    updated = self.update_issue(issue_id, status=status)
    return updated
def create_issue(
    self,
    title: str,
    description: str = "",
    status: str = "todo",
) -> dict[str, Any]:
    """Create an issue in the configured workspace and return the decoded JSON reply.

    Used by the integration and smoke tests. Raises whatever
    ``raise_for_status()`` raises when the server answers with an error status.
    """
    endpoint = f"{self.server_url}/api/issues"
    payload = {"title": title, "description": description, "status": status}
    response = self._session.post(
        endpoint,
        params={"workspace_id": self.workspace_id},
        json=payload,
        timeout=self.timeout,
    )
    response.raise_for_status()
    return response.json()
# ---- comments ----------------------------------------------------------
def list_comments(self, issue_id: str) -> list[dict[str, Any]]:
+577
View File
@@ -0,0 +1,577 @@
"""Debate round orchestration (WYL-45).
Round lifecycle
---------------
pending → _start_round() → running / phase=awaiting_debaters
awaiting_debaters → _advance_awaiting_debaters()
all replied (or timeout) → phase=awaiting_judge
awaiting_judge → _advance_awaiting_judge()
verdict received → issue status updated FIRST,
then round marked done/phase=accepted|rejected
timeout → round marked error, issue left in_review
Restart safety
--------------
DebateQueue persists phase + comment IDs on every mutation. On restart,
running rounds resume from the correct phase without re-posting comments.
Race fix (WYL-44 note)
----------------------
In _advance_awaiting_judge, client.update_issue_status() is called BEFORE
queue.update_status("done"). This guarantees poll_once cannot see
(is_issue_pending_or_running=False AND issue=in_review) at the same time.
"""
from __future__ import annotations
import logging
import re
from datetime import datetime, timezone
from typing import Any
from coordinator.config import Config
from coordinator.multica_client import MulticaClient
from coordinator.queue import DebateQueue, Round
# ---------------------------------------------------------------------------
# Hardcoded v1 constants
# ---------------------------------------------------------------------------
# Debater agent names; this order is also the order of sections in the
# coordinator's mention comment and in the transcript sent to the judge.
DEBATER_NAMES: list[str] = [
"Senior Developer",
"Code Reviewer",
"AI Engineer",
"Project Manager Senior",
]
# Name of the agent asked for the final verdict.
JUDGE_NAME: str = "Reality Checker"
# Per-debater evidence prompts, keyed by debater name.
# The prompt texts are used verbatim and contain no format placeholders: the
# issue title, description excerpt and commit link are placed around each
# prompt by _build_debater_comment().
# Each prompt tells the debater exactly which tool output to ground its argument in.
DEBATER_PROMPTS: dict[str, str] = {
"Senior Developer": (
"**Your task:** grep the committed code for LLM API calls "
"(`openai`, `anthropic`, `requests.post` to AI endpoints, etc.). Report:\n"
"1. Yes/no — does the code make LLM API calls?\n"
"2. The exact grep command you ran.\n"
"3. The full grep output (or 'no matches').\n\n"
"Max 300 words."
),
"Code Reviewer": (
"**Your task:** diff the committed implementation method-by-method against "
"every requirement in the issue description. For each requirement: state it, "
"then label MET or MISSING with one line of evidence from the diff. "
"Overall confidence: high / medium / low.\n\n"
"Max 300 words."
),
"AI Engineer": (
"**Your task:** ground-check every factual claim in the issue description "
"against the committed code. For each claim: state it, then verify or refute "
"with a direct code reference. Label each VERIFIED or UNVERIFIED.\n\n"
"Max 300 words."
),
"Project Manager Senior": (
"**Your task:** check the committed work against the original acceptance "
"criteria. For each criterion: satisfied (with evidence) or not met. "
"State your recommendation: ACCEPT or REJECT.\n\n"
"Max 300 words."
),
}
# Body of the comment that asks the judge for a verdict.
# Placeholders filled by _build_judge_comment() via str.format():
# {issue_title}, {issue_description}, {commit_url}, {transcript}.
JUDGE_PROMPT_TEMPLATE: str = """\
**Verdict requested — {issue_title}**
**Original issue:**
{issue_description}
**Commit:** {commit_url}
---
## Debater evidence
{transcript}
---
Deliver your verdict. Begin your reply with exactly one of:
- `VERDICT: ACCEPT` — implementation satisfies all critical acceptance criteria
- `VERDICT: REJECT` — it does not
Follow with reasoning (max 400 words).\
"""
# An http(s) URL containing "/commit/" followed by at least one hex digit
# (case-insensitive).
_COMMIT_URL_RE = re.compile(
r"https?://\S+/commit/[0-9a-f]+", re.IGNORECASE
)
# "VERDICT: ACCEPT" or "VERDICT: REJECT" at the start of any line
# (case-insensitive); group 1 captures the decision word.
_VERDICT_RE = re.compile(
r"^VERDICT:\s*(ACCEPT|REJECT)", re.MULTILINE | re.IGNORECASE
)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _utcnow() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _parse_dt(iso: str) -> datetime:
return datetime.fromisoformat(iso.replace("Z", "+00:00"))
def _elapsed_s(since_iso: str) -> float:
"""Seconds elapsed since an ISO8601 timestamp. Returns 0 on parse error."""
if not since_iso:
return 0.0
try:
return (datetime.now(timezone.utc) - _parse_dt(since_iso)).total_seconds()
except Exception:
return 0.0
def _find_commit_url(comments: list[dict[str, Any]]) -> str:
"""Scan comments newest-first for a commit URL. Returns '' if not found."""
for comment in reversed(comments):
m = _COMMIT_URL_RE.search(comment.get("content", ""))
if m:
return m.group(0)
return ""
def _parse_verdict(content: str) -> str | None:
    """Extract the verdict keyword from *content*.

    Returns ``'ACCEPT'`` or ``'REJECT'`` (upper-cased) for the first
    ``VERDICT:`` line matched by ``_VERDICT_RE``, or ``None`` when there is none.
    """
    found = _VERDICT_RE.search(content)
    if found is None:
        return None
    return found.group(1).upper()
def _cutoff_iso(comments: list[dict[str, Any]], after_comment_id: str) -> str:
"""Return the created_at of the comment with the given ID, or '' if not found."""
for c in comments:
if c.get("id") == after_comment_id:
return c.get("created_at", "")
return ""
def _collect_debater_replies(
    comments: list[dict[str, Any]],
    agent_map: dict[str, str],  # name → agent_id
    after_comment_id: str,
) -> dict[str, str]:
    """Return {debater_name: reply_content} for debaters who replied.

    Detection (in priority order):
    1. comment.author_id matches a debater's agent_id
    2. comment content starts with ``[{name} response]:`` (fallback for testing)

    Only the first reply per debater is kept. The comment with id
    *after_comment_id* itself is skipped, and so is every comment whose
    ``created_at`` sorts before it. Missing or ``None`` ``content``,
    ``author_id`` and ``created_at`` values are treated as empty strings.

    NOTE(review): timestamps are compared as plain strings, which is only a
    correct ordering if the API formats every ``created_at`` identically
    (same precision and offset notation) — confirm against the API.
    """
    # Drop empty agent ids so a comment without an author is never
    # attributed to a debater by accident.
    id_to_name: dict[str, str] = {
        agent_id: name for name, agent_id in agent_map.items() if agent_id
    }
    cutoff = _cutoff_iso(comments, after_comment_id)
    # Lower-cased markers are loop-invariant; build them once.
    markers = [(name, f"[{name} response]:".lower()) for name in DEBATER_NAMES]
    replies: dict[str, str] = {}
    for comment in comments:
        # Skip the coordinator's own mention comment and anything before it.
        if comment.get("id") == after_comment_id:
            continue
        created = comment.get("created_at") or ""
        if cutoff and created < cutoff:
            continue
        content = comment.get("content") or ""
        author_id = comment.get("author_id") or ""

        # Primary: agent ID match
        author_name = id_to_name.get(author_id)
        if author_name is not None:
            replies.setdefault(author_name, content)
            continue

        # Fallback: content-based marker (enables integration / smoke tests)
        lowered = content.strip().lower()
        for name, marker in markers:
            if lowered.startswith(marker):
                replies.setdefault(name, content)
                break
    return replies
def _collect_judge_reply(
    comments: list[dict[str, Any]],
    judge_agent_id: str,
    after_comment_id: str,
) -> str | None:
    """Return judge's comment content if found after after_comment_id, else None.

    A comment qualifies when its ``author_id`` equals *judge_agent_id* (if
    that id is non-empty), whether or not it contains a verdict line.

    Fallback: any comment containing ``VERDICT: ACCEPT|REJECT`` is accepted
    (enables smoke / integration tests where the judge is impersonated).

    The first qualifying comment in list order wins. Missing or ``None``
    ``content`` and ``created_at`` values are treated as empty strings.

    NOTE(review): timestamps are compared as plain strings, which is only a
    correct ordering if the API formats every ``created_at`` identically —
    confirm against the API.
    """
    cutoff = _cutoff_iso(comments, after_comment_id)
    for comment in comments:
        # Skip the coordinator's judge-mention comment and anything before it.
        if comment.get("id") == after_comment_id:
            continue
        created = comment.get("created_at") or ""
        if cutoff and created < cutoff:
            continue
        content = comment.get("content") or ""
        if judge_agent_id and comment.get("author_id") == judge_agent_id:
            return content
        # Fallback: content-based verdict detection
        if _parse_verdict(content) is not None:
            return content
    return None
def _build_debater_comment(
    agent_map: dict[str, str],
    issue_title: str,
    issue_description: str,
    commit_url: str,
) -> str:
    """Compose the markdown comment that opens a round and mentions every debater.

    The comment has a header, then one section per name in ``DEBATER_NAMES``
    (mention, issue title, the first 800 characters of the description,
    commit reference, and that debater's prompt), then a closing rule.
    Debaters missing from *agent_map* get a plain ``@name`` instead of a
    mention link.
    """
    if commit_url:
        commit_ref = commit_url
    else:
        commit_ref = "(no commit link found in comments)"
    # The excerpt is the same for every debater, so compute it once.
    snippet = issue_description[:800].strip()

    sections = [
        f"**Debate round opened — {issue_title}**\n",
        f"Commit under review: {commit_ref}\n",
        "---",
    ]
    for debater in DEBATER_NAMES:
        agent_id = agent_map.get(debater, "")
        if agent_id:
            mention = f"[@{debater}](mention://agent/{agent_id})"
        else:
            mention = f"@{debater}"
        prompt = DEBATER_PROMPTS.get(debater, "Please provide your evidence analysis.")
        sections.append(
            f"\n{mention}\n\n"
            f"**Issue:** {issue_title}\n"
            f"**Description (excerpt):** {snippet}\n"
            f"**Commit:** {commit_ref}\n\n"
            f"{prompt}"
        )
    sections.append("\n---")
    return "\n".join(sections)
def _build_judge_comment(
    judge_agent_id: str,
    issue_title: str,
    issue_description: str,
    commit_url: str,
    transcript: str,
) -> str:
    """Compose the comment that mentions the judge and asks for a verdict.

    The body is ``JUDGE_PROMPT_TEMPLATE`` filled with the issue title, the
    first 2000 characters of the description, the commit URL (``N/A`` when
    empty) and the debater transcript. Without a judge agent id the mention
    is a plain ``@name`` instead of a mention link.
    """
    if judge_agent_id:
        mention = f"[@{JUDGE_NAME}](mention://agent/{judge_agent_id})"
    else:
        mention = f"@{JUDGE_NAME}"
    excerpt = issue_description[:2000].strip()
    body = JUDGE_PROMPT_TEMPLATE.format(
        issue_title=issue_title,
        issue_description=excerpt,
        commit_url=commit_url or "N/A",
        transcript=transcript,
    )
    return f"{mention}\n\n{body}"
# ---------------------------------------------------------------------------
# Phase handlers
# ---------------------------------------------------------------------------
def _start_round(
    round_: Round,
    client: MulticaClient,
    queue: DebateQueue,
    cfg: Config,
    logger: logging.Logger,
) -> None:
    """Open a round: pending → running, post the debater comment, phase → awaiting_debaters.

    The round is marked ``error`` when the issue cannot be fetched or the
    mention comment cannot be posted. Failures to list comments or to look up
    the debater agents are only logged; the comment is then built without a
    commit link or without mention links respectively.
    """
    rid = round_.round_id
    logger.info("starting round %s for %s", rid, round_.identifier)

    # Mark running immediately so poll_once won't double-enqueue on a bump.
    queue.update_status(rid, "running")

    try:
        issue = client.get_issue(round_.issue_id)
    except Exception as exc:
        logger.error("round %s: cannot fetch issue %s: %s", rid, round_.issue_id, exc)
        queue.update_status(rid, "error")
        return
    title = issue.get("title", round_.title)
    description = issue.get("description", "") or ""

    try:
        existing_comments = client.list_comments(round_.issue_id)
    except Exception as exc:
        logger.warning("round %s: cannot list comments: %s", rid, exc)
        existing_comments = []
    commit_url = _find_commit_url(existing_comments)

    try:
        debaters = client.find_agents_by_name(DEBATER_NAMES)
    except Exception as exc:
        logger.warning("round %s: cannot look up debater agents: %s", rid, exc)
        debaters = {}

    mention_body = _build_debater_comment(debaters, title, description, commit_url)
    try:
        posted = client.post_comment(round_.issue_id, mention_body)
        coord_cid = posted.get("id", "")
    except Exception as exc:
        logger.error("round %s: cannot post debater mention comment: %s", rid, exc)
        queue.update_status(rid, "error")
        return

    # Persist the phase together with the comment id so a restart can resume
    # without re-posting.
    queue.update_phase(
        rid,
        "awaiting_debaters",
        phase_entered_at=_utcnow(),
        coordinator_comment_id=coord_cid,
    )
    logger.info(
        "round %s: debater comment posted (id=%s), awaiting debaters",
        rid, coord_cid,
    )
def _advance_awaiting_debaters(
    round_: Round,
    client: MulticaClient,
    queue: DebateQueue,
    cfg: Config,
    logger: logging.Logger,
) -> None:
    """Collect debater replies and, once complete or timed out, ask the judge.

    Returns without changing anything while replies are still missing and the
    phase has lasted less than ``cfg.round_timeout_s``. Otherwise the replies
    (with a placeholder for each missing debater) are assembled into a
    transcript, the judge comment is posted, and the round moves to
    ``awaiting_judge``. If comments cannot be listed or the judge comment
    cannot be posted the function returns and the next cycle retries.
    """
    rid = round_.round_id
    waited = _elapsed_s(round_.phase_entered_at)
    deadline_passed = waited >= cfg.round_timeout_s

    try:
        comments = client.list_comments(round_.issue_id)
    except Exception as exc:
        logger.error("round %s: cannot list comments: %s", rid, exc)
        return

    try:
        issue = client.get_issue(round_.issue_id)
    except Exception as exc:
        logger.warning("round %s: cannot fetch issue: %s", rid, exc)
        issue = {}
    title = issue.get("title", round_.title)
    description = issue.get("description", "") or ""
    commit_url = _find_commit_url(comments)

    try:
        debaters = client.find_agents_by_name(DEBATER_NAMES)
    except Exception as exc:
        logger.warning("round %s: cannot look up debater agents: %s", rid, exc)
        debaters = {}

    replies = _collect_debater_replies(
        comments, debaters, round_.coordinator_comment_id
    )
    absent = [name for name in DEBATER_NAMES if name not in replies]

    if absent and not deadline_passed:
        logger.debug(
            "round %s: still waiting for debaters %s (%.0fs / %ds)",
            rid, absent, waited, cfg.round_timeout_s,
        )
        return

    if absent:
        logger.warning(
            "round %s: debater timeout after %.0fs — proceeding with partial evidence "
            "(missing: %s)",
            rid, waited, absent,
        )
    else:
        logger.info(
            "round %s: all %d debaters replied, assembling transcript",
            rid, len(DEBATER_NAMES),
        )

    # Assemble transcript: one section per debater, in DEBATER_NAMES order.
    transcript = "\n\n".join(
        f"### {name}\n\n{replies[name]}"
        if name in replies
        else f"### {name}\n\n*(no response — timed out)*"
        for name in DEBATER_NAMES
    )

    try:
        judge_lookup = client.find_agents_by_name([JUDGE_NAME])
        judge_agent_id = judge_lookup.get(JUDGE_NAME, "")
    except Exception as exc:
        logger.warning("round %s: cannot look up judge agent: %s", rid, exc)
        judge_agent_id = ""

    judge_body = _build_judge_comment(
        judge_agent_id, title, description, commit_url, transcript
    )
    try:
        posted = client.post_comment(round_.issue_id, judge_body)
        judge_cid = posted.get("id", "")
    except Exception as exc:
        logger.error("round %s: cannot post judge comment: %s", rid, exc)
        return

    queue.update_phase(
        rid,
        "awaiting_judge",
        phase_entered_at=_utcnow(),
        judge_comment_id=judge_cid,
    )
    logger.info(
        "round %s: judge comment posted (id=%s), awaiting verdict",
        rid, judge_cid,
    )
def _advance_awaiting_judge(
    round_: Round,
    client: MulticaClient,
    queue: DebateQueue,
    cfg: Config,
    logger: logging.Logger,
) -> None:
    """Poll for judge verdict.

    Comments are polled BEFORE the timeout is applied, so a verdict that is
    already on the issue (for example one posted while the coordinator was
    down) is honoured even if the phase has outlived ``cfg.round_timeout_s``.

    On verdict: update issue status FIRST (race fix), then mark round done.
    A judge reply without a parsable verdict line is treated as REJECT.
    On timeout with no verdict: mark round error, leave issue in_review for
    human escalation.
    If comments cannot be listed, the call returns and is retried next cycle,
    unless the phase has already timed out, in which case the round is
    marked error.
    If the issue status cannot be updated the round stays running and the
    next cycle retries.
    """
    elapsed = _elapsed_s(round_.phase_entered_at)
    timed_out = elapsed >= cfg.round_timeout_s

    judge_reply: str | None = None
    try:
        comments = client.list_comments(round_.issue_id)
    except Exception as exc:
        logger.error("round %s: cannot list comments: %s", round_.round_id, exc)
        if not timed_out:
            return  # transient failure; retry next cycle
        # Timed out and no way to look for a verdict: fall through to the
        # timeout handling below.
    else:
        try:
            judge_map = client.find_agents_by_name([JUDGE_NAME])
            judge_agent_id = judge_map.get(JUDGE_NAME, "")
        except Exception as exc:
            logger.warning("round %s: cannot look up judge agent: %s", round_.round_id, exc)
            judge_agent_id = ""
        judge_reply = _collect_judge_reply(
            comments, judge_agent_id, round_.judge_comment_id
        )

    if judge_reply is None:
        if timed_out:
            logger.error(
                "round %s: judge timeout after %.0fs — leaving issue in_review for human escalation",
                round_.round_id, elapsed,
            )
            queue.update_phase(round_.round_id, "error")
            queue.update_status(round_.round_id, "error")
        else:
            logger.debug(
                "round %s: waiting for judge verdict (%.0fs / %ds)",
                round_.round_id, elapsed, cfg.round_timeout_s,
            )
        return

    verdict = _parse_verdict(judge_reply)
    if verdict is None:
        logger.warning(
            "round %s: judge replied but no VERDICT: ACCEPT/REJECT found — treating as REJECT",
            round_.round_id,
        )
        verdict = "REJECT"

    new_issue_status = "done" if verdict == "ACCEPT" else "in_progress"
    logger.info(
        "round %s: verdict=%s → issue status=%s",
        round_.round_id, verdict, new_issue_status,
    )

    # RACE FIX: issue status MUST be updated before marking round done.
    # If poll_once runs after this line, the issue is no longer in_review,
    # so no double-enqueue is possible regardless of round status.
    try:
        client.update_issue_status(round_.issue_id, new_issue_status)
    except Exception as exc:
        logger.error(
            "round %s: cannot update issue status — will retry next cycle: %s",
            round_.round_id, exc,
        )
        return  # do NOT mark done; retry next cycle

    phase = "accepted" if verdict == "ACCEPT" else "rejected"
    queue.update_phase(round_.round_id, phase)
    queue.update_status(round_.round_id, "done")
    logger.info("round %s: complete (phase=%s)", round_.round_id, phase)
def _advance_round(
    round_: Round,
    client: MulticaClient,
    queue: DebateQueue,
    cfg: Config,
    logger: logging.Logger,
) -> None:
    """Route a running round to the handler for its current phase.

    A round whose phase is already terminal (``accepted``, ``rejected`` or
    ``error``) but whose status is still running gets its status corrected.
    An unrecognised phase is logged as an error and otherwise left alone.
    """
    phase_handlers = {
        # "convened" should be caught by _start_round, but handle gracefully on restart.
        "convened": _start_round,
        "awaiting_debaters": _advance_awaiting_debaters,
        "awaiting_judge": _advance_awaiting_judge,
    }
    handler = phase_handlers.get(round_.phase)
    if handler is not None:
        handler(round_, client, queue, cfg, logger)
        return

    if round_.phase not in ("accepted", "rejected", "error"):
        logger.error("round %s: unknown phase %r", round_.round_id, round_.phase)
        return

    # Terminal phase but status still running — fix it.
    logger.warning(
        "round %s is in terminal phase %r but status=%r — correcting",
        round_.round_id, round_.phase, round_.status,
    )
    if round_.phase in ("accepted", "rejected"):
        final_status = "done"
    else:
        final_status = "error"
    queue.update_status(round_.round_id, final_status)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def orchestrate_pending(
    queue: DebateQueue,
    cfg: Config,
    logger: logging.Logger,
    client: MulticaClient | None = None,
) -> None:
    """Start every pending round, then advance every running round.

    Called from watch_loop after each poll_once. Running rounds resume from
    the phase stored on the round, so a restart picks up where it left off.
    An exception raised while handling one round is logged and does not stop
    the remaining rounds from being processed.

    ``client`` may be injected for testing; when omitted a real client is
    built from ``cfg``.
    """
    api = client
    if api is None:
        api = MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token)

    def _guarded(step, failure_msg: str, round_: Round) -> None:
        """Run one phase step for one round, logging instead of propagating errors."""
        try:
            step(round_, api, queue, cfg, logger)
        except Exception as exc:
            logger.error(failure_msg, round_.round_id, exc)

    # Start any pending rounds (pending → running/awaiting_debaters).
    for waiting in list(queue.pending()):
        _guarded(_start_round, "error starting round %s: %s", waiting)

    # Advance all running rounds (covers newly-started and in-flight).
    # running() is queried only now so rounds started above are included.
    for active in list(queue.running()):
        _guarded(_advance_round, "error advancing round %s: %s", active)
+42 -3
View File
@@ -8,16 +8,23 @@ Schema (~/.coordinator/queue.json):
{
"round_id": "<uuid>",
"issue_id": "<uuid>",
"identifier": "WYL-42", # human-readable, for logs
"identifier": "WYL-42",
"title": "...",
"enqueued_at": "<iso8601>",
"status": "pending" # pending | running | done | error
"status": "pending",
"phase": "convened",
"phase_entered_at": "",
"coordinator_comment_id": "",
"judge_comment_id": ""
},
...
]
}
Only the watcher appends; only the orchestrator updates status.
status: pending | running | done | error
phase: convened | awaiting_debaters | awaiting_judge | accepted | rejected | error
Only the watcher appends; only the orchestrator updates status/phase.
Completed/errored rounds stay in the file so it stays fully auditable.
"""
from __future__ import annotations
@@ -42,6 +49,10 @@ class Round:
title: str
enqueued_at: str
status: str = "pending" # pending | running | done | error
phase: str = "convened" # convened | awaiting_debaters | awaiting_judge | accepted | rejected | error
phase_entered_at: str = "" # ISO8601 when current phase was entered (for timeout)
coordinator_comment_id: str = "" # ID of debater-mention comment posted by coordinator
judge_comment_id: str = "" # ID of judge-mention comment posted by coordinator
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@@ -55,6 +66,10 @@ class Round:
title=d.get("title", ""),
enqueued_at=d["enqueued_at"],
status=d.get("status", "pending"),
phase=d.get("phase", "convened"),
phase_entered_at=d.get("phase_entered_at", ""),
coordinator_comment_id=d.get("coordinator_comment_id", ""),
judge_comment_id=d.get("judge_comment_id", ""),
)
@@ -113,12 +128,36 @@ class DebateQueue:
return
raise KeyError(f"round {round_id!r} not found in queue")
def update_phase(self, round_id: str, phase: str, **kwargs: str) -> None:
    """Update round phase (and optionally other string fields) in place and persist.

    Extra keyword arguments are applied only when they name a declared
    dataclass field of Round; anything else is ignored. Filtering on declared
    fields rather than on ``hasattr`` keeps a stray keyword from overwriting
    a method such as ``to_dict`` on the instance.

    Typical usage::

        queue.update_phase(rid, "awaiting_debaters",
                           phase_entered_at="2026-01-01T00:00:00Z",
                           coordinator_comment_id="abc123")

    Raises:
        KeyError: if no round with *round_id* is in the queue.
    """
    from dataclasses import fields

    for r in self.rounds:
        if r.round_id != round_id:
            continue
        r.phase = phase
        declared = {f.name for f in fields(r)}
        for key, value in kwargs.items():
            if key in declared:
                setattr(r, key, value)
        self.save()
        return
    raise KeyError(f"round {round_id!r} not found in queue")
# ---- read ops ----------------------------------------------------------
def pending(self) -> list[Round]:
    """Return a new list of the rounds whose status is ``pending``, in queue order."""
    waiting = [entry for entry in self.rounds if entry.status == "pending"]
    return waiting
def running(self) -> list[Round]:
    """Return a new list of the rounds whose status is ``running``, in queue order."""
    in_flight = [entry for entry in self.rounds if entry.status == "running"]
    return in_flight
def is_issue_pending_or_running(self, issue_id: str) -> bool:
"""True if a round for this issue is already in the queue and active."""
return any(
+211
View File
@@ -0,0 +1,211 @@
"""Integration tests for the debate round orchestration.
These tests hit the REAL multica API. They require the coordinator env vars:
COORDINATOR_SERVER_URL e.g. https://multica.example.com
COORDINATOR_WORKSPACE_ID
COORDINATOR_TOKEN
Run with:
python -m pytest tests/test_integration.py -m integration -v
The test creates a scratch issue, exercises the full orchestration lifecycle
with simulated debater/judge responses (posted via the coordinator's own token
using the content-based fallback detector), verifies the result, then cleans up.
"""
from __future__ import annotations
import logging
import os
import time
from pathlib import Path
import pytest
pytestmark = pytest.mark.integration
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def cfg():
    """Load Config from env. Skip module if vars missing.

    The env file is loaded BEFORE the required variables are checked, so a
    setup that keeps them only in ~/.coordinator/env (as the skip message
    recommends) is not skipped by mistake.
    """
    from coordinator.config import Config, load_env_file

    # NOTE(review): presumably this populates os.environ from the env file,
    # as smoke_test.py relies on — confirm in coordinator.config.
    load_env_file()

    required = [
        "COORDINATOR_SERVER_URL",
        "COORDINATOR_WORKSPACE_ID",
        "COORDINATOR_TOKEN",
    ]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        pytest.skip(
            f"Integration test requires env vars: {', '.join(missing)}\n"
            "Put them in ~/.coordinator/env or export them."
        )
    return Config.from_env()
@pytest.fixture(scope="module")
def client(cfg):
    """Build one MulticaClient per test module from the ``cfg`` fixture."""
    from coordinator.multica_client import MulticaClient

    api = MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token)
    return api
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _post_fake_debater_replies(client, issue_id: str):
    """Post one marker-prefixed comment per debater through *client*.

    Each comment starts with ``[{name} response]:`` so the orchestrator's
    content-based fallback detector accepts it as that debater's evidence
    reply, even though it is posted with the coordinator's own token.
    """
    from coordinator.orchestrator import DEBATER_NAMES

    for debater in DEBATER_NAMES:
        body = (
            f"[{debater} response]: Integration-test evidence for {debater}. "
            "This comment simulates the agent's evidence analysis."
        )
        client.post_comment(issue_id, body)
def _post_fake_judge_verdict(client, issue_id: str, verdict: str = "ACCEPT"):
    """Post a simulated judge comment that opens with ``VERDICT: <verdict>``."""
    header = f"VERDICT: {verdict}\n\n"
    rationale = (
        "Integration test verdict. The implementation satisfies the acceptance criteria."
    )
    client.post_comment(issue_id, header + rationale)
# ---------------------------------------------------------------------------
# Integration test: full round lifecycle
# ---------------------------------------------------------------------------
@pytest.mark.integration
def test_full_debate_round_lifecycle(cfg, client, tmp_path):
    """End-to-end debate round lifecycle against the real API.

    Steps:
      1. Create scratch issue, set to in_review
      2. poll_once → verify round enqueued
      3. _start_round → verify coordinator comment posted on real issue
      4. Post simulated debater responses (content-marker fallback)
      5. _advance_awaiting_debaters → verify judge comment posted
      6. Post simulated judge verdict
      7. _advance_awaiting_judge → verify issue status updated (RACE FIX verified)
      8. Cleanup

    The workspace is shared, so poll_once may enqueue rounds for unrelated
    in_review issues; the round under test is therefore always looked up by
    the scratch issue's id rather than by queue position.
    """
    from coordinator.__main__ import poll_once
    from coordinator.orchestrator import (
        DEBATER_NAMES,
        _advance_awaiting_debaters,
        _advance_awaiting_judge,
        _start_round,
    )
    from coordinator.queue import DebateQueue
    from coordinator.state import SeenState

    logger = logging.getLogger("test.integration")
    issue_id = None
    try:
        # --- 1. Create scratch issue ---
        issue = client.create_issue(
            title="[DEBATE-SMOKE-TEST] Integration test — auto-delete",
            description=(
                "This issue is created by the coordinator integration test. "
                "It will be cleaned up automatically.\n\n"
                "Commit: https://git.wylab.me/multica/coordinator/commit/test000"
            ),
        )
        issue_id = issue["id"]
        logger.info("created scratch issue %s", issue_id)

        # Set to in_review so the watcher picks it up
        client.update_issue_status(issue_id, "in_review")

        # --- 2. poll_once → enqueue round ---
        state = SeenState.load(tmp_path / "seen.json")
        queue = DebateQueue.load(tmp_path / "queue.json")
        poll_once(client, state, queue, logger)

        # Other in_review issues in the workspace may have been enqueued as
        # well, so do not assume our round is first in the pending list.
        round_ = next(
            (r for r in queue.pending() if r.issue_id == issue_id),
            None,
        )
        assert round_ is not None, "Expected round enqueued after poll_once"
        assert round_.status == "pending"

        # --- 3. _start_round → debater comment posted ---
        _start_round(round_, client, queue, cfg, logger)
        assert round_.status == "running", f"Expected running, got {round_.status}"
        assert round_.phase == "awaiting_debaters", f"Expected awaiting_debaters, got {round_.phase}"
        assert round_.coordinator_comment_id, "coordinator_comment_id must be set"

        # Verify comment appeared on the real issue
        real_comments = client.list_comments(issue_id)
        coord_comment = next(
            (c for c in real_comments if c.get("id") == round_.coordinator_comment_id),
            None,
        )
        assert coord_comment is not None, "Coordinator comment not found in issue comments"
        for name in DEBATER_NAMES:
            assert name in coord_comment["content"], f"{name} not mentioned in coordinator comment"

        # --- 4. Post simulated debater responses ---
        _post_fake_debater_replies(client, issue_id)

        # --- 5. _advance_awaiting_debaters → judge comment posted ---
        _advance_awaiting_debaters(round_, client, queue, cfg, logger)
        assert round_.phase == "awaiting_judge", f"Expected awaiting_judge, got {round_.phase}"
        assert round_.judge_comment_id, "judge_comment_id must be set"

        real_comments = client.list_comments(issue_id)
        judge_comment = next(
            (c for c in real_comments if c.get("id") == round_.judge_comment_id),
            None,
        )
        assert judge_comment is not None, "Judge comment not found in issue comments"
        assert "Verdict requested" in judge_comment["content"]

        # --- 6. Post simulated judge verdict ---
        _post_fake_judge_verdict(client, issue_id, verdict="ACCEPT")

        # --- 7. _advance_awaiting_judge → issue status updated ---
        _advance_awaiting_judge(round_, client, queue, cfg, logger)
        assert round_.status == "done", f"Expected done, got {round_.status}"
        assert round_.phase == "accepted", f"Expected accepted, got {round_.phase}"

        # Verify issue status via real API (not just in-memory)
        refreshed = client.get_issue(issue_id)
        assert refreshed.get("status") == "done", (
            f"Issue status should be 'done' after ACCEPT verdict, "
            f"got {refreshed.get('status')!r}"
        )

        # RACE FIX: after _advance_awaiting_judge completes, issue is no longer in_review,
        # so poll_once cannot double-enqueue this round.
        # Note: other in_review issues in the workspace may also get enqueued — that's OK.
        poll_once(client, state, queue, logger)
        our_rounds = [r for r in queue.rounds if r.issue_id == issue_id]
        assert len(our_rounds) == 1, (
            f"poll_once created a second round for issue {issue_id} (double-enqueue)"
        )
        logger.info("VERDICT OK — full lifecycle passed")
    finally:
        # --- 8. Cleanup ---
        # Best-effort: a cleanup failure is logged but must not mask the
        # outcome of the test body.
        if issue_id:
            try:
                client.update_issue_status(issue_id, "done")
                logger.info("cleanup: issue %s set to done", issue_id)
            except Exception as exc:
                logger.warning("cleanup failed for issue %s: %s", issue_id, exc)
+736
View File
@@ -0,0 +1,736 @@
"""Unit tests for coordinator.orchestrator (WYL-45)."""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any
import pytest
from coordinator.orchestrator import (
DEBATER_NAMES,
JUDGE_NAME,
_advance_awaiting_debaters,
_advance_awaiting_judge,
_collect_debater_replies,
_collect_judge_reply,
_find_commit_url,
_parse_verdict,
_start_round,
orchestrate_pending,
)
from coordinator.queue import DebateQueue, Round
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _utcnow() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second resolution)."""
    now = datetime.now(tz=timezone.utc)
    return f"{now:%Y-%m-%dT%H:%M:%SZ}"
def _past(seconds: float) -> str:
    """Return the UTC time *seconds* ago as ``YYYY-MM-DDTHH:MM:SSZ``."""
    moment = datetime.now(tz=timezone.utc) - timedelta(seconds=seconds)
    return f"{moment:%Y-%m-%dT%H:%M:%SZ}"
@dataclass
class FakeConfig:
    """Plain-data config stand-in handed to the orchestrator functions under test.

    All fields have defaults, so ``FakeConfig()`` is usable as-is. Nothing in
    this class touches the network or the file paths it names.
    """

    # Connection settings (dummy values).
    server_url: str = "http://fake"
    workspace_id: str = "ws-1"
    token: str = "tok"

    # Timing / concurrency knobs. Tests simulate a timeout by back-dating a
    # round's phase_entered_at by more than round_timeout_s seconds.
    poll_interval_s: int = 30
    round_timeout_s: int = 600
    max_concurrent_rounds: int = 3

    # File locations (placeholders only).
    seen_file: Path = Path("/tmp/seen.json")
    queue_file: Path = Path("/tmp/queue.json")
    log_file: Path = Path("/tmp/coordinator.log")
@dataclass
class FakeClient:
    """In-memory test double for MulticaClient.

    It serves a single issue for every ``issue_id``, keeps the comment thread
    in a list, and records posted comments and status updates so tests can
    assert on them afterwards.
    """

    # The one issue returned by get_issue(), whatever id is asked for.
    issue: dict[str, Any] = field(default_factory=lambda: {
        "id": "issue-1",
        "title": "Test issue",
        "description": "Test description. Commit: https://git.example.com/commit/abc123",
        "status": "in_review",
    })
    # Comment thread served by list_comments(); post_comment() appends to it.
    comments: list[dict[str, Any]] = field(default_factory=list)
    # Agents served by list_agents() / find_agents_by_name().
    agents: list[dict[str, Any]] = field(default_factory=list)
    # Content of every comment posted through post_comment(), in order.
    posted_comments: list[str] = field(default_factory=list)
    # (issue_id, status) pairs passed to update_issue_status(), in order.
    status_updates: list[tuple[str, str]] = field(default_factory=list)
    # Id handed to the next posted comment.
    post_comment_returns_id: str = "comment-coord"

    def get_issue(self, issue_id: str) -> dict[str, Any]:
        """Return the canned issue; *issue_id* is ignored."""
        return self.issue

    def list_comments(self, issue_id: str) -> list[dict[str, Any]]:
        """Return a shallow copy of the comment thread."""
        return [*self.comments]

    def post_comment(self, issue_id: str, content: str) -> dict[str, Any]:
        """Record *content*, append a coordinator-authored comment, and return it."""
        self.posted_comments.append(content)
        # Hand out the current id and advance it for subsequent calls.
        comment_id, self.post_comment_returns_id = (
            self.post_comment_returns_id,
            "comment-" + str(len(self.posted_comments)),
        )
        record = {
            "id": comment_id,
            "content": content,
            "created_at": _utcnow(),
            "author_id": "coord",
            "author_type": "member",
        }
        self.comments.append(record)
        return record

    def list_agents(self) -> list[dict[str, Any]]:
        """Return a shallow copy of the agent list."""
        return [*self.agents]

    def find_agents_by_name(self, names) -> dict[str, str]:
        """Map each requested name that exists in ``agents`` to its agent id."""
        requested = set(names)
        matches: dict[str, str] = {}
        for agent in self.agents:
            if agent["name"] in requested:
                matches[agent["name"]] = agent["id"]
        return matches

    def update_issue_status(self, issue_id: str, status: str) -> dict[str, Any]:
        """Log the status change, apply it to the canned issue, and return the issue."""
        self.status_updates.append((issue_id, status))
        self.issue["status"] = status
        return self.issue

    def update_issue(self, issue_id: str, **fields: Any) -> dict[str, Any]:
        """Merge *fields* into the canned issue and return it."""
        self.issue.update(fields)
        return self.issue
def _make_round(
    *,
    status: str = "pending",
    phase: str = "convened",
    phase_entered_at: str = "",
    coordinator_comment_id: str = "",
    judge_comment_id: str = "",
) -> Round:
    """Build a Round for ``issue-1`` with a fresh random round id.

    ``phase_entered_at`` falls back to the current UTC time when left empty;
    the remaining keyword arguments are passed through unchanged.
    """
    attrs = {
        "round_id": str(uuid.uuid4()),
        "issue_id": "issue-1",
        "identifier": "WYL-99",
        "title": "Test issue",
        "enqueued_at": _utcnow(),
        "status": status,
        "phase": phase,
        "phase_entered_at": phase_entered_at if phase_entered_at else _utcnow(),
        "coordinator_comment_id": coordinator_comment_id,
        "judge_comment_id": judge_comment_id,
    }
    return Round(**attrs)
def _make_queue(tmp_path: Path, *rounds: Round) -> DebateQueue:
    """Load a queue backed by ``tmp_path/queue.json``, add *rounds*, save, and return it."""
    queue = DebateQueue.load(tmp_path / "queue.json")
    queue.rounds.extend(rounds)
    queue.save()
    return queue
# Shared logger passed to every orchestrator function called by these tests.
_logger = logging.getLogger("test.orchestrator")
# ---------------------------------------------------------------------------
# _parse_verdict
# ---------------------------------------------------------------------------
def test_parse_verdict_accept():
    """A leading ``VERDICT: ACCEPT`` line parses to ``"ACCEPT"``."""
    body = "VERDICT: ACCEPT\n\nGreat work."
    parsed = _parse_verdict(body)
    assert parsed == "ACCEPT"
def test_parse_verdict_reject():
    """A leading ``VERDICT: REJECT`` line parses to ``"REJECT"``."""
    body = "VERDICT: REJECT\n\nMissing tests."
    parsed = _parse_verdict(body)
    assert parsed == "REJECT"
def test_parse_verdict_case_insensitive():
    """A lower-case verdict line is accepted and normalised to upper case."""
    parsed = _parse_verdict("verdict: accept")
    assert parsed == "ACCEPT"
def test_parse_verdict_none_when_absent():
    """Text without a verdict marker yields ``None``."""
    parsed = _parse_verdict("No verdict here.")
    assert parsed is None
# ---------------------------------------------------------------------------
# _find_commit_url
# ---------------------------------------------------------------------------
def test_find_commit_url_found():
    """The commit URL embedded in a comment body is extracted."""
    thread = [
        {"content": "Watcher implemented. Commit: https://git.example.com/multica/foo/commit/abc123"},
    ]
    url = _find_commit_url(thread)
    assert url == "https://git.example.com/multica/foo/commit/abc123"
def test_find_commit_url_returns_last():
    """When several comments carry commit URLs, the last one wins."""
    older = {"content": "Commit: https://git.example.com/multica/foo/commit/aaa111"}
    newer = {"content": "Follow-up commit: https://git.example.com/multica/foo/commit/bbb222"}
    url = _find_commit_url([older, newer])
    assert url == "https://git.example.com/multica/foo/commit/bbb222"
def test_find_commit_url_empty_when_absent():
    """No commit link in any comment yields the empty string."""
    thread = [{"content": "No link here."}]
    assert _find_commit_url(thread) == ""
# ---------------------------------------------------------------------------
# _collect_debater_replies
# ---------------------------------------------------------------------------
def _make_comment(
    cid: str,
    content: str,
    author_id: str = "anon",
    ts: str = "",
) -> dict[str, Any]:
    """Build an agent-authored comment dict; an empty *ts* means "now" (UTC)."""
    created_at = ts if ts else _utcnow()
    return dict(
        id=cid,
        content=content,
        author_id=author_id,
        author_type="agent",
        created_at=created_at,
    )
def test_collect_replies_by_agent_id():
    """A comment authored by a mapped agent id is attributed to that debater."""
    mention = {
        "id": "coord",
        "content": "Debate opened",
        "author_id": "coord",
        "author_type": "member",
        "created_at": _past(10),
    }
    reply = _make_comment("c1", "Evidence here.", author_id="agent-sd", ts=_utcnow())
    found = _collect_debater_replies(
        [mention, reply],
        {"Senior Developer": "agent-sd"},
        "coord",
    )
    assert "Senior Developer" in found
def test_collect_replies_content_fallback():
    """Content-based marker accepted when agent ID not in map."""
    mention = {
        "id": "coord",
        "content": "Debate opened",
        "author_id": "coord",
        "author_type": "member",
        "created_at": _past(10),
    }
    marked_reply = _make_comment(
        "c1",
        "[Senior Developer response]: grep output here.",
        author_id="someone-else",
        ts=_utcnow(),
    )
    no_known_agents: dict = {}
    found = _collect_debater_replies([mention, marked_reply], no_known_agents, "coord")
    assert "Senior Developer" in found
def test_collect_replies_skips_before_cutoff():
    """Comments before coordinator's mention are ignored."""
    stale_ts = _past(20)
    mention_ts = _past(10)
    thread = [
        _make_comment("early", "Early reply", author_id="agent-sd", ts=stale_ts),
        {
            "id": "coord",
            "content": "Debate opened",
            "author_id": "coord",
            "author_type": "member",
            "created_at": mention_ts,
        },
    ]
    found = _collect_debater_replies(thread, {"Senior Developer": "agent-sd"}, "coord")
    assert "Senior Developer" not in found
def test_collect_replies_no_duplicate_per_debater():
    """Only the first reply from a debater is kept; later ones are ignored."""
    mention = {
        "id": "coord",
        "content": "x",
        "author_id": "coord",
        "author_type": "member",
        "created_at": _past(10),
    }
    first = _make_comment("c1", "First reply", author_id="agent-sd", ts=_utcnow())
    second = _make_comment("c2", "Second reply", author_id="agent-sd", ts=_utcnow())
    found = _collect_debater_replies(
        [mention, first, second],
        {"Senior Developer": "agent-sd"},
        "coord",
    )
    assert found["Senior Developer"] == "First reply"
# ---------------------------------------------------------------------------
# _collect_judge_reply
# ---------------------------------------------------------------------------
def test_collect_judge_reply_by_agent_id():
    """A comment authored by the judge's agent id is returned as the verdict text."""
    request = {
        "id": "jc",
        "content": "Verdict requested",
        "author_id": "coord",
        "created_at": _past(10),
    }
    ruling = _make_comment(
        "j1", "VERDICT: ACCEPT\n\nGreat.", author_id="agent-judge", ts=_utcnow()
    )
    text = _collect_judge_reply([request, ruling], "agent-judge", "jc")
    assert text is not None
    assert "ACCEPT" in text
def test_collect_judge_reply_content_fallback():
    """With no judge id known, a comment carrying a VERDICT line is still picked up."""
    request = {"id": "jc", "content": "x", "author_id": "coord", "created_at": _past(10)}
    ruling = _make_comment(
        "j1", "VERDICT: REJECT\n\nMissing tests.", author_id="anyone", ts=_utcnow()
    )
    text = _collect_judge_reply([request, ruling], "", "jc")
    assert text is not None
    assert "REJECT" in text
def test_collect_judge_reply_none_when_absent():
    """A thread holding only the coordinator's request yields ``None``."""
    request = {"id": "jc", "content": "x", "author_id": "coord", "created_at": _past(10)}
    text = _collect_judge_reply([request], "agent-judge", "jc")
    assert text is None
# ---------------------------------------------------------------------------
# _start_round
# ---------------------------------------------------------------------------
def test_start_round_marks_running_before_post(tmp_path):
    """Status must be 'running' before any API call (early guard against double-enqueue).

    The queue and the client are wrapped so that each relevant call appends a
    tag to ``call_order``; the test then checks both calls happened and that
    the status update came first.
    """
    call_order: list[str] = []

    class TrackingQueue(DebateQueue):
        def update_status(self, round_id, status):
            call_order.append(f"update_status:{status}")
            super().update_status(round_id, status)

    class TrackingClient(FakeClient):
        def post_comment(self, issue_id, content):
            call_order.append("post_comment")
            return super().post_comment(issue_id, content)

    r = _make_round()
    q = TrackingQueue.load(tmp_path / "queue.json")
    q.rounds.append(r)
    q.save()
    _start_round(r, TrackingClient(), q, FakeConfig(), _logger)

    # Both calls must have happened; assert explicitly so that a missing call
    # produces a readable failure instead of a bare StopIteration/ValueError.
    assert "update_status:running" in call_order, (
        f"update_status('running') was never called, got order: {call_order}"
    )
    assert "post_comment" in call_order, (
        f"post_comment was never called, got order: {call_order}"
    )
    # update_status("running") must precede post_comment
    assert call_order.index("update_status:running") < call_order.index("post_comment"), (
        f"Expected update_status:running before post_comment, got order: {call_order}"
    )
def test_start_round_posts_debater_comment(tmp_path):
    """Starting a round posts exactly one opening comment naming every debater."""
    round_ = _make_round()
    queue = _make_queue(tmp_path, round_)
    fake = FakeClient()
    _start_round(round_, fake, queue, FakeConfig(), _logger)

    assert len(fake.posted_comments) == 1
    opening = fake.posted_comments[0]
    assert "Debate round opened" in opening
    unmentioned = [name for name in DEBATER_NAMES if name not in opening]
    assert not unmentioned
def test_start_round_sets_phase_awaiting_debaters(tmp_path):
    """After _start_round the round is running/awaiting_debaters with a comment id."""
    round_ = _make_round()
    queue = _make_queue(tmp_path, round_)
    _start_round(round_, FakeClient(), queue, FakeConfig(), _logger)

    assert (round_.phase, round_.status) == ("awaiting_debaters", "running")
    assert round_.coordinator_comment_id  # must be set
def test_start_round_error_on_api_failure(tmp_path):
    """If posting the opening comment raises, the round ends up in status 'error'."""

    class BrokenClient(FakeClient):
        def post_comment(self, issue_id, content):
            raise RuntimeError("API down")

    round_ = _make_round()
    queue = _make_queue(tmp_path, round_)
    _start_round(round_, BrokenClient(), queue, FakeConfig(), _logger)
    assert round_.status == "error"
# ---------------------------------------------------------------------------
# _advance_awaiting_debaters — debater replies
# ---------------------------------------------------------------------------
def _debater_round(tmp_path: Path) -> tuple[Round, DebateQueue, FakeClient]:
    """Return a round already in awaiting_debaters phase.

    The returned client is pre-seeded with the coordinator's opening comment
    (id ``coord-comment``, dated five seconds ago) and with one agent per
    debater (ids ``agent-0``, ``agent-1``, …) plus the judge (``agent-judge``).
    """
    round_ = _make_round(
        status="running",
        phase="awaiting_debaters",
        phase_entered_at=_utcnow(),
        coordinator_comment_id="coord-comment",
    )
    queue = _make_queue(tmp_path, round_)

    # Seed coordinator comment in the comment list
    opening = {
        "id": "coord-comment",
        "content": "Debate opened",
        "author_id": "coord",
        "author_type": "member",
        "created_at": _past(5),
    }
    # Seed debater agents, then the judge
    roster = [
        {"name": debater, "id": f"agent-{idx}"}
        for idx, debater in enumerate(DEBATER_NAMES)
    ]
    roster.append({"name": JUDGE_NAME, "id": "agent-judge"})

    fake = FakeClient(comments=[opening], agents=roster)
    return round_, queue, fake
def test_advance_debaters_waits_when_not_all_replied(tmp_path):
    """With no debater replies and no timeout, the round stays in awaiting_debaters."""
    round_, queue, fake = _debater_round(tmp_path)
    _advance_awaiting_debaters(round_, fake, queue, FakeConfig(), _logger)

    # Not enough replies → no judge comment posted
    assert round_.phase == "awaiting_debaters"
    assert not any("Verdict requested" in posted for posted in fake.posted_comments)
def test_advance_debaters_proceeds_when_all_replied(tmp_path):
    """Once every debater has replied, the judge comment is posted and the phase advances."""
    round_, queue, fake = _debater_round(tmp_path)

    # Add a reply from each debater
    fake.comments.extend(
        {
            "id": f"reply-{idx}",
            "content": f"[{debater} response]: Evidence here.",
            "author_id": f"agent-{idx}",
            "author_type": "agent",
            "created_at": _utcnow(),
        }
        for idx, debater in enumerate(DEBATER_NAMES)
    )
    _advance_awaiting_debaters(round_, fake, queue, FakeConfig(), _logger)

    assert round_.phase == "awaiting_judge"
    assert round_.judge_comment_id
    request = fake.posted_comments[-1]
    assert "Verdict requested" in request
    unmentioned = [name for name in DEBATER_NAMES if name not in request]
    assert not unmentioned
def test_advance_debaters_timeout_with_partial_evidence(tmp_path):
    """After timeout, proceed with partial evidence (missing debaters noted in transcript)."""
    round_, queue, fake = _debater_round(tmp_path)

    # Only one debater replies
    lone_reply = {
        "id": "r1",
        "content": f"[{DEBATER_NAMES[0]} response]: My evidence.",
        "author_id": "agent-0",
        "author_type": "agent",
        "created_at": _utcnow(),
    }
    fake.comments.append(lone_reply)

    # Simulate timeout by setting phase_entered_at far in the past
    # (700 s ago against FakeConfig's 600 s round timeout).
    round_.phase_entered_at = _past(700)
    queue.save()

    _advance_awaiting_debaters(round_, fake, queue, FakeConfig(), _logger)

    assert round_.phase == "awaiting_judge"
    request = fake.posted_comments[-1]
    assert any(marker in request for marker in ("timed out", "no response"))
# ---------------------------------------------------------------------------
# _advance_awaiting_judge — verdict handling
# ---------------------------------------------------------------------------
def _judge_round(tmp_path: Path, *, phase_entered_at: str = "") -> tuple[Round, DebateQueue, FakeClient]:
    """Return a round already in awaiting_judge phase.

    The returned client knows the judge agent (``agent-judge``) and its thread
    holds only the coordinator's verdict request (id ``judge-comment``, dated
    five seconds ago). An empty *phase_entered_at* means "now".
    """
    entered = phase_entered_at if phase_entered_at else _utcnow()
    round_ = _make_round(
        status="running",
        phase="awaiting_judge",
        phase_entered_at=entered,
        coordinator_comment_id="coord-comment",
        judge_comment_id="judge-comment",
    )
    queue = _make_queue(tmp_path, round_)

    verdict_request = {
        "id": "judge-comment",
        "content": "Verdict requested",
        "author_id": "coord",
        "author_type": "member",
        "created_at": _past(5),
    }
    fake = FakeClient(
        agents=[{"name": JUDGE_NAME, "id": "agent-judge"}],
        comments=[verdict_request],
    )
    return round_, queue, fake
def test_advance_judge_waits_when_no_verdict(tmp_path):
    """Without a verdict (and before timeout) nothing changes and no status is pushed."""
    round_, queue, fake = _judge_round(tmp_path)
    _advance_awaiting_judge(round_, fake, queue, FakeConfig(), _logger)

    assert (round_.phase, round_.status) == ("awaiting_judge", "running")
    assert fake.status_updates == []
def test_advance_judge_accept_updates_issue_status_to_done(tmp_path):
    """An ACCEPT verdict finishes the round as accepted and sets the issue to done."""
    round_, queue, fake = _judge_round(tmp_path)
    ruling = {
        "id": "verdict1",
        "content": "VERDICT: ACCEPT\n\nLooks good.",
        "author_id": "agent-judge",
        "author_type": "agent",
        "created_at": _utcnow(),
    }
    fake.comments.append(ruling)

    _advance_awaiting_judge(round_, fake, queue, FakeConfig(), _logger)

    assert (round_.status, round_.phase) == ("done", "accepted")
    assert fake.status_updates == [("issue-1", "done")]
def test_advance_judge_reject_updates_issue_status_to_in_progress(tmp_path):
    """A REJECT verdict finishes the round as rejected and sends the issue back to in_progress."""
    round_, queue, fake = _judge_round(tmp_path)
    ruling = {
        "id": "verdict1",
        "content": "VERDICT: REJECT\n\nMissing tests.",
        "author_id": "agent-judge",
        "author_type": "agent",
        "created_at": _utcnow(),
    }
    fake.comments.append(ruling)

    _advance_awaiting_judge(round_, fake, queue, FakeConfig(), _logger)

    assert (round_.status, round_.phase) == ("done", "rejected")
    assert fake.status_updates == [("issue-1", "in_progress")]
def test_advance_judge_timeout_marks_error(tmp_path):
    """Judge timeout: round → error, issue left in_review for human escalation."""
    # 700 s in the past exceeds FakeConfig's 600 s round timeout.
    round_, queue, fake = _judge_round(tmp_path, phase_entered_at=_past(700))
    _advance_awaiting_judge(round_, fake, queue, FakeConfig(), _logger)

    assert (round_.status, round_.phase) == ("error", "error")
    # Issue status must NOT be changed — leave in_review for humans
    assert fake.status_updates == []
# ---------------------------------------------------------------------------
# Race condition: issue status before round done (CRITICAL)
# ---------------------------------------------------------------------------
def test_issue_status_updated_before_round_marked_done(tmp_path):
    """RACE FIX: client.update_issue_status MUST precede queue.update_status('done').

    Client and queue are subclassed to append a tag to ``events`` on each
    relevant call, so the relative order can be checked afterwards.
    """
    events: list[str] = []

    class TrackingClient(FakeClient):
        def update_issue_status(self, issue_id, status):
            events.append(f"issue:{status}")
            return super().update_issue_status(issue_id, status)

    class TrackingQueue(DebateQueue):
        def update_status(self, round_id, status):
            events.append(f"round:{status}")
            super().update_status(round_id, status)

    round_ = _make_round(
        status="running",
        phase="awaiting_judge",
        phase_entered_at=_utcnow(),
        coordinator_comment_id="coord-comment",
        judge_comment_id="judge-comment",
    )
    queue = TrackingQueue.load(tmp_path / "queue.json")
    queue.rounds.append(round_)
    queue.save()

    fake = TrackingClient()
    fake.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}]
    verdict_request = {
        "id": "judge-comment",
        "content": "x",
        "author_id": "coord",
        "created_at": _past(5),
    }
    ruling = {
        "id": "v1",
        "content": "VERDICT: ACCEPT\n\nAll good.",
        "author_id": "agent-judge",
        "author_type": "agent",
        "created_at": _utcnow(),
    }
    fake.comments = [verdict_request, ruling]

    _advance_awaiting_judge(round_, fake, queue, FakeConfig(), _logger)

    # Both calls must have happened
    assert "issue:done" in events
    assert "round:done" in events
    # Issue status MUST precede round-done
    issue_pos = events.index("issue:done")
    round_pos = events.index("round:done")
    assert issue_pos < round_pos, (
        f"Expected issue:done before round:done, got order: {events}"
    )
def test_round_not_marked_done_if_issue_update_fails(tmp_path):
    """If issue status update fails, don't mark round done (retry next cycle)."""

    class FailingClient(FakeClient):
        def update_issue_status(self, issue_id, status):
            raise RuntimeError("network error")

    round_ = _make_round(
        status="running",
        phase="awaiting_judge",
        phase_entered_at=_utcnow(),
        coordinator_comment_id="coord-comment",
        judge_comment_id="judge-comment",
    )
    queue = _make_queue(tmp_path, round_)

    fake = FailingClient()
    fake.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}]
    verdict_request = {
        "id": "judge-comment",
        "content": "x",
        "author_id": "coord",
        "created_at": _past(5),
    }
    ruling = {
        "id": "v1",
        "content": "VERDICT: ACCEPT",
        "author_id": "agent-judge",
        "created_at": _utcnow(),
    }
    fake.comments = [verdict_request, ruling]

    _advance_awaiting_judge(round_, fake, queue, FakeConfig(), _logger)

    # Round must remain running for retry
    assert round_.status == "running"
# ---------------------------------------------------------------------------
# Phase transitions: full cycle
# ---------------------------------------------------------------------------
def test_full_phase_cycle(tmp_path):
    """pending → running/awaiting_debaters → awaiting_judge → accepted/done.

    Drives a single round through all three orchestrator steps against the
    in-memory FakeClient, checking the round's status/phase and the recorded
    comment ids after each step, and the issue status at the end.
    """
    r = _make_round()
    q = _make_queue(tmp_path, r)

    # Step 1: start_round
    client = FakeClient()
    client.agents = [
        *[{"name": n, "id": f"agent-{i}"} for i, n in enumerate(DEBATER_NAMES)],
        {"name": JUDGE_NAME, "id": "agent-judge"},
    ]
    _start_round(r, client, q, FakeConfig(), _logger)
    assert r.status == "running"
    assert r.phase == "awaiting_debaters"
    # Later steps locate replies relative to this comment, so it must be recorded.
    assert r.coordinator_comment_id, "coordinator_comment_id must be set"

    # Step 2: debaters reply
    for i, name in enumerate(DEBATER_NAMES):
        client.comments.append({
            "id": f"reply-{i}", "content": f"[{name} response]: Evidence.",
            "author_id": f"agent-{i}", "author_type": "agent", "created_at": _utcnow(),
        })
    _advance_awaiting_debaters(r, client, q, FakeConfig(), _logger)
    assert r.phase == "awaiting_judge"
    assert r.judge_comment_id, "judge_comment_id must be set"

    # Step 3: judge replies
    client.comments.append({
        "id": "verdict1", "content": "VERDICT: ACCEPT\n\nShipped.",
        "author_id": "agent-judge", "author_type": "agent", "created_at": _utcnow(),
    })
    _advance_awaiting_judge(r, client, q, FakeConfig(), _logger)
    assert r.phase == "accepted"
    assert r.status == "done"
    assert client.issue["status"] == "done"
# ---------------------------------------------------------------------------
# Restart resume: in-flight rounds resume from correct phase
# ---------------------------------------------------------------------------
def test_restart_resumes_awaiting_debaters(tmp_path):
    """On restart, a running/awaiting_debaters round picks up without re-posting comment."""
    round_ = _make_round(
        status="running",
        phase="awaiting_debaters",
        phase_entered_at=_utcnow(),
        coordinator_comment_id="existing-coord-comment",
    )
    queue = _make_queue(tmp_path, round_)

    fake = FakeClient()
    opening = {
        "id": "existing-coord-comment",
        "content": "Debate opened",
        "author_id": "coord",
        "created_at": _past(60),
    }
    fake.comments = [opening]

    orchestrate_pending(queue, FakeConfig(), _logger, client=fake)

    # Must NOT post another debater comment
    assert not any("Debate round opened" in posted for posted in fake.posted_comments)
    # Phase should still be awaiting_debaters (no replies)
    assert round_.phase == "awaiting_debaters"
def test_restart_resumes_awaiting_judge(tmp_path):
    """On restart, a running/awaiting_judge round resumes without re-posting judge comment."""
    round_ = _make_round(
        status="running",
        phase="awaiting_judge",
        phase_entered_at=_utcnow(),
        coordinator_comment_id="coord-c",
        judge_comment_id="judge-c",
    )
    queue = _make_queue(tmp_path, round_)

    fake = FakeClient()
    verdict_request = {
        "id": "judge-c",
        "content": "Verdict requested",
        "author_id": "coord",
        "created_at": _past(30),
    }
    fake.comments = [verdict_request]

    orchestrate_pending(queue, FakeConfig(), _logger, client=fake)

    # Must NOT post another judge comment
    assert not any("Verdict requested" in posted for posted in fake.posted_comments)
    assert round_.phase == "awaiting_judge"
# ---------------------------------------------------------------------------
# orchestrate_pending: pending rounds are started, running rounds advanced
# ---------------------------------------------------------------------------
def test_orchestrate_pending_starts_pending(tmp_path):
    """A pending round is started: status becomes running and the opening comment is posted."""
    round_ = _make_round()
    queue = _make_queue(tmp_path, round_)
    fake = FakeClient()

    orchestrate_pending(queue, FakeConfig(), _logger, client=fake)

    assert round_.status == "running"
    openings = [posted for posted in fake.posted_comments if "Debate round opened" in posted]
    assert openings
def test_orchestrate_pending_advances_running(tmp_path):
    """Running/awaiting_judge round with a verdict is completed."""
    round_ = _make_round(
        status="running",
        phase="awaiting_judge",
        phase_entered_at=_utcnow(),
        coordinator_comment_id="coord-c",
        judge_comment_id="judge-c",
    )
    queue = _make_queue(tmp_path, round_)

    fake = FakeClient()
    fake.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}]
    verdict_request = {
        "id": "judge-c",
        "content": "x",
        "author_id": "coord",
        "created_at": _past(10),
    }
    ruling = {
        "id": "v1",
        "content": "VERDICT: REJECT\n\nNeeds work.",
        "author_id": "agent-judge",
        "created_at": _utcnow(),
    }
    fake.comments = [verdict_request, ruling]

    orchestrate_pending(queue, FakeConfig(), _logger, client=fake)

    assert (round_.status, round_.phase) == ("done", "rejected")