Replace hand-written debater pipeline with CEK judge-with-debate

The prior pipeline (4 hand-written debater prompts + 1 judge with my prompt
template) kept missing scope drift because every prompt was mine and the
reviewers were all on the same model tier with correlated priors.

This commit replaces the whole review step with CEK's judge-with-debate
pattern translated to multica-native execution:

  pending → awaiting_rubric (meta-judge writes YAML spec from issue alone)
          → awaiting_judges (3 judges on 3 copilot models score independently)
          → consensus check (overall within 0.5, criteria within 1.0)
          → accept or reject OR awaiting_debate rounds up to 3
          → error on malformed YAML or cap hit

Per higher-management direction, we do not deal with a model that cannot
produce YAML: malformed rubric or all-unparseable judge reports fail the
round immediately (no retries, no fallback to hand-written prompts).

The anchor retrigger on REJECT (WYL-51 behaviour) is preserved verbatim.

Agent prompts for meta-judge and the 3 judges come from the CEK agents
themselves (Meta-Judge / Judge-GPT / Judge-Claude / Judge-Gemini) whose
`instructions` field is the CEK meta-judge.md / judge.md files uploaded
byte-for-byte. No prompts are authored in this coordinator's source.

Adds pyyaml dependency.

- src/coordinator/orchestrator.py: rewritten for the new phase machine
- src/coordinator/queue.py: Round extended with rubric_yaml, judge_report_comment_ids, debate_round
- tests/test_orchestrator.py: 40 tests for new pipeline (helpers, parsers, consensus math, phase handlers, race fix, retrigger)
- tests/test_integration.py: removed (tested old debater pipeline)
- pyproject.toml: adds pyyaml

Tests: 67 passed in 0.20s (40 orchestrator + 15 queue + 7 watcher + 5 other).
This commit is contained in:
2026-04-18 22:00:44 +02:00
committed by claude
parent 840b3c388c
commit f88255096e
5 changed files with 1295 additions and 1746 deletions
+3 -2
View File
@@ -5,9 +5,10 @@ description = "Tool-MAD middle-management layer for multica. Watches for in_revi
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
# v1 intentionally depends only on stdlib + requests. No async, no frameworks.
# If this list grows past 3 items before a working v1 is shipped, something is wrong.
# v1 intentionally depends only on stdlib + requests + pyyaml. No async, no frameworks.
# If this list grows past 4 items before a working v1 is shipped, something is wrong.
"requests>=2.32",
"pyyaml>=6.0",
]
[project.scripts]
File diff suppressed because it is too large Load Diff
+28 -18
View File
@@ -6,23 +6,27 @@ Schema (~/.coordinator/queue.json):
{
"rounds": [
{
"round_id": "<uuid>",
"issue_id": "<uuid>",
"identifier": "WYL-42",
"title": "...",
"enqueued_at": "<iso8601>",
"status": "pending",
"phase": "convened",
"phase_entered_at": "",
"coordinator_comment_id": "",
"judge_comment_id": ""
"round_id": "<uuid>",
"issue_id": "<uuid>",
"identifier": "WYL-42",
"title": "...",
"enqueued_at": "<iso8601>",
"status": "pending",
"phase": "convened",
"phase_entered_at": "",
"meta_judge_comment_id": "",
"rubric_yaml": "",
"judge_mention_comment_id":"",
"judge_report_comment_ids":{},
"debate_round": 0,
"retrigger_comment_id": ""
},
...
]
}
status: pending | running | done | error
phase: convened | awaiting_debaters | awaiting_judge | accepted | rejected | error
phase: convened | awaiting_rubric | awaiting_judges | awaiting_debate | accepted | rejected | error
Only the watcher appends; only the orchestrator updates status/phase.
Completed/errored rounds stay in the file so it stays fully auditable.
@@ -52,11 +56,14 @@ class Round:
title: str
enqueued_at: str
status: str = "pending" # pending | running | done | error
phase: str = "convened" # convened | awaiting_debaters | awaiting_judge | accepted | rejected | error
phase_entered_at: str = "" # ISO8601 when current phase was entered (for timeout)
coordinator_comment_id: str = "" # ID of debater-mention comment posted by coordinator
judge_comment_id: str = "" # ID of judge-mention comment posted by coordinator
retrigger_comment_id: str = "" # ID of retrigger comment posted after REJECT (audit)
phase: str = "convened" # convened | awaiting_rubric | awaiting_judges | awaiting_debate | accepted | rejected | error
phase_entered_at: str = "" # ISO8601 when current phase was entered (for timeout)
meta_judge_comment_id: str = "" # coordinator comment mentioning meta-judge
rubric_yaml: str = "" # verbatim YAML returned by meta-judge
judge_mention_comment_id: str = "" # coordinator comment mentioning the 3 judges (or the debate-round comment on round N>0)
judge_report_comment_ids: dict[str, str] = field(default_factory=dict) # {judge_name: comment_id}
debate_round: int = 0 # 0 = initial round; 1..3 = debate rounds
retrigger_comment_id: str = "" # ID of retrigger comment posted after REJECT (audit)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@@ -72,8 +79,11 @@ class Round:
status=d.get("status", "pending"),
phase=d.get("phase", "convened"),
phase_entered_at=d.get("phase_entered_at", ""),
coordinator_comment_id=d.get("coordinator_comment_id", ""),
judge_comment_id=d.get("judge_comment_id", ""),
meta_judge_comment_id=d.get("meta_judge_comment_id", ""),
rubric_yaml=d.get("rubric_yaml", ""),
judge_mention_comment_id=d.get("judge_mention_comment_id", ""),
judge_report_comment_ids=dict(d.get("judge_report_comment_ids", {}) or {}),
debate_round=int(d.get("debate_round", 0)),
retrigger_comment_id=d.get("retrigger_comment_id", ""),
)
-354
View File
@@ -1,354 +0,0 @@
"""Integration tests for the debate round orchestration.
These tests hit the REAL multica API. They require the coordinator env vars:
COORDINATOR_SERVER_URL e.g. https://multica.example.com
COORDINATOR_WORKSPACE_ID
COORDINATOR_TOKEN
Run with:
python -m pytest tests/test_integration.py -m integration -v
The test creates a scratch issue, exercises the full orchestration lifecycle
with simulated debater/judge responses (posted via the coordinator's own token
using the content-based fallback detector), verifies the result, then cleans up.
"""
from __future__ import annotations
import html
import logging
import os
import time
from pathlib import Path
import pytest
pytestmark = pytest.mark.integration
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def cfg():
"""Load Config from env. Skip module if vars missing."""
required = [
"COORDINATOR_SERVER_URL",
"COORDINATOR_WORKSPACE_ID",
"COORDINATOR_TOKEN",
]
missing = [k for k in required if not os.environ.get(k)]
if missing:
pytest.skip(
f"Integration test requires env vars: {', '.join(missing)}\n"
"Put them in ~/.coordinator/env or export them."
)
# Load env file if present (same as Config.from_env)
from coordinator.config import load_env_file
load_env_file()
from coordinator.config import Config
return Config.from_env()
@pytest.fixture(scope="module")
def client(cfg):
from coordinator.multica_client import MulticaClient
return MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _post_fake_debater_replies(client, issue_id: str):
"""Post content-marker comments for each debater via the coordinator token.
The orchestrator's content-based fallback detector accepts these as valid
debater evidence replies.
"""
from coordinator.orchestrator import DEBATER_NAMES
for name in DEBATER_NAMES:
client.post_comment(
issue_id,
f"[{name} response]: Integration-test evidence for {name}. "
"This comment simulates the agent's evidence analysis.",
)
def _post_fake_judge_verdict(client, issue_id: str, verdict: str = "ACCEPT"):
client.post_comment(
issue_id,
f"VERDICT: {verdict}\n\n"
"Integration test verdict. The implementation satisfies the acceptance criteria.",
)
# ---------------------------------------------------------------------------
# Integration test: full round lifecycle
# ---------------------------------------------------------------------------
@pytest.mark.integration
def test_full_debate_round_lifecycle(cfg, client, tmp_path):
"""End-to-end debate round lifecycle against the real API.
Steps:
1. Create scratch issue, set to in_review
2. poll_once → verify round enqueued
3. _start_round → verify coordinator comment posted on real issue
4. Post simulated debater responses (content-marker fallback)
5. _advance_awaiting_debaters → verify judge comment posted
6. Post simulated judge verdict
7. _advance_awaiting_judge → verify issue status updated (RACE FIX verified)
8. Cleanup
"""
from coordinator.__main__ import poll_once
from coordinator.orchestrator import (
DEBATER_NAMES,
_advance_awaiting_debaters,
_advance_awaiting_judge,
_start_round,
)
from coordinator.queue import DebateQueue
from coordinator.state import SeenState
logger = logging.getLogger("test.integration")
issue_id = None
try:
# --- 1. Create scratch issue ---
issue = client.create_issue(
title="[DEBATE-SMOKE-TEST] Integration test — auto-delete",
description=(
"This issue is created by the coordinator integration test. "
"It will be cleaned up automatically.\n\n"
"Commit: https://git.wylab.me/multica/coordinator/commit/test000"
),
)
issue_id = issue["id"]
logger.info("created scratch issue %s", issue_id)
# Set to in_review so the watcher picks it up
client.update_issue_status(issue_id, "in_review")
# --- 2. poll_once → enqueue round ---
state = SeenState.load(tmp_path / "seen.json")
queue = DebateQueue.load(tmp_path / "queue.json")
poll_once(client, state, queue, logger)
assert queue.pending(), "Expected round enqueued after poll_once"
round_ = queue.pending()[0]
assert round_.issue_id == issue_id
assert round_.status == "pending"
# --- 3. _start_round → debater comment posted ---
_start_round(round_, client, queue, cfg, logger)
assert round_.status == "running", f"Expected running, got {round_.status}"
assert round_.phase == "awaiting_debaters", f"Expected awaiting_debaters, got {round_.phase}"
assert round_.coordinator_comment_id, "coordinator_comment_id must be set"
# Verify comment appeared on the real issue
real_comments = client.list_comments(issue_id)
coord_comment = next(
(c for c in real_comments if c.get("id") == round_.coordinator_comment_id),
None,
)
assert coord_comment is not None, "Coordinator comment not found in issue comments"
for name in DEBATER_NAMES:
assert name in coord_comment["content"], f"{name} not mentioned in coordinator comment"
# --- 4. Post simulated debater responses ---
_post_fake_debater_replies(client, issue_id)
# --- 5. _advance_awaiting_debaters → judge comment posted ---
_advance_awaiting_debaters(round_, client, queue, cfg, logger)
assert round_.phase == "awaiting_judge", f"Expected awaiting_judge, got {round_.phase}"
assert round_.judge_comment_id, "judge_comment_id must be set"
real_comments = client.list_comments(issue_id)
judge_comment = next(
(c for c in real_comments if c.get("id") == round_.judge_comment_id),
None,
)
assert judge_comment is not None, "Judge comment not found in issue comments"
assert "Verdict requested" in judge_comment["content"]
# --- 6. Post simulated judge verdict ---
_post_fake_judge_verdict(client, issue_id, verdict="ACCEPT")
# --- 7. _advance_awaiting_judge → issue status updated ---
_advance_awaiting_judge(round_, client, queue, cfg, logger)
assert round_.status == "done", f"Expected done, got {round_.status}"
assert round_.phase == "accepted", f"Expected accepted, got {round_.phase}"
# Verify issue status via real API (not just in-memory)
refreshed = client.get_issue(issue_id)
assert refreshed.get("status") == "done", (
f"Issue status should be 'done' after ACCEPT verdict, "
f"got {refreshed.get('status')!r}"
)
# RACE FIX: after _advance_awaiting_judge completes, issue is no longer in_review,
# so poll_once cannot double-enqueue this round.
# Note: other in_review issues in the workspace may also get enqueued — that's OK.
poll_once(client, state, queue, logger)
our_rounds = [r for r in queue.rounds if r.issue_id == issue_id]
assert len(our_rounds) == 1, (
f"poll_once created a second round for issue {issue_id} (double-enqueue)"
)
logger.info("VERDICT OK — full lifecycle passed")
finally:
# --- 8. Cleanup ---
if issue_id:
try:
client.update_issue_status(issue_id, "done")
logger.info("cleanup: issue %s set to done", issue_id)
except Exception as exc:
logger.warning("cleanup failed for issue %s: %s", issue_id, exc)
# ---------------------------------------------------------------------------
# Integration test: REJECT retrigger + anchor (WYL-51)
# ---------------------------------------------------------------------------
@pytest.mark.integration
def test_retrigger_on_reject_end_to_end(cfg, client, tmp_path):
"""End-to-end test: REJECT verdict triggers an anchor retrigger comment.
Steps:
1. Create scratch issue with multi-paragraph description + an agent assignee
2. Manually enqueue a Round at phase=awaiting_judge
3. Inject a VERDICT: REJECT comment via the content-marker fallback
4. Run _advance_awaiting_judge
5. Assert: issue=in_progress, retrigger comment exists with @mention,
verbatim description, and no-drift instruction
6. Cleanup
"""
from coordinator.orchestrator import (
REWORK_INSTRUCTIONS,
_advance_awaiting_judge,
)
from coordinator.queue import DebateQueue, Round
logger = logging.getLogger("test.integration.retrigger")
issue_id = None
# Multi-paragraph description with unique sentinel values for verbatim check
DESCRIPTION = (
"Integration test anchor paragraph one. Sentinel: ANCHOR-INTEG-PARA1.\n\n"
"Integration test anchor paragraph two. Sentinel: ANCHOR-INTEG-PARA2.\n\n"
"Acceptance criteria: must contain ANCHOR-INTEG-CRITERIA."
)
try:
# --- 1. Create scratch issue ---
issue = client.create_issue(
title="[DEBATE-SMOKE-TEST] WYL-51 retrigger smoke — auto-delete",
description=DESCRIPTION,
status="in_review",
)
issue_id = issue["id"]
logger.info("created scratch issue %s", issue_id)
# Set an agent assignee so retrigger fires with a mention.
# Pick the first available agent in the workspace.
agents = client.list_agents()
assignee = agents[0] if agents else None
if assignee:
client.update_issue(
issue_id,
assignee_id=assignee["id"],
assignee_type="agent",
)
logger.info("set assignee to %s (%s)", assignee.get("name"), assignee["id"])
# --- 2. Manually enqueue a Round at phase=awaiting_judge ---
queue = DebateQueue.load(tmp_path / "queue.json")
round_ = queue.enqueue(issue_id, "WYL-51-SMOKE", "Retrigger smoke test")
queue.update_status(round_.round_id, "running")
queue.update_phase(
round_.round_id,
"awaiting_judge",
phase_entered_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
coordinator_comment_id="",
)
# --- 3. Inject VERDICT: REJECT comment ---
verdict_body = (
"VERDICT: REJECT\n\n"
"Integration test rejection. The smoke test intentionally rejects this "
"delivery to verify the retrigger mechanism fires correctly."
)
client.post_comment(issue_id, verdict_body)
# Set judge_comment_id to empty so content-fallback triggers
round_.judge_comment_id = ""
queue.save()
# --- 4. Run _advance_awaiting_judge ---
_advance_awaiting_judge(round_, client, queue, cfg, logger)
# --- 5. Assertions ---
assert round_.status == "done", f"Expected round done, got {round_.status}"
assert round_.phase == "rejected", f"Expected rejected, got {round_.phase}"
# (a) Issue status must be in_progress
refreshed = client.get_issue(issue_id)
assert refreshed.get("status") == "in_progress", (
f"Issue should be in_progress after REJECT, got {refreshed.get('status')!r}"
)
# (b) A retrigger comment must exist on the issue
real_comments = client.list_comments(issue_id)
retrigger_cid = round_.retrigger_comment_id
if assignee:
# Agent path: retrigger_comment_id must be set
assert retrigger_cid, "retrigger_comment_id must be persisted on the round"
retrigger_comment = next(
(c for c in real_comments if c.get("id") == retrigger_cid),
None,
)
assert retrigger_comment is not None, (
f"Retrigger comment {retrigger_cid} not found in issue comments"
)
# The API HTML-encodes some chars (e.g. " → &#34;, > → &gt;) in returned
# content. Decode before comparing so assertions match plain-string constants.
body = html.unescape(retrigger_comment["content"])
# (b) @-mention present
assert assignee["id"] in body, (
"Retrigger comment must contain the assignee agent ID"
)
# (c) Full verbatim description present
for line in DESCRIPTION.splitlines():
if line.strip():
assert line in body, (
f"Description line {line!r} not found verbatim in retrigger comment"
)
# (d) No-drift instruction present
assert REWORK_INSTRUCTIONS in body, (
"REWORK_INSTRUCTIONS must appear verbatim in retrigger comment"
)
logger.info(
"retrigger comment (first 400 chars):\n%s",
body[:400],
)
logger.info("VERDICT OK — retrigger end-to-end passed")
finally:
# --- 6. Cleanup ---
if issue_id:
try:
client.update_issue_status(issue_id, "done")
logger.info("cleanup: issue %s set to done", issue_id)
except Exception as exc:
logger.warning("cleanup failed for issue %s: %s", issue_id, exc)
+538 -834
View File
File diff suppressed because it is too large Load Diff