From f88255096e530efc6287f6f28a759094aa7aa4cf Mon Sep 17 00:00:00 2001 From: Platform Admin Date: Sat, 18 Apr 2026 22:00:44 +0200 Subject: [PATCH] Replace hand-written debater pipeline with CEK judge-with-debate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The prior pipeline (4 hand-written debater prompts + 1 judge with my prompt template) kept missing scope drift because every prompt was mine and the reviewers were all on the same model tier with correlated priors. This commit replaces the whole review step with CEK's judge-with-debate pattern translated to multica-native execution: pending → awaiting_rubric (meta-judge writes YAML spec from issue alone) → awaiting_judges (3 judges on 3 copilot models score independently) → consensus check (overall within 0.5, criteria within 1.0) → accept or reject OR awaiting_debate rounds up to 3 → error on malformed YAML or cap hit Per higher-management direction, we do not deal with a model that cannot produce YAML: malformed rubric or all-unparseable judge reports fail the round immediately (no retries, no fallback to hand-written prompts). The anchor retrigger on REJECT (WYL-51 behaviour) is preserved verbatim. Agent prompts for meta-judge and the 3 judges come from the CEK agents themselves (Meta-Judge / Judge-GPT / Judge-Claude / Judge-Gemini) whose `instructions` field is the CEK meta-judge.md / judge.md files uploaded byte-for-byte. No prompts are authored in this coordinator's source. Adds pyyaml dependency. - src/coordinator/orchestrator.py: rewritten for the new phase machine - src/coordinator/queue.py: Round extended with rubric_yaml, judge_report_comment_ids, debate_round - tests/test_orchestrator.py: 40 tests for new pipeline (helpers, parsers, consensus math, phase handlers, race fix, retrigger) - tests/test_integration.py: removed (tested old debater pipeline) - pyproject.toml: adds pyyaml Tests: 67 passed in 0.20s (40 orchestrator + 15 queue + 7 watcher + 5 other). 
--- pyproject.toml | 5 +- src/coordinator/orchestrator.py | 1264 ++++++++++++++++------------ src/coordinator/queue.py | 46 +- tests/test_integration.py | 354 -------- tests/test_orchestrator.py | 1372 ++++++++++++------------------- 5 files changed, 1295 insertions(+), 1746 deletions(-) delete mode 100644 tests/test_integration.py diff --git a/pyproject.toml b/pyproject.toml index 7437cfc..d1380dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,9 +5,10 @@ description = "Tool-MAD middle-management layer for multica. Watches for in_revi readme = "README.md" requires-python = ">=3.11" dependencies = [ - # v1 intentionally depends only on stdlib + requests. No async, no frameworks. - # If this list grows past 3 items before a working v1 is shipped, something is wrong. + # v1 intentionally depends only on stdlib + requests + pyyaml. No async, no frameworks. + # If this list grows past 4 items before a working v1 is shipped, something is wrong. "requests>=2.32", + "pyyaml>=6.0", ] [project.scripts] diff --git a/src/coordinator/orchestrator.py b/src/coordinator/orchestrator.py index 887ebfc..925d6fb 100644 --- a/src/coordinator/orchestrator.py +++ b/src/coordinator/orchestrator.py @@ -1,25 +1,45 @@ -"""Debate round orchestration (WYL-45). +"""Review-round orchestration — CEK meta-judge + 3-judge consensus with optional debate rounds. + +The pipeline translates CEK's ``sadd:judge-with-debate`` shape to multica-native execution. +All prescriptive content (how meta-judge writes rubrics, how judges score) lives in the +meta-judge.md and judge.md files uploaded as each agent's ``instructions`` field. This +module only plumbs the flow: posting mentions, collecting replies, parsing structured +output, and actioning verdicts. 
Round lifecycle --------------- - pending → _start_round() → running / phase=awaiting_debaters - awaiting_debaters → _advance_awaiting_debaters() - all replied (or timeout) → phase=awaiting_judge - awaiting_judge → _advance_awaiting_judge() - verdict received → issue status updated FIRST, - then round marked done/phase=accepted|rejected - timeout → round marked error, issue left in_review + pending → _start_round() + posts meta-judge mention comment → phase=awaiting_rubric -Restart safety --------------- - DebateQueue persists phase + comment IDs on every mutation. On restart, - running rounds resume from the correct phase without re-posting comments. + awaiting_rubric → _advance_awaiting_rubric() + meta-judge replied → extract YAML rubric → post 3 judge mention comments + → phase=awaiting_judges (debate_round=0) -Race fix (WYL-44 note) ----------------------- - In _advance_awaiting_judge, client.update_issue_status() is called BEFORE - queue.update_status("done"). This guarantees poll_once cannot see - (is_issue_pending_or_running=False AND issue=in_review) at the same time. 
+ awaiting_judges → _advance_awaiting_judges() + all 3 judge reports in (or timeout with partial) → parse scores + → consensus check + converged + avg≥3 → _apply_verdict(ACCEPT) + converged + avg<3 → _apply_verdict(REJECT) + not converged → post debate round mention comment + → phase=awaiting_debate (debate_round=1) + + awaiting_debate → _advance_awaiting_debate() + all 3 revised reports in → consensus check again + converged → _apply_verdict + not converged AND debate_round < MAX_DEBATE_ROUNDS + → post next debate round → debate_round+=1 + not converged AND debate_round == MAX_DEBATE_ROUNDS + → phase=error, leave issue in_review for human + + On any parse failure (meta-judge YAML malformed, judge reports all unparseable): + → phase=error (per higher management direction: we do not deal with a + model that cannot perform YAML tasks; we escalate) + +Race fix invariant +------------------ + _apply_verdict calls client.update_issue_status() BEFORE queue.update_status("done"), + and retrigger comments are posted AFTER the issue status has moved out of in_review. + This guarantees poll_once cannot see (round done AND issue=in_review) simultaneously. 
""" from __future__ import annotations @@ -28,111 +48,32 @@ import re from datetime import datetime, timezone from typing import Any +import yaml + from coordinator.config import Config from coordinator.multica_client import MulticaClient from coordinator.queue import DebateQueue, Round + # --------------------------------------------------------------------------- -# Hardcoded v1 constants +# Hardcoded pipeline constants # --------------------------------------------------------------------------- -DEBATER_NAMES: list[str] = [ - "Senior Developer", - "Code Reviewer", - "AI Engineer", - "Project Manager Senior", -] +META_JUDGE_NAME: str = "Meta-Judge" +JUDGE_NAMES: list[str] = ["Judge-GPT", "Judge-Claude", "Judge-Gemini"] -JUDGE_NAME: str = "Reality Checker" +# Consensus math (from CEK judge-with-debate/SKILL.md lines 264-270): +# "all three overall scores within 0.5 of each other AND each per-criterion score +# within 1.0 across judges → consensus" +CONSENSUS_OVERALL_THRESHOLD: float = 0.5 +CONSENSUS_CRITERION_THRESHOLD: float = 1.0 -# Per-debater evidence prompts. -# Placeholders: {issue_title}, {issue_description}, {commit_url} -# Each prompt tells the debater exactly which tool output to ground its argument in. -_NO_MENTION_RULE: str = ( - "**Important:** Do not @-mention any other agent or person in your reply. " - "If you need to refer to another debater, use their plain name only " - "(e.g. \"Code Reviewer\" not \"@Code Reviewer\"). " - "Mentions cause unwanted cascade tasks." -) +# Accept if consensus average is ≥ 3.0 out of 5 (CEK default). +ACCEPT_MIN_SCORE: float = 3.0 -_SCOPE_SWAP_CLAUSE: str = ( - "Then, independently of your answer above, re-read the original issue description " - "(provided in this comment under **Description (excerpt)**) and judge: does the " - "committed work actually address what was asked, or did it answer a different " - "(easier) question? Specifically look for substitutions — e.g. 
the original asks " - "for X but the code delivers X-prime, a simpler version. If you detect a scope " - "swap, state it clearly: SCOPE SWAP DETECTED — [one sentence explaining what was " - "asked vs what was delivered]. Recommend REJECT on this basis alone, regardless " - "of how well the delivered work satisfies the substituted scope." -) +# Debate rounds cap (CEK default). +MAX_DEBATE_ROUNDS: int = 3 -DEBATER_PROMPTS: dict[str, str] = { - "Senior Developer": ( - "**Your task:** grep the committed code for LLM API calls " - "(`openai`, `anthropic`, `requests.post` to AI endpoints, etc.). Report:\n" - "1. Yes/no — does the code make LLM API calls?\n" - "2. The exact grep command you ran.\n" - "3. The full grep output (or 'no matches').\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), - "Code Reviewer": ( - "**Your task:** diff the committed implementation method-by-method against " - "every requirement in the issue description. For each requirement: state it, " - "then label MET or MISSING with one line of evidence from the diff. " - "Overall confidence: high / medium / low.\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), - "AI Engineer": ( - "**Your task:** ground-check every factual claim in the issue description " - "against the committed code. For each claim: state it, then verify or refute " - "with a direct code reference. Label each VERIFIED or UNVERIFIED.\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), - "Project Manager Senior": ( - "**Your task:** check the committed work against the original acceptance " - "criteria. For each criterion: satisfied (with evidence) or not met. 
" - "State your recommendation: ACCEPT or REJECT.\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), -} - -JUDGE_PROMPT_TEMPLATE: str = """\ -**Verdict requested — {issue_title}** - -**Original issue:** -{issue_description} - -**Commit:** {commit_url} - ---- -## Debater evidence - -{transcript} ---- - -Deliver your verdict. Begin your reply with exactly one of: -- `VERDICT: ACCEPT` — implementation satisfies all critical acceptance criteria -- `VERDICT: REJECT` — it does not - -Follow with reasoning (max 400 words). - -**Important:** Do not @-mention any agent or person in your reply. Use plain names only.\ -""" - -_COMMIT_URL_RE = re.compile( - r"https?://\S+/commit/[0-9a-f]+", re.IGNORECASE -) -_VERDICT_RE = re.compile( - r"^VERDICT:\s*(ACCEPT|REJECT)", re.MULTILINE | re.IGNORECASE -) # --------------------------------------------------------------------------- # Requirement anchoring — the single most important coordinator property. @@ -157,7 +98,15 @@ REWORK_INSTRUCTIONS = ( # --------------------------------------------------------------------------- -# Internal helpers +# Regexes +# --------------------------------------------------------------------------- + +_COMMIT_URL_RE = re.compile(r"https?://\S+/commit/[0-9a-f]+", re.IGNORECASE) +_YAML_FENCE_RE = re.compile(r"```(?:yaml|yml)?\s*\n(.*?)\n```", re.DOTALL | re.IGNORECASE) + + +# --------------------------------------------------------------------------- +# Small helpers # --------------------------------------------------------------------------- def _utcnow() -> str: @@ -187,10 +136,9 @@ def _find_commit_url(comments: list[dict[str, Any]]) -> str: return "" -def _parse_verdict(content: str) -> str | None: - """Return 'ACCEPT', 'REJECT', or None.""" - m = _VERDICT_RE.search(content) - return m.group(1).upper() if m else None +def _to_blockquote(text: str) -> str: + """Wrap each line of text with markdown blockquote prefix.""" + return "\n".join(f"> {line}" if line else 
">" for line in text.split("\n")) def _cutoff_iso(comments: list[dict[str, Any]], after_comment_id: str) -> str: @@ -201,115 +149,282 @@ def _cutoff_iso(comments: list[dict[str, Any]], after_comment_id: str) -> str: return "" -def _collect_debater_replies( +def _find_reply_by_agent( comments: list[dict[str, Any]], - agent_map: dict[str, str], # name → agent_id + agent_id: str, after_comment_id: str, -) -> dict[str, str]: - """Return {debater_name: reply_content} for debaters who replied. - - Detection (in priority order): - 1. comment.author_id matches a debater's agent_id - 2. comment content starts with ``[{name} response]:`` (fallback for testing) - - Only comments posted *after* after_comment_id are considered. - """ - id_to_name: dict[str, str] = {v: k for k, v in agent_map.items()} +) -> tuple[str, str] | None: + """Return (comment_id, content) of the first reply by ``agent_id`` posted + strictly after ``after_comment_id``. None if no such reply exists.""" + if not agent_id: + return None cutoff = _cutoff_iso(comments, after_comment_id) - replies: dict[str, str] = {} - - for comment in comments: - # Skip the coordinator's own mention comment and anything before it. 
- if comment.get("id") == after_comment_id: - continue - created = comment.get("created_at", "") - if cutoff and created < cutoff: - continue - - content = comment.get("content", "") - author_id = comment.get("author_id", "") - - # Primary: agent ID match - if author_id in id_to_name: - name = id_to_name[author_id] - if name not in replies: - replies[name] = content - continue - - # Fallback: content-based marker (enables integration / smoke tests) - for name in DEBATER_NAMES: - marker = f"[{name} response]:".lower() - if content.strip().lower().startswith(marker) and name not in replies: - replies[name] = content - - return replies - - -def _collect_judge_reply( - comments: list[dict[str, Any]], - judge_agent_id: str, - after_comment_id: str, -) -> str | None: - """Return judge's comment content if found after after_comment_id, else None. - - Fallback: any comment containing ``VERDICT: ACCEPT|REJECT`` is accepted - (enables smoke / integration tests where the judge is impersonated). - """ - cutoff = _cutoff_iso(comments, after_comment_id) - for comment in comments: if comment.get("id") == after_comment_id: continue created = comment.get("created_at", "") if cutoff and created < cutoff: continue - - content = comment.get("content", "") - author_id = comment.get("author_id", "") - - if judge_agent_id and author_id == judge_agent_id: - return content - - # Fallback: content-based verdict detection - if _parse_verdict(content) is not None: - return content - + if comment.get("author_id") == agent_id: + return (comment.get("id", ""), comment.get("content", "")) return None -def _build_debater_comment( - agent_map: dict[str, str], +# --------------------------------------------------------------------------- +# YAML extraction and parsing +# --------------------------------------------------------------------------- + +def _extract_yaml(content: str) -> str: + """Pull the YAML block out of a reply. Prefers fenced code blocks. 
+ + Returns the YAML text (without fences), or the original content if no fence + is found. The caller is responsible for deciding whether the raw content + is parseable. + """ + m = _YAML_FENCE_RE.search(content) + if m: + return m.group(1).strip() + return content.strip() + + +def _parse_rubric(content: str) -> dict[str, Any] | None: + """Parse meta-judge's reply into the rubric+checklist dict. + + Returns None on parse failure or if the parsed object does not have the + expected shape (at minimum a ``checklist`` or ``rubric_dimensions`` key). + """ + yaml_text = _extract_yaml(content) + try: + parsed = yaml.safe_load(yaml_text) + except yaml.YAMLError: + return None + if not isinstance(parsed, dict): + return None + # Accept either a wrapped form with evaluation_specification or a flat form + spec = parsed.get("evaluation_specification", parsed) + if not isinstance(spec, dict): + return None + if "checklist" not in spec and "rubric_dimensions" not in spec: + return None + return spec + + +def _parse_judge_report(content: str) -> dict[str, Any] | None: + """Parse a judge's evaluation report YAML into a dict. + + Returns None on parse failure or if the expected scoring keys are absent. + Expected shape (from CEK judge.md Expected Output section) includes either + ``evaluation_report`` wrapping or a flat form with ``score_calculation`` and + ``rubric_scores``. 
+ """ + yaml_text = _extract_yaml(content) + try: + parsed = yaml.safe_load(yaml_text) + except yaml.YAMLError: + return None + if not isinstance(parsed, dict): + return None + report = parsed.get("evaluation_report", parsed) + if not isinstance(report, dict): + return None + # Accept the report if it contains at least final_score OR a scored + # rubric_scores list we can average from + score_calc = report.get("score_calculation", {}) or {} + if isinstance(score_calc, dict) and "final_score" in score_calc: + return report + rubric_scores = report.get("rubric_scores", []) or [] + if isinstance(rubric_scores, list) and any(isinstance(s, dict) and "score" in s for s in rubric_scores): + return report + return None + + +def _overall_score(report: dict[str, Any]) -> float | None: + """Extract the overall 0-5 score from a parsed judge report. + + Prefers ``score_calculation.final_score``; falls back to weighted average of + rubric_scores if final_score is missing. Returns None if neither source is + available. 
+ """ + sc = report.get("score_calculation") or {} + if isinstance(sc, dict): + fs = sc.get("final_score") + if isinstance(fs, (int, float)): + return float(fs) + rubric_scores = report.get("rubric_scores") or [] + if isinstance(rubric_scores, list) and rubric_scores: + # Weighted average if weights are present, else plain average + total_weight = 0.0 + weighted = 0.0 + plain = [] + for item in rubric_scores: + if not isinstance(item, dict): + continue + s = item.get("score") + if not isinstance(s, (int, float)): + continue + w = item.get("weight") + if isinstance(w, (int, float)) and w > 0: + weighted += float(s) * float(w) + total_weight += float(w) + plain.append(float(s)) + if total_weight > 0: + return weighted / total_weight + if plain: + return sum(plain) / len(plain) + return None + + +def _criterion_scores(report: dict[str, Any]) -> dict[str, float]: + """Return {criterion_name: score} from a parsed judge report.""" + out: dict[str, float] = {} + for item in report.get("rubric_scores") or []: + if not isinstance(item, dict): + continue + name = item.get("name") or item.get("criterion_name") + s = item.get("score") + if isinstance(name, str) and isinstance(s, (int, float)): + out[name] = float(s) + return out + + +def _check_consensus( + reports: dict[str, dict[str, Any]], +) -> tuple[bool, str | None, float | None]: + """Given parsed reports keyed by judge name, check consensus. + + Returns (converged, verdict, avg_overall). + - converged: True if all overall scores are within CONSENSUS_OVERALL_THRESHOLD + and each per-criterion score is within CONSENSUS_CRITERION_THRESHOLD across + judges. Only criteria present in ALL reports participate in the per-criterion + check. + - verdict: "ACCEPT" if converged and avg_overall >= ACCEPT_MIN_SCORE, else + "REJECT" if converged, else None. + - avg_overall: mean of overall scores across judges with a parseable overall, + or None if no overall could be extracted. 
+ """ + overalls: list[float] = [] + criterion_by_judge: dict[str, dict[str, float]] = {} + for name, report in reports.items(): + ov = _overall_score(report) + if ov is not None: + overalls.append(ov) + criterion_by_judge[name] = _criterion_scores(report) + + if not overalls: + return (False, None, None) + + avg = sum(overalls) / len(overalls) + + overall_converged = (max(overalls) - min(overalls)) <= CONSENSUS_OVERALL_THRESHOLD + + # Per-criterion check: only criteria present in every judge's report + common_criteria: set[str] | None = None + for scores in criterion_by_judge.values(): + keys = set(scores.keys()) + common_criteria = keys if common_criteria is None else (common_criteria & keys) + common_criteria = common_criteria or set() + + criterion_converged = True + for crit in common_criteria: + vals = [criterion_by_judge[name][crit] for name in criterion_by_judge if crit in criterion_by_judge[name]] + if vals and (max(vals) - min(vals)) > CONSENSUS_CRITERION_THRESHOLD: + criterion_converged = False + break + + converged = overall_converged and criterion_converged + if not converged: + return (False, None, avg) + verdict = "ACCEPT" if avg >= ACCEPT_MIN_SCORE else "REJECT" + return (True, verdict, avg) + + +# --------------------------------------------------------------------------- +# Comment builders (no prose authored by the coordinator — just plumbing) +# --------------------------------------------------------------------------- + +def _build_meta_judge_mention( + meta_judge_agent_id: str, + issue_title: str, + issue_description: str, +) -> str: + """Build the initial meta-judge mention comment. + + Contains only the @-mention, the task context that CEK meta-judge expects + (user prompt + artifact type hint), and no direction about WHAT to produce — + meta-judge's own instructions tell it that. 
+ """ + mention = ( + f"[@{META_JUDGE_NAME}](mention://agent/{meta_judge_agent_id})" + if meta_judge_agent_id + else f"@{META_JUDGE_NAME}" + ) + desc_bq = _to_blockquote(issue_description.strip()) + return "\n\n".join([ + mention, + f"**User Prompt:** {issue_title}", + "**Description:**", + desc_bq, + "**Artifact Type:** code + documentation (review-only — a delivered commit will be evaluated against your specification by downstream judges).", + ]) + + +def _build_judge_mention_comment( + judge_agent_ids: dict[str, str], issue_title: str, issue_description: str, commit_url: str, + rubric_yaml: str, ) -> str: - commit_ref = commit_url or "(no commit link found in comments)" + """Build a mention comment that fires all 3 judges in parallel. + + Each judge sees: mentions (of all three), task, commit link, rubric YAML. + No debater prose, no scope-swap clause, no no-mention rule — judge.md + instructions cover those. + """ + mentions = " ".join( + f"[@{name}](mention://agent/{judge_agent_ids[name]})" + if judge_agent_ids.get(name) + else f"@{name}" + for name in JUDGE_NAMES + ) + desc_bq = _to_blockquote(issue_description.strip()) + return "\n\n".join([ + mentions, + f"**Original Task:** {issue_title}", + "**Description:**", + desc_bq, + f"**Commit under review:** {commit_url or '(no commit link found in delivery)'}", + "**Evaluation Specification (from meta-judge):**", + "```yaml", + rubric_yaml.strip(), + "```", + "Apply the specification to the commit. 
Post your `evaluation_report` YAML as your reply.", + ]) + + +def _build_debate_round_comment( + judge_agent_ids: dict[str, str], + round_num: int, + prior_reports: dict[str, str], +) -> str: + """Build a debate-round mention comment showing all 3 judges each other's prior reports.""" + mentions = " ".join( + f"[@{name}](mention://agent/{judge_agent_ids[name]})" + if judge_agent_ids.get(name) + else f"@{name}" + for name in JUDGE_NAMES + ) parts = [ - f"**Debate round opened — {issue_title}**\n", - f"Commit under review: {commit_ref}\n", - "---", + mentions, + f"**Debate round {round_num}** — your prior reports disagree beyond the consensus threshold.", + "Each of you: read the others' reports below, re-examine the commit, then post a REVISED `evaluation_report` YAML. You may hold your position if you have new evidence; you may move if you find the other reasoning more grounded. Do not split the difference to compromise.", ] - for name in DEBATER_NAMES: - agent_id = agent_map.get(name, "") - mention = ( - f"[@{name}](mention://agent/{agent_id})" if agent_id else f"@{name}" - ) - prompt = DEBATER_PROMPTS.get(name, "Please provide your evidence analysis.") - snippet = issue_description[:800].strip() - parts.append( - f"\n{mention}\n\n" - f"**Issue:** {issue_title}\n" - f"**Description (excerpt):** {snippet}\n" - f"**Commit:** {commit_ref}\n\n" - f"{prompt}" - ) - parts.append("\n---") - return "\n".join(parts) - - -def _to_blockquote(text: str) -> str: - """Wrap each line of text in a Markdown blockquote (``> `` prefix).""" - return "\n".join(f"> {line}" if line else ">" for line in text.splitlines()) + for name in JUDGE_NAMES: + content = prior_reports.get(name, "*(no prior report)*") + parts.append(f"---\n\n### {name} — prior report\n\n{content}") + parts.append("---") + parts.append("Post your revised report as a NEW reply. 
Do not edit prior reports.") + return "\n\n".join(parts) def _build_retrigger_comment( @@ -319,20 +434,10 @@ def _build_retrigger_comment( verdict_content: str, round_id: str, ) -> str: - """Build the full anchor-bearing retrigger comment body. - - Structure (verbatim — do not rearrange or summarise any section): - 1. @-mention to fire the agent - 2. One-line verdict summary - 3. ## ANCHOR — Original requirements (verbatim description, blockquoted) - 4. ## Why this was rejected (verbatim verdict, blockquoted) - 5. ## Instructions for rework (REWORK_INSTRUCTIONS constant) - 6. Trailing audit line - """ + """Anchor-bearing retrigger comment body — unchanged from the old pipeline.""" mention = f"[@{assignee_name}](mention://agent/{assignee_agent_id})" desc_bq = _to_blockquote(issue_description.strip()) verdict_bq = _to_blockquote(verdict_content.strip()) - return "\n\n".join([ mention, f"Verdict: REJECT (round {round_id})", @@ -344,13 +449,397 @@ def _build_retrigger_comment( def _build_coordinator_note_no_agent(round_id: str, reason: str) -> str: - """Coordinator-only note (no @-mention) when the assignee is not an agent.""" + """Coordinator-only note (no @-mention) when retrigger cannot resolve an assignee.""" return ( f"Verdict: REJECT (round {round_id}) \u2014 {reason}. " "Manual follow-up required." ) +def _build_error_note(round_id: str, reason: str) -> str: + """Posted when the round cannot complete cleanly (malformed output, timeout).""" + return ( + f"Review round {round_id} could not complete: {reason}. " + "Issue remains in `in_review`; human follow-up required. 
" + "— Coordinator" + ) + + +# --------------------------------------------------------------------------- +# Phase handlers +# --------------------------------------------------------------------------- + +def _start_round( + round_: Round, + client: MulticaClient, + queue: DebateQueue, + cfg: Config, + logger: logging.Logger, +) -> None: + """pending → running, post meta-judge mention, phase → awaiting_rubric.""" + logger.info("starting round %s for %s", round_.round_id, round_.identifier) + queue.update_status(round_.round_id, "running") + + try: + issue = client.get_issue(round_.issue_id) + except Exception as exc: + logger.error("round %s: cannot fetch issue %s: %s", round_.round_id, round_.issue_id, exc) + queue.update_status(round_.round_id, "error") + return + + issue_title = issue.get("title", round_.title) + issue_description = issue.get("description", "") or "" + + try: + agent_map = client.find_agents_by_name([META_JUDGE_NAME]) + except Exception as exc: + logger.error("round %s: cannot look up %s: %s", round_.round_id, META_JUDGE_NAME, exc) + queue.update_status(round_.round_id, "error") + return + + meta_id = agent_map.get(META_JUDGE_NAME, "") + body = _build_meta_judge_mention(meta_id, issue_title, issue_description) + try: + posted = client.post_comment(round_.issue_id, body) + meta_cid = posted.get("id", "") + except Exception as exc: + logger.error("round %s: cannot post meta-judge mention: %s", round_.round_id, exc) + queue.update_status(round_.round_id, "error") + return + + queue.update_phase( + round_.round_id, + "awaiting_rubric", + phase_entered_at=_utcnow(), + meta_judge_comment_id=meta_cid, + ) + logger.info("round %s: meta-judge mention posted id=%s", round_.round_id, meta_cid) + + +def _advance_awaiting_rubric( + round_: Round, + client: MulticaClient, + queue: DebateQueue, + cfg: Config, + logger: logging.Logger, +) -> None: + """Poll for meta-judge reply, extract rubric YAML, post judge mentions.""" + elapsed = 
_elapsed_s(round_.phase_entered_at) + timed_out = elapsed >= cfg.round_timeout_s + + try: + comments = client.list_comments(round_.issue_id) + except Exception as exc: + logger.error("round %s: cannot list comments: %s", round_.round_id, exc) + return + + try: + meta_map = client.find_agents_by_name([META_JUDGE_NAME]) + except Exception as exc: + logger.warning("round %s: cannot look up meta-judge agent: %s", round_.round_id, exc) + meta_map = {} + meta_id = meta_map.get(META_JUDGE_NAME, "") + + reply = _find_reply_by_agent(comments, meta_id, round_.meta_judge_comment_id) + if reply is None: + if timed_out: + logger.error( + "round %s: meta-judge did not reply within %ds — marking error", + round_.round_id, cfg.round_timeout_s, + ) + _post_error_note(round_, client, "meta-judge did not reply within timeout", logger) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + else: + logger.debug( + "round %s: awaiting meta-judge reply (%.0fs/%ds)", + round_.round_id, elapsed, cfg.round_timeout_s, + ) + return + + _meta_cid_reply, meta_content = reply + rubric_spec = _parse_rubric(meta_content) + if rubric_spec is None: + logger.error( + "round %s: meta-judge reply did not contain a parseable rubric — marking error (per higher-management direction: no retries on YAML failure)", + round_.round_id, + ) + _post_error_note( + round_, client, + "meta-judge reply did not contain parseable YAML rubric — a model that cannot produce YAML is not acceptable for this role", + logger, + ) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + return + + # Serialize the rubric spec back to YAML for the judge mention comment. 
+ rubric_yaml = yaml.safe_dump(rubric_spec, sort_keys=False, default_flow_style=False) + + # Fetch issue for title/description and find commit URL from comments + try: + issue = client.get_issue(round_.issue_id) + except Exception as exc: + logger.error("round %s: cannot re-fetch issue: %s", round_.round_id, exc) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + return + + issue_title = issue.get("title", round_.title) + issue_description = issue.get("description", "") or "" + commit_url = _find_commit_url(comments) + + try: + judge_map = client.find_agents_by_name(JUDGE_NAMES) + except Exception as exc: + logger.warning("round %s: cannot look up judge agents: %s", round_.round_id, exc) + judge_map = {} + + body = _build_judge_mention_comment(judge_map, issue_title, issue_description, commit_url, rubric_yaml) + try: + posted = client.post_comment(round_.issue_id, body) + judge_mention_cid = posted.get("id", "") + except Exception as exc: + logger.error("round %s: cannot post judge mention: %s", round_.round_id, exc) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + return + + queue.update_phase( + round_.round_id, + "awaiting_judges", + phase_entered_at=_utcnow(), + rubric_yaml=rubric_yaml, + judge_mention_comment_id=judge_mention_cid, + debate_round=0, + ) + logger.info( + "round %s: judges mentioned (id=%s), awaiting 3 reports", + round_.round_id, judge_mention_cid, + ) + + +def _collect_judge_reports( + round_: Round, + client: MulticaClient, + logger: logging.Logger, +) -> tuple[dict[str, str], dict[str, str], dict[str, str]]: + """Return (raw_contents, parsed_ok_contents, judge_agent_ids) for judges who replied after + round_.judge_mention_comment_id (or the most recent debate-round mention). + + raw_contents and parsed_ok_contents are keyed by judge name. parsed_ok_contents + includes only judges whose report parsed successfully. 
+ """ + try: + comments = client.list_comments(round_.issue_id) + except Exception as exc: + logger.error("round %s: cannot list comments: %s", round_.round_id, exc) + return ({}, {}, {}) + + try: + judge_map = client.find_agents_by_name(JUDGE_NAMES) + except Exception as exc: + logger.warning("round %s: cannot look up judges: %s", round_.round_id, exc) + judge_map = {} + + raw: dict[str, str] = {} + parsed: dict[str, str] = {} + after = round_.judge_mention_comment_id + for name in JUDGE_NAMES: + agent_id = judge_map.get(name, "") + if not agent_id: + continue + reply = _find_reply_by_agent(comments, agent_id, after) + if reply is None: + continue + _, content = reply + raw[name] = content + if _parse_judge_report(content) is not None: + parsed[name] = content + return (raw, parsed, judge_map) + + +def _advance_awaiting_judges( + round_: Round, + client: MulticaClient, + queue: DebateQueue, + cfg: Config, + logger: logging.Logger, +) -> None: + """Wait for all 3 judge reports; on consensus apply verdict; else start debate.""" + raw, parsed, judge_map = _collect_judge_reports(round_, client, logger) + + missing = [n for n in JUDGE_NAMES if n not in raw] + elapsed = _elapsed_s(round_.phase_entered_at) + timed_out = elapsed >= cfg.round_timeout_s + + if missing and not timed_out: + logger.debug( + "round %s: awaiting judges %s (%.0fs/%ds)", + round_.round_id, missing, elapsed, cfg.round_timeout_s, + ) + return + + # Persist the raw report comment IDs for audit — look them up again by name + report_ids = dict(round_.judge_report_comment_ids) + try: + comments = client.list_comments(round_.issue_id) + for name, content in raw.items(): + agent_id = judge_map.get(name, "") + # find the specific comment id matching (author_id, content) + for c in comments: + if c.get("author_id") == agent_id and c.get("content") == content: + report_ids[name] = c.get("id", "") + break + except Exception as exc: + logger.debug("round %s: could not persist report IDs: %s", round_.round_id, 
exc) + + if len(parsed) == 0: + logger.error( + "round %s: no parseable judge reports (0 of %d) — marking error", + round_.round_id, len(raw), + ) + _post_error_note(round_, client, "no judge produced a parseable evaluation_report YAML", logger) + queue.update_phase(round_.round_id, "error", judge_report_comment_ids=report_ids) + queue.update_status(round_.round_id, "error") + return + + reports: dict[str, dict[str, Any]] = { + name: _parse_judge_report(content) for name, content in parsed.items() # type: ignore[misc] + } + reports = {k: v for k, v in reports.items() if v is not None} + + converged, verdict, avg = _check_consensus(reports) + + if converged and verdict is not None: + logger.info( + "round %s: consensus reached after initial judges — verdict=%s avg=%.2f", + round_.round_id, verdict, avg or 0.0, + ) + verdict_summary = _format_verdict_summary(verdict, avg, reports) + _apply_verdict(round_, client, queue, verdict, verdict_summary, logger) + return + + # Not converged — start debate round 1 + if round_.debate_round >= MAX_DEBATE_ROUNDS: + logger.error( + "round %s: max debate rounds reached without consensus — marking error", + round_.round_id, + ) + _post_error_note(round_, client, f"no consensus after {MAX_DEBATE_ROUNDS} debate rounds", logger) + queue.update_phase(round_.round_id, "error", judge_report_comment_ids=report_ids) + queue.update_status(round_.round_id, "error") + return + + next_round = round_.debate_round + 1 + logger.info( + "round %s: no consensus (avg=%.2f, %d parseable) — starting debate round %d", + round_.round_id, avg or 0.0, len(parsed), next_round, + ) + body = _build_debate_round_comment(judge_map, next_round, raw) + try: + posted = client.post_comment(round_.issue_id, body) + debate_cid = posted.get("id", "") + except Exception as exc: + logger.error("round %s: cannot post debate round %d mention: %s", round_.round_id, next_round, exc) + queue.update_phase(round_.round_id, "error", judge_report_comment_ids=report_ids) + 
queue.update_status(round_.round_id, "error") + return + + queue.update_phase( + round_.round_id, + "awaiting_debate", + phase_entered_at=_utcnow(), + judge_mention_comment_id=debate_cid, + judge_report_comment_ids=report_ids, + debate_round=next_round, + ) + + +def _advance_awaiting_debate( + round_: Round, + client: MulticaClient, + queue: DebateQueue, + cfg: Config, + logger: logging.Logger, +) -> None: + """Same logic as awaiting_judges but on a subsequent round; caps at MAX_DEBATE_ROUNDS.""" + _advance_awaiting_judges(round_, client, queue, cfg, logger) + + +def _format_verdict_summary( + verdict: str, + avg: float | None, + reports: dict[str, dict[str, Any]], +) -> str: + """Build the verdict text used in both the acceptance comment and the rejection anchor.""" + lines = [f"VERDICT: {verdict}"] + if avg is not None: + lines.append(f"Consensus average score: {avg:.2f} / 5.0") + lines.append("") + lines.append("Per-judge overall scores:") + for name in JUDGE_NAMES: + r = reports.get(name) + ov = _overall_score(r) if r else None + if ov is not None: + lines.append(f"- {name}: {ov:.2f}") + else: + lines.append(f"- {name}: (no score)") + return "\n".join(lines) + + +def _apply_verdict( + round_: Round, + client: MulticaClient, + queue: DebateQueue, + verdict: str, + verdict_summary: str, + logger: logging.Logger, +) -> None: + """Update issue status and mark round done (or post retrigger on REJECT). + + RACE FIX: issue status update precedes queue.update_status("done"). 
+ """ + new_issue_status = "done" if verdict == "ACCEPT" else "in_progress" + logger.info( + "round %s: verdict=%s → issue status=%s", + round_.round_id, verdict, new_issue_status, + ) + try: + client.update_issue_status(round_.issue_id, new_issue_status) + except Exception as exc: + logger.error( + "round %s: cannot update issue status — will retry next cycle: %s", + round_.round_id, exc, + ) + return + + # Post verdict summary comment + try: + client.post_comment(round_.issue_id, verdict_summary) + except Exception as exc: + logger.warning("round %s: cannot post verdict summary: %s", round_.round_id, exc) + + # On REJECT: post anchor retrigger + if verdict == "REJECT": + try: + issue = client.get_issue(round_.issue_id) + except Exception as exc: + logger.warning("round %s: cannot re-fetch issue for retrigger: %s", round_.round_id, exc) + issue = {} + retrigger_cid = _post_rejection_retrigger(round_, client, issue, verdict_summary, logger) + if retrigger_cid: + try: + queue.set_retrigger_comment_id(round_.round_id, retrigger_cid) + except Exception as exc: + logger.warning("round %s: cannot persist retrigger_comment_id: %s", round_.round_id, exc) + + phase = "accepted" if verdict == "ACCEPT" else "rejected" + queue.update_phase(round_.round_id, phase) + queue.update_status(round_.round_id, "done") + logger.info("round %s: complete (phase=%s)", round_.round_id, phase) + + def _post_rejection_retrigger( round_: Round, client: MulticaClient, @@ -358,16 +847,7 @@ def _post_rejection_retrigger( verdict_comment_content: str, logger: logging.Logger, ) -> str | None: - """Post the retrigger comment after a REJECT verdict. - - Returns the comment ID of the retrigger comment when the assignee is an - agent (caller should persist it), or ``None`` when a non-mentioning - coordinator note was posted instead (member/no-assignee) or when posting - failed entirely. - - Any failure is logged and swallowed — round completion must not depend on - this side-effect. 
- """ + """Post the anchored retrigger comment. Same behaviour as before.""" assignee_type = issue.get("assignee_type", "") assignee_id = issue.get("assignee_id", "") or "" @@ -387,17 +867,13 @@ def _post_rejection_retrigger( _build_coordinator_note_no_agent(round_.round_id, reason), ) except Exception as exc: - logger.warning( - "round %s: cannot post coordinator note: %s", - round_.round_id, exc, - ) + logger.warning("round %s: cannot post coordinator note: %s", round_.round_id, exc) return None - # Agent path — look up name (calls list_agents once) assignee_name = client.get_agent_name(assignee_id) if not assignee_name: logger.warning( - "round %s: assignee agent %s not found in workspace — posting note without mention", + "round %s: assignee %s not found in workspace — posting note without mention", round_.round_id, assignee_id, ) try: @@ -409,10 +885,7 @@ def _post_rejection_retrigger( ), ) except Exception as exc: - logger.warning( - "round %s: cannot post coordinator note: %s", - round_.round_id, exc, - ) + logger.warning("round %s: cannot post coordinator note: %s", round_.round_id, exc) return None issue_description = issue.get("description", "") or "" @@ -423,306 +896,26 @@ def _post_rejection_retrigger( posted = client.post_comment(round_.issue_id, body) cid = posted.get("id", "") logger.info( - "round %s: retrigger comment posted to @%s (id=%s)", + "round %s: retrigger posted to @%s (id=%s)", round_.round_id, assignee_name, cid, ) return cid except Exception as exc: - logger.warning( - "round %s: cannot post retrigger comment: %s", - round_.round_id, exc, - ) + logger.warning("round %s: cannot post retrigger: %s", round_.round_id, exc) return None -def _build_judge_comment( - judge_agent_id: str, - issue_title: str, - issue_description: str, - commit_url: str, - transcript: str, -) -> str: - mention = ( - f"[@{JUDGE_NAME}](mention://agent/{judge_agent_id})" - if judge_agent_id - else f"@{JUDGE_NAME}" - ) - body = JUDGE_PROMPT_TEMPLATE.format( - 
issue_title=issue_title, - issue_description=issue_description[:2000].strip(), - commit_url=commit_url or "N/A", - transcript=transcript, - ) - return f"{mention}\n\n{body}" - - -# --------------------------------------------------------------------------- -# Phase handlers -# --------------------------------------------------------------------------- - -def _start_round( +def _post_error_note( round_: Round, client: MulticaClient, - queue: DebateQueue, - cfg: Config, + reason: str, logger: logging.Logger, ) -> None: - """pending → running, post debater comment, phase → awaiting_debaters.""" - logger.info("starting round %s for %s", round_.round_id, round_.identifier) - - # Mark running immediately so poll_once won't double-enqueue on a bump. - queue.update_status(round_.round_id, "running") - + """Post a coordinator-only note explaining the round errored out.""" try: - issue = client.get_issue(round_.issue_id) + client.post_comment(round_.issue_id, _build_error_note(round_.round_id, reason)) except Exception as exc: - logger.error("round %s: cannot fetch issue %s: %s", round_.round_id, round_.issue_id, exc, exc_info=True) - queue.update_status(round_.round_id, "error") - return - - issue_title = issue.get("title", round_.title) - issue_description = issue.get("description", "") or "" - - try: - all_comments = client.list_comments(round_.issue_id) - except Exception as exc: - logger.warning("round %s: cannot list comments: %s", round_.round_id, exc) - all_comments = [] - - commit_url = _find_commit_url(all_comments) - - try: - agent_map = client.find_agents_by_name(DEBATER_NAMES) - except Exception as exc: - logger.warning("round %s: cannot look up debater agents: %s", round_.round_id, exc) - agent_map = {} - - body = _build_debater_comment(agent_map, issue_title, issue_description, commit_url) - try: - posted = client.post_comment(round_.issue_id, body) - coord_cid = posted.get("id", "") - except Exception as exc: - logger.error("round %s: cannot post debater 
mention comment: %s", round_.round_id, exc, exc_info=True) - queue.update_status(round_.round_id, "error") - return - - queue.update_phase( - round_.round_id, - "awaiting_debaters", - phase_entered_at=_utcnow(), - coordinator_comment_id=coord_cid, - ) - logger.info( - "round %s: debater comment posted (id=%s), awaiting debaters", - round_.round_id, coord_cid, - ) - - -def _advance_awaiting_debaters( - round_: Round, - client: MulticaClient, - queue: DebateQueue, - cfg: Config, - logger: logging.Logger, -) -> None: - """Poll debater replies → on all-replied or timeout, post judge comment.""" - elapsed = _elapsed_s(round_.phase_entered_at) - timed_out = elapsed >= cfg.round_timeout_s - - try: - comments = client.list_comments(round_.issue_id) - except Exception as exc: - logger.error("round %s: cannot list comments: %s", round_.round_id, exc, exc_info=True) - return - - try: - issue = client.get_issue(round_.issue_id) - except Exception as exc: - logger.warning("round %s: cannot fetch issue: %s", round_.round_id, exc) - issue = {} - - issue_title = issue.get("title", round_.title) - issue_description = issue.get("description", "") or "" - commit_url = _find_commit_url(comments) - - try: - agent_map = client.find_agents_by_name(DEBATER_NAMES) - except Exception as exc: - logger.warning("round %s: cannot look up debater agents: %s", round_.round_id, exc) - agent_map = {} - - replies = _collect_debater_replies( - comments, agent_map, round_.coordinator_comment_id - ) - missing = [n for n in DEBATER_NAMES if n not in replies] - - if missing and not timed_out: - if elapsed >= cfg.round_timeout_s * 0.8: - logger.warning( - "round %s: debaters approaching timeout (%.0fs / %ds), still missing: %s", - round_.round_id, elapsed, cfg.round_timeout_s, missing, - ) - else: - logger.debug( - "round %s: still waiting for debaters %s (%.0fs / %ds)", - round_.round_id, missing, elapsed, cfg.round_timeout_s, - ) - return - - if missing: - logger.warning( - "round %s: debater timeout 
after %.0fs — proceeding with partial evidence " - "(missing: %s)", - round_.round_id, elapsed, missing, - ) - else: - logger.info( - "round %s: all %d debaters replied, assembling transcript", - round_.round_id, len(DEBATER_NAMES), - ) - - # Assemble transcript - parts = [] - for name in DEBATER_NAMES: - if name in replies: - parts.append(f"### {name}\n\n{replies[name]}") - else: - parts.append(f"### {name}\n\n*(no response — timed out)*") - transcript = "\n\n".join(parts) - - try: - judge_map = client.find_agents_by_name([JUDGE_NAME]) - judge_agent_id = judge_map.get(JUDGE_NAME, "") - except Exception as exc: - logger.warning("round %s: cannot look up judge agent: %s", round_.round_id, exc) - judge_agent_id = "" - - judge_body = _build_judge_comment( - judge_agent_id, issue_title, issue_description, commit_url, transcript - ) - try: - posted = client.post_comment(round_.issue_id, judge_body) - judge_cid = posted.get("id", "") - except Exception as exc: - logger.error("round %s: cannot post judge comment: %s", round_.round_id, exc, exc_info=True) - return - - queue.update_phase( - round_.round_id, - "awaiting_judge", - phase_entered_at=_utcnow(), - judge_comment_id=judge_cid, - ) - logger.info( - "round %s: judge comment posted (id=%s), awaiting verdict", - round_.round_id, judge_cid, - ) - - -def _advance_awaiting_judge( - round_: Round, - client: MulticaClient, - queue: DebateQueue, - cfg: Config, - logger: logging.Logger, -) -> None: - """Poll for judge verdict. - - On verdict: update issue status FIRST (race fix), then mark round done. - On timeout: mark round error, leave issue in_review for human escalation. 
- """ - elapsed = _elapsed_s(round_.phase_entered_at) - if elapsed >= cfg.round_timeout_s: - logger.error( - "round %s: judge timeout after %.0fs — leaving issue in_review for human escalation", - round_.round_id, elapsed, - ) - queue.update_phase(round_.round_id, "error") - queue.update_status(round_.round_id, "error") - return - - try: - comments = client.list_comments(round_.issue_id) - except Exception as exc: - logger.error("round %s: cannot list comments: %s", round_.round_id, exc) - return - - try: - judge_map = client.find_agents_by_name([JUDGE_NAME]) - judge_agent_id = judge_map.get(JUDGE_NAME, "") - except Exception as exc: - logger.warning("round %s: cannot look up judge agent: %s", round_.round_id, exc) - judge_agent_id = "" - - judge_reply = _collect_judge_reply( - comments, judge_agent_id, round_.judge_comment_id - ) - - if judge_reply is None: - if elapsed >= cfg.round_timeout_s * 0.8: - logger.warning( - "round %s: judge approaching timeout (%.0fs / %ds) — no verdict yet", - round_.round_id, elapsed, cfg.round_timeout_s, - ) - else: - logger.debug( - "round %s: waiting for judge verdict (%.0fs / %ds)", - round_.round_id, elapsed, cfg.round_timeout_s, - ) - return - - verdict = _parse_verdict(judge_reply) - if verdict is None: - logger.warning( - "round %s: judge replied but no VERDICT: ACCEPT/REJECT found — treating as REJECT", - round_.round_id, - ) - verdict = "REJECT" - - new_issue_status = "done" if verdict == "ACCEPT" else "in_progress" - logger.info( - "round %s: verdict=%s → issue status=%s", - round_.round_id, verdict, new_issue_status, - ) - - # RACE FIX: issue status MUST be updated before marking round done. - # If poll_once runs after this line, the issue is no longer in_review, - # so no double-enqueue is possible regardless of round status. 
- try: - client.update_issue_status(round_.issue_id, new_issue_status) - except Exception as exc: - logger.error( - "round %s: cannot update issue status — will retry next cycle: %s", - round_.round_id, exc, exc_info=True, - ) - return # do NOT mark done; retry next cycle - - # On REJECT: fetch issue and post the full anchor retrigger comment. - if verdict == "REJECT": - try: - issue = client.get_issue(round_.issue_id) - except Exception as exc: - logger.warning( - "round %s: cannot fetch issue for retrigger — using empty dict: %s", - round_.round_id, exc, - ) - issue = {} - retrigger_cid = _post_rejection_retrigger( - round_, client, issue, judge_reply, logger, - ) - if retrigger_cid: - try: - queue.set_retrigger_comment_id(round_.round_id, retrigger_cid) - except Exception as exc: - logger.warning( - "round %s: cannot persist retrigger_comment_id: %s", - round_.round_id, exc, - ) - - phase = "accepted" if verdict == "ACCEPT" else "rejected" - queue.update_phase(round_.round_id, phase) - queue.update_status(round_.round_id, "done") - logger.info("round %s: complete (phase=%s)", round_.round_id, phase) + logger.warning("round %s: cannot post error note: %s", round_.round_id, exc) def _advance_round( @@ -734,14 +927,14 @@ def _advance_round( ) -> None: """Dispatch to the correct phase handler for a running round.""" if round_.phase == "convened": - # Should be caught by _start_round, but handle gracefully on restart. 
_start_round(round_, client, queue, cfg, logger) - elif round_.phase == "awaiting_debaters": - _advance_awaiting_debaters(round_, client, queue, cfg, logger) - elif round_.phase == "awaiting_judge": - _advance_awaiting_judge(round_, client, queue, cfg, logger) + elif round_.phase == "awaiting_rubric": + _advance_awaiting_rubric(round_, client, queue, cfg, logger) + elif round_.phase == "awaiting_judges": + _advance_awaiting_judges(round_, client, queue, cfg, logger) + elif round_.phase == "awaiting_debate": + _advance_awaiting_debate(round_, client, queue, cfg, logger) elif round_.phase in ("accepted", "rejected", "error"): - # Terminal phase but status still running — fix it. logger.warning( "round %s is in terminal phase %r but status=%r — correcting", round_.round_id, round_.phase, round_.status, @@ -762,25 +955,20 @@ def orchestrate_pending( logger: logging.Logger, client: MulticaClient | None = None, ) -> None: - """Process pending and in-flight debate rounds. + """Process pending and in-flight review rounds. Called from watch_loop after each poll_once. Idempotent and restart-safe: running rounds resume from their persisted phase. - - ``client`` may be injected for testing; if omitted a real client is created - from cfg. """ if client is None: client = MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token) - # Start any pending rounds (pending → running/awaiting_debaters). for round_ in list(queue.pending()): try: _start_round(round_, client, queue, cfg, logger) except Exception as exc: logger.error("error starting round %s: %s", round_.round_id, exc, exc_info=True) - # Advance all running rounds (covers newly-started and in-flight). 
for round_ in list(queue.running()): try: _advance_round(round_, client, queue, cfg, logger) diff --git a/src/coordinator/queue.py b/src/coordinator/queue.py index 197237d..dac53c3 100644 --- a/src/coordinator/queue.py +++ b/src/coordinator/queue.py @@ -6,23 +6,27 @@ Schema (~/.coordinator/queue.json): { "rounds": [ { - "round_id": "", - "issue_id": "", - "identifier": "WYL-42", - "title": "...", - "enqueued_at": "", - "status": "pending", - "phase": "convened", - "phase_entered_at": "", - "coordinator_comment_id": "", - "judge_comment_id": "" + "round_id": "", + "issue_id": "", + "identifier": "WYL-42", + "title": "...", + "enqueued_at": "", + "status": "pending", + "phase": "convened", + "phase_entered_at": "", + "meta_judge_comment_id": "", + "rubric_yaml": "", + "judge_mention_comment_id":"", + "judge_report_comment_ids":{}, + "debate_round": 0, + "retrigger_comment_id": "" }, ... ] } status: pending | running | done | error -phase: convened | awaiting_debaters | awaiting_judge | accepted | rejected | error +phase: convened | awaiting_rubric | awaiting_judges | awaiting_debate | accepted | rejected | error Only the watcher appends; only the orchestrator updates status/phase. Completed/errored rounds stay in the file so it stays fully auditable. 
@@ -52,11 +56,14 @@ class Round: title: str enqueued_at: str status: str = "pending" # pending | running | done | error - phase: str = "convened" # convened | awaiting_debaters | awaiting_judge | accepted | rejected | error - phase_entered_at: str = "" # ISO8601 when current phase was entered (for timeout) - coordinator_comment_id: str = "" # ID of debater-mention comment posted by coordinator - judge_comment_id: str = "" # ID of judge-mention comment posted by coordinator - retrigger_comment_id: str = "" # ID of retrigger comment posted after REJECT (audit) + phase: str = "convened" # convened | awaiting_rubric | awaiting_judges | awaiting_debate | accepted | rejected | error + phase_entered_at: str = "" # ISO8601 when current phase was entered (for timeout) + meta_judge_comment_id: str = "" # coordinator comment mentioning meta-judge + rubric_yaml: str = "" # verbatim YAML returned by meta-judge + judge_mention_comment_id: str = "" # coordinator comment mentioning the 3 judges (or the debate-round comment on round N>0) + judge_report_comment_ids: dict[str, str] = field(default_factory=dict) # {judge_name: comment_id} + debate_round: int = 0 # 0 = initial round; 1..3 = debate rounds + retrigger_comment_id: str = "" # ID of retrigger comment posted after REJECT (audit) def to_dict(self) -> dict[str, Any]: return asdict(self) @@ -72,8 +79,11 @@ class Round: status=d.get("status", "pending"), phase=d.get("phase", "convened"), phase_entered_at=d.get("phase_entered_at", ""), - coordinator_comment_id=d.get("coordinator_comment_id", ""), - judge_comment_id=d.get("judge_comment_id", ""), + meta_judge_comment_id=d.get("meta_judge_comment_id", ""), + rubric_yaml=d.get("rubric_yaml", ""), + judge_mention_comment_id=d.get("judge_mention_comment_id", ""), + judge_report_comment_ids=dict(d.get("judge_report_comment_ids", {}) or {}), + debate_round=int(d.get("debate_round", 0)), retrigger_comment_id=d.get("retrigger_comment_id", ""), ) diff --git a/tests/test_integration.py 
b/tests/test_integration.py deleted file mode 100644 index 82019e2..0000000 --- a/tests/test_integration.py +++ /dev/null @@ -1,354 +0,0 @@ -"""Integration tests for the debate round orchestration. - -These tests hit the REAL multica API. They require the coordinator env vars: - - COORDINATOR_SERVER_URL e.g. https://multica.example.com - COORDINATOR_WORKSPACE_ID - COORDINATOR_TOKEN - -Run with: - python -m pytest tests/test_integration.py -m integration -v - -The test creates a scratch issue, exercises the full orchestration lifecycle -with simulated debater/judge responses (posted via the coordinator's own token -using the content-based fallback detector), verifies the result, then cleans up. -""" -from __future__ import annotations - -import html -import logging -import os -import time -from pathlib import Path - -import pytest - -pytestmark = pytest.mark.integration - - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - -@pytest.fixture(scope="module") -def cfg(): - """Load Config from env. Skip module if vars missing.""" - required = [ - "COORDINATOR_SERVER_URL", - "COORDINATOR_WORKSPACE_ID", - "COORDINATOR_TOKEN", - ] - missing = [k for k in required if not os.environ.get(k)] - if missing: - pytest.skip( - f"Integration test requires env vars: {', '.join(missing)}\n" - "Put them in ~/.coordinator/env or export them." 
- ) - # Load env file if present (same as Config.from_env) - from coordinator.config import load_env_file - load_env_file() - from coordinator.config import Config - return Config.from_env() - - -@pytest.fixture(scope="module") -def client(cfg): - from coordinator.multica_client import MulticaClient - return MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _post_fake_debater_replies(client, issue_id: str): - """Post content-marker comments for each debater via the coordinator token. - - The orchestrator's content-based fallback detector accepts these as valid - debater evidence replies. - """ - from coordinator.orchestrator import DEBATER_NAMES - for name in DEBATER_NAMES: - client.post_comment( - issue_id, - f"[{name} response]: Integration-test evidence for {name}. " - "This comment simulates the agent's evidence analysis.", - ) - - -def _post_fake_judge_verdict(client, issue_id: str, verdict: str = "ACCEPT"): - client.post_comment( - issue_id, - f"VERDICT: {verdict}\n\n" - "Integration test verdict. The implementation satisfies the acceptance criteria.", - ) - - -# --------------------------------------------------------------------------- -# Integration test: full round lifecycle -# --------------------------------------------------------------------------- - -@pytest.mark.integration -def test_full_debate_round_lifecycle(cfg, client, tmp_path): - """End-to-end debate round lifecycle against the real API. - - Steps: - 1. Create scratch issue, set to in_review - 2. poll_once → verify round enqueued - 3. _start_round → verify coordinator comment posted on real issue - 4. Post simulated debater responses (content-marker fallback) - 5. _advance_awaiting_debaters → verify judge comment posted - 6. Post simulated judge verdict - 7. 
_advance_awaiting_judge → verify issue status updated (RACE FIX verified) - 8. Cleanup - """ - from coordinator.__main__ import poll_once - from coordinator.orchestrator import ( - DEBATER_NAMES, - _advance_awaiting_debaters, - _advance_awaiting_judge, - _start_round, - ) - from coordinator.queue import DebateQueue - from coordinator.state import SeenState - - logger = logging.getLogger("test.integration") - issue_id = None - - try: - # --- 1. Create scratch issue --- - issue = client.create_issue( - title="[DEBATE-SMOKE-TEST] Integration test — auto-delete", - description=( - "This issue is created by the coordinator integration test. " - "It will be cleaned up automatically.\n\n" - "Commit: https://git.wylab.me/multica/coordinator/commit/test000" - ), - ) - issue_id = issue["id"] - logger.info("created scratch issue %s", issue_id) - - # Set to in_review so the watcher picks it up - client.update_issue_status(issue_id, "in_review") - - # --- 2. poll_once → enqueue round --- - state = SeenState.load(tmp_path / "seen.json") - queue = DebateQueue.load(tmp_path / "queue.json") - poll_once(client, state, queue, logger) - - assert queue.pending(), "Expected round enqueued after poll_once" - round_ = queue.pending()[0] - assert round_.issue_id == issue_id - assert round_.status == "pending" - - # --- 3. 
_start_round → debater comment posted --- - _start_round(round_, client, queue, cfg, logger) - - assert round_.status == "running", f"Expected running, got {round_.status}" - assert round_.phase == "awaiting_debaters", f"Expected awaiting_debaters, got {round_.phase}" - assert round_.coordinator_comment_id, "coordinator_comment_id must be set" - - # Verify comment appeared on the real issue - real_comments = client.list_comments(issue_id) - coord_comment = next( - (c for c in real_comments if c.get("id") == round_.coordinator_comment_id), - None, - ) - assert coord_comment is not None, "Coordinator comment not found in issue comments" - for name in DEBATER_NAMES: - assert name in coord_comment["content"], f"{name} not mentioned in coordinator comment" - - # --- 4. Post simulated debater responses --- - _post_fake_debater_replies(client, issue_id) - - # --- 5. _advance_awaiting_debaters → judge comment posted --- - _advance_awaiting_debaters(round_, client, queue, cfg, logger) - - assert round_.phase == "awaiting_judge", f"Expected awaiting_judge, got {round_.phase}" - assert round_.judge_comment_id, "judge_comment_id must be set" - - real_comments = client.list_comments(issue_id) - judge_comment = next( - (c for c in real_comments if c.get("id") == round_.judge_comment_id), - None, - ) - assert judge_comment is not None, "Judge comment not found in issue comments" - assert "Verdict requested" in judge_comment["content"] - - # --- 6. Post simulated judge verdict --- - _post_fake_judge_verdict(client, issue_id, verdict="ACCEPT") - - # --- 7. 
_advance_awaiting_judge → issue status updated --- - _advance_awaiting_judge(round_, client, queue, cfg, logger) - - assert round_.status == "done", f"Expected done, got {round_.status}" - assert round_.phase == "accepted", f"Expected accepted, got {round_.phase}" - - # Verify issue status via real API (not just in-memory) - refreshed = client.get_issue(issue_id) - assert refreshed.get("status") == "done", ( - f"Issue status should be 'done' after ACCEPT verdict, " - f"got {refreshed.get('status')!r}" - ) - - # RACE FIX: after _advance_awaiting_judge completes, issue is no longer in_review, - # so poll_once cannot double-enqueue this round. - # Note: other in_review issues in the workspace may also get enqueued — that's OK. - poll_once(client, state, queue, logger) - our_rounds = [r for r in queue.rounds if r.issue_id == issue_id] - assert len(our_rounds) == 1, ( - f"poll_once created a second round for issue {issue_id} (double-enqueue)" - ) - - logger.info("VERDICT OK — full lifecycle passed") - - finally: - # --- 8. Cleanup --- - if issue_id: - try: - client.update_issue_status(issue_id, "done") - logger.info("cleanup: issue %s set to done", issue_id) - except Exception as exc: - logger.warning("cleanup failed for issue %s: %s", issue_id, exc) - - -# --------------------------------------------------------------------------- -# Integration test: REJECT retrigger + anchor (WYL-51) -# --------------------------------------------------------------------------- - -@pytest.mark.integration -def test_retrigger_on_reject_end_to_end(cfg, client, tmp_path): - """End-to-end test: REJECT verdict triggers an anchor retrigger comment. - - Steps: - 1. Create scratch issue with multi-paragraph description + an agent assignee - 2. Manually enqueue a Round at phase=awaiting_judge - 3. Inject a VERDICT: REJECT comment via the content-marker fallback - 4. Run _advance_awaiting_judge - 5. 
Assert: issue=in_progress, retrigger comment exists with @mention, - verbatim description, and no-drift instruction - 6. Cleanup - """ - from coordinator.orchestrator import ( - REWORK_INSTRUCTIONS, - _advance_awaiting_judge, - ) - from coordinator.queue import DebateQueue, Round - - logger = logging.getLogger("test.integration.retrigger") - issue_id = None - - # Multi-paragraph description with unique sentinel values for verbatim check - DESCRIPTION = ( - "Integration test anchor paragraph one. Sentinel: ANCHOR-INTEG-PARA1.\n\n" - "Integration test anchor paragraph two. Sentinel: ANCHOR-INTEG-PARA2.\n\n" - "Acceptance criteria: must contain ANCHOR-INTEG-CRITERIA." - ) - - try: - # --- 1. Create scratch issue --- - issue = client.create_issue( - title="[DEBATE-SMOKE-TEST] WYL-51 retrigger smoke — auto-delete", - description=DESCRIPTION, - status="in_review", - ) - issue_id = issue["id"] - logger.info("created scratch issue %s", issue_id) - - # Set an agent assignee so retrigger fires with a mention. - # Pick the first available agent in the workspace. - agents = client.list_agents() - assignee = agents[0] if agents else None - if assignee: - client.update_issue( - issue_id, - assignee_id=assignee["id"], - assignee_type="agent", - ) - logger.info("set assignee to %s (%s)", assignee.get("name"), assignee["id"]) - - # --- 2. Manually enqueue a Round at phase=awaiting_judge --- - queue = DebateQueue.load(tmp_path / "queue.json") - round_ = queue.enqueue(issue_id, "WYL-51-SMOKE", "Retrigger smoke test") - queue.update_status(round_.round_id, "running") - queue.update_phase( - round_.round_id, - "awaiting_judge", - phase_entered_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - coordinator_comment_id="", - ) - - # --- 3. Inject VERDICT: REJECT comment --- - verdict_body = ( - "VERDICT: REJECT\n\n" - "Integration test rejection. The smoke test intentionally rejects this " - "delivery to verify the retrigger mechanism fires correctly." 
- ) - client.post_comment(issue_id, verdict_body) - - # Set judge_comment_id to empty so content-fallback triggers - round_.judge_comment_id = "" - queue.save() - - # --- 4. Run _advance_awaiting_judge --- - _advance_awaiting_judge(round_, client, queue, cfg, logger) - - # --- 5. Assertions --- - assert round_.status == "done", f"Expected round done, got {round_.status}" - assert round_.phase == "rejected", f"Expected rejected, got {round_.phase}" - - # (a) Issue status must be in_progress - refreshed = client.get_issue(issue_id) - assert refreshed.get("status") == "in_progress", ( - f"Issue should be in_progress after REJECT, got {refreshed.get('status')!r}" - ) - - # (b) A retrigger comment must exist on the issue - real_comments = client.list_comments(issue_id) - retrigger_cid = round_.retrigger_comment_id - if assignee: - # Agent path: retrigger_comment_id must be set - assert retrigger_cid, "retrigger_comment_id must be persisted on the round" - retrigger_comment = next( - (c for c in real_comments if c.get("id") == retrigger_cid), - None, - ) - assert retrigger_comment is not None, ( - f"Retrigger comment {retrigger_cid} not found in issue comments" - ) - # The API HTML-encodes some chars (e.g. " → ", > → >) in returned - # content. Decode before comparing so assertions match plain-string constants. 
- body = html.unescape(retrigger_comment["content"]) - - # (b) @-mention present - assert assignee["id"] in body, ( - "Retrigger comment must contain the assignee agent ID" - ) - - # (c) Full verbatim description present - for line in DESCRIPTION.splitlines(): - if line.strip(): - assert line in body, ( - f"Description line {line!r} not found verbatim in retrigger comment" - ) - - # (d) No-drift instruction present - assert REWORK_INSTRUCTIONS in body, ( - "REWORK_INSTRUCTIONS must appear verbatim in retrigger comment" - ) - - logger.info( - "retrigger comment (first 400 chars):\n%s", - body[:400], - ) - - logger.info("VERDICT OK — retrigger end-to-end passed") - - finally: - # --- 6. Cleanup --- - if issue_id: - try: - client.update_issue_status(issue_id, "done") - logger.info("cleanup: issue %s set to done", issue_id) - except Exception as exc: - logger.warning("cleanup failed for issue %s: %s", issue_id, exc) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 35e1186..f97005c 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -1,544 +1,610 @@ -"""Unit tests for coordinator.orchestrator (WYL-45).""" +"""Tests for the CEK-native review pipeline (meta-judge + 3 judges + consensus).""" from __future__ import annotations import logging -import uuid -from dataclasses import dataclass, field -from datetime import datetime, timezone, timedelta -from pathlib import Path +import pathlib +from datetime import datetime, timezone from typing import Any import pytest +import yaml +from coordinator.queue import DebateQueue, Round from coordinator.orchestrator import ( - DEBATER_NAMES, - DEBATER_PROMPTS, - JUDGE_NAME, + ACCEPT_MIN_SCORE, + CONSENSUS_CRITERION_THRESHOLD, + CONSENSUS_OVERALL_THRESHOLD, + JUDGE_NAMES, + MAX_DEBATE_ROUNDS, + META_JUDGE_NAME, REWORK_INSTRUCTIONS, - _advance_awaiting_debaters, - _advance_awaiting_judge, - _collect_debater_replies, - _collect_judge_reply, + _advance_awaiting_debate, + 
_advance_awaiting_judges, + _advance_awaiting_rubric, + _advance_round, + _apply_verdict, + _build_coordinator_note_no_agent, + _build_debate_round_comment, + _build_judge_mention_comment, + _build_meta_judge_mention, + _build_retrigger_comment, + _check_consensus, + _criterion_scores, + _extract_yaml, _find_commit_url, - _parse_verdict, + _find_reply_by_agent, + _overall_score, + _parse_judge_report, + _parse_rubric, _post_rejection_retrigger, _start_round, - orchestrate_pending, + _utcnow, ) -from coordinator.queue import DebateQueue, Round + +_logger = logging.getLogger("test.orchestrator") # --------------------------------------------------------------------------- -# Helpers +# Fakes # --------------------------------------------------------------------------- -def _utcnow() -> str: - return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - - -def _past(seconds: float) -> str: - t = datetime.now(timezone.utc) - timedelta(seconds=seconds) - return t.strftime("%Y-%m-%dT%H:%M:%SZ") - - -@dataclass class FakeConfig: - server_url: str = "http://fake" - workspace_id: str = "ws-1" - token: str = "tok" - poll_interval_s: int = 30 - round_timeout_s: int = 600 - max_concurrent_rounds: int = 3 - seen_file: Path = Path("/tmp/seen.json") - queue_file: Path = Path("/tmp/queue.json") - log_file: Path = Path("/tmp/coordinator.log") + server_url = "http://x" + workspace_id = "wid" + token = "tok" + poll_interval_s = 30 + round_timeout_s = 600 + max_concurrent_rounds = 3 -@dataclass class FakeClient: - """Controllable stand-in for MulticaClient.""" - - issue: dict[str, Any] = field(default_factory=lambda: { - "id": "issue-1", - "title": "Test issue", - "description": "Test description. 
Commit: https://git.example.com/commit/abc123", - "status": "in_review", - }) - comments: list[dict[str, Any]] = field(default_factory=list) - agents: list[dict[str, Any]] = field(default_factory=list) - posted_comments: list[str] = field(default_factory=list) - status_updates: list[tuple[str, str]] = field(default_factory=list) - post_comment_returns_id: str = "comment-coord" + def __init__(self) -> None: + self.issue: dict[str, Any] = { + "id": "issue-1", + "title": "Do the thing", + "description": "Please do the thing clearly.", + "status": "in_review", + "assignee_type": None, + "assignee_id": None, + } + self.comments: list[dict[str, Any]] = [] + self.posted_comments: list[str] = [] + self.agents: list[dict[str, Any]] = [ + {"name": META_JUDGE_NAME, "id": "agent-meta"}, + {"name": "Judge-GPT", "id": "agent-gpt"}, + {"name": "Judge-Claude", "id": "agent-claude"}, + {"name": "Judge-Gemini", "id": "agent-gemini"}, + ] + self._next_comment_id = 1000 def get_issue(self, issue_id: str) -> dict[str, Any]: - return self.issue + return dict(self.issue) + + def update_issue_status(self, issue_id: str, status: str) -> dict[str, Any]: + self.issue["status"] = status + return {"id": issue_id, "status": status} def list_comments(self, issue_id: str) -> list[dict[str, Any]]: return list(self.comments) def post_comment(self, issue_id: str, content: str) -> dict[str, Any]: self.posted_comments.append(content) - cid = self.post_comment_returns_id - # Advance the ID for subsequent calls - self.post_comment_returns_id = "comment-" + str(len(self.posted_comments)) - ts = _utcnow() - new_comment = {"id": cid, "content": content, "created_at": ts, - "author_id": "coord", "author_type": "member"} - self.comments.append(new_comment) - return new_comment + cid = f"posted-{self._next_comment_id}" + self._next_comment_id += 1 + created = _utcnow() + self.comments.append({ + "id": cid, + "content": content, + "author_id": "coord-user", + "created_at": created, + }) + return {"id": cid, 
"created_at": created} def list_agents(self) -> list[dict[str, Any]]: return list(self.agents) - def find_agents_by_name(self, names) -> dict[str, str]: - wanted = set(names) - return {a["name"]: a["id"] for a in self.agents if a["name"] in wanted} + def find_agents_by_name(self, names): + want = set(names) + return {a["name"]: a["id"] for a in self.agents if a["name"] in want} def get_agent_name(self, agent_id: str) -> str | None: for a in self.agents: - if a.get("id") == agent_id: - return a.get("name") + if a["id"] == agent_id: + return a["name"] return None - def update_issue_status(self, issue_id: str, status: str) -> dict[str, Any]: - self.status_updates.append((issue_id, status)) - self.issue["status"] = status - return self.issue - def update_issue(self, issue_id: str, **fields: Any) -> dict[str, Any]: - self.issue.update(fields) - return self.issue +def _past(seconds_ago: int) -> str: + from datetime import timedelta + t = datetime.now(timezone.utc) - timedelta(seconds=seconds_ago) + return t.strftime("%Y-%m-%dT%H:%M:%SZ") -def _make_round( - *, - status: str = "pending", - phase: str = "convened", - phase_entered_at: str = "", - coordinator_comment_id: str = "", - judge_comment_id: str = "", -) -> Round: - return Round( - round_id=str(uuid.uuid4()), +def _reply_comment(agent_id: str, content: str, created: str | None = None) -> dict[str, Any]: + return { + "id": f"reply-{agent_id}", + "author_id": agent_id, + "content": content, + "created_at": created or _utcnow(), + } + + +def _rubric_yaml_sample() -> str: + spec = { + "checklist": [ + {"question": "Does the code compile?", "category": "hard_rule", "importance": "essential", "rationale": "basic"}, + {"question": "Is documentation present?", "category": "principle", "importance": "important", "rationale": "quality"}, + ], + "rubric_dimensions": [ + {"name": "Correctness", "description": "Does the code work", "scale": "1-5", "weight": 0.6, "score_definitions": {1: "no", 5: "perfect"}}, + {"name": 
"Clarity", "description": "Readability", "scale": "1-5", "weight": 0.4, "score_definitions": {1: "opaque", 5: "crystal"}}, + ], + } + return yaml.safe_dump(spec, sort_keys=False) + + +def _judge_report(final_score: float, criteria: dict[str, float]) -> str: + report = { + "evaluation_report": { + "score_calculation": {"final_score": final_score}, + "rubric_scores": [ + {"name": k, "score": v, "weight": 0.5} for k, v in criteria.items() + ], + "executive_summary": f"score {final_score}", + } + } + return "```yaml\n" + yaml.safe_dump(report, sort_keys=False) + "\n```" + + +def _make_round(tmp_path: pathlib.Path, **overrides) -> tuple[Round, DebateQueue]: + r = Round( + round_id="r1", issue_id="issue-1", - identifier="WYL-99", - title="Test issue", + identifier="WYL-X", + title="Do the thing", enqueued_at=_utcnow(), - status=status, - phase=phase, - phase_entered_at=phase_entered_at or _utcnow(), - coordinator_comment_id=coordinator_comment_id, - judge_comment_id=judge_comment_id, + status="running", + phase="convened", + phase_entered_at=_utcnow(), ) - - -def _make_queue(tmp_path: Path, *rounds: Round) -> DebateQueue: + for k, v in overrides.items(): + setattr(r, k, v) q = DebateQueue.load(tmp_path / "queue.json") - for r in rounds: - q.rounds.append(r) + q.rounds.append(r) q.save() - return q - - -_logger = logging.getLogger("test.orchestrator") + return r, q # --------------------------------------------------------------------------- -# _parse_verdict +# Pure helpers # --------------------------------------------------------------------------- -def test_parse_verdict_accept(): - assert _parse_verdict("VERDICT: ACCEPT\n\nGreat work.") == "ACCEPT" +def test_extract_yaml_from_fenced_block(): + content = "Here is the rubric:\n\n```yaml\nchecklist:\n - question: foo\n```\n\nDone." 
+ y = _extract_yaml(content) + assert y.startswith("checklist:") + assert "question: foo" in y -def test_parse_verdict_reject(): - assert _parse_verdict("VERDICT: REJECT\n\nMissing tests.") == "REJECT" +def test_extract_yaml_from_unfenced_content(): + content = "checklist:\n - question: foo" + y = _extract_yaml(content) + assert y == content.strip() -def test_parse_verdict_case_insensitive(): - assert _parse_verdict("verdict: accept") == "ACCEPT" +def test_parse_rubric_valid_flat(): + spec = _parse_rubric(f"```yaml\n{_rubric_yaml_sample()}\n```") + assert spec is not None + assert "checklist" in spec or "rubric_dimensions" in spec -def test_parse_verdict_none_when_absent(): - assert _parse_verdict("No verdict here.") is None +def test_parse_rubric_valid_wrapped(): + wrapped = yaml.safe_dump({"evaluation_specification": yaml.safe_load(_rubric_yaml_sample())}) + spec = _parse_rubric(f"```yaml\n{wrapped}\n```") + assert spec is not None + assert "rubric_dimensions" in spec + + +def test_parse_rubric_rejects_malformed_yaml(): + assert _parse_rubric("```yaml\nnot: valid: nested: without: quotes\n```") is None + + +def test_parse_rubric_rejects_yaml_without_expected_keys(): + assert _parse_rubric("```yaml\njust_some: random\n```") is None + + +def test_parse_judge_report_valid_with_final_score(): + content = _judge_report(3.7, {"Correctness": 4.0, "Clarity": 3.5}) + r = _parse_judge_report(content) + assert r is not None + assert r["score_calculation"]["final_score"] == 3.7 + + +def test_parse_judge_report_valid_without_final_score_but_rubric_scores(): + report = { + "evaluation_report": { + "rubric_scores": [ + {"name": "Correctness", "score": 4.0, "weight": 1.0}, + ] + } + } + content = "```yaml\n" + yaml.safe_dump(report) + "\n```" + assert _parse_judge_report(content) is not None + + +def test_parse_judge_report_rejects_empty(): + assert _parse_judge_report("```yaml\nnothing: here\n```") is None + + +def test_overall_score_prefers_final_score(): + r = 
{"score_calculation": {"final_score": 2.8}, "rubric_scores": [{"name": "x", "score": 5, "weight": 1}]} + assert _overall_score(r) == 2.8 + + +def test_overall_score_falls_back_to_weighted_average(): + r = {"rubric_scores": [ + {"name": "a", "score": 4.0, "weight": 0.6}, + {"name": "b", "score": 2.0, "weight": 0.4}, + ]} + assert _overall_score(r) == pytest.approx(3.2) + + +def test_overall_score_none_when_nothing_to_extract(): + assert _overall_score({}) is None + + +def test_criterion_scores_extracts_names_and_scores(): + r = {"rubric_scores": [ + {"name": "Correctness", "score": 4.0}, + {"name": "Clarity", "score": 3.0}, + ]} + s = _criterion_scores(r) + assert s == {"Correctness": 4.0, "Clarity": 3.0} + + +def test_check_consensus_converged_accept(): + reports = { + "Judge-GPT": {"score_calculation": {"final_score": 4.0}, "rubric_scores": [{"name": "C", "score": 4}]}, + "Judge-Claude": {"score_calculation": {"final_score": 4.2}, "rubric_scores": [{"name": "C", "score": 4}]}, + "Judge-Gemini": {"score_calculation": {"final_score": 4.1}, "rubric_scores": [{"name": "C", "score": 4}]}, + } + converged, verdict, avg = _check_consensus(reports) + assert converged is True + assert verdict == "ACCEPT" + assert avg == pytest.approx((4.0 + 4.2 + 4.1) / 3) + + +def test_check_consensus_converged_reject_low_score(): + reports = { + n: {"score_calculation": {"final_score": 2.5}, "rubric_scores": [{"name": "C", "score": 2}]} + for n in JUDGE_NAMES + } + converged, verdict, avg = _check_consensus(reports) + assert converged is True + assert verdict == "REJECT" + + +def test_check_consensus_not_converged_overall_spread(): + reports = { + "Judge-GPT": {"score_calculation": {"final_score": 2.0}}, + "Judge-Claude": {"score_calculation": {"final_score": 4.0}}, + "Judge-Gemini": {"score_calculation": {"final_score": 3.0}}, + } + converged, verdict, avg = _check_consensus(reports) + assert converged is False + assert verdict is None + assert avg == pytest.approx(3.0) + + +def 
test_check_consensus_not_converged_criterion_spread(): + reports = { + "Judge-GPT": {"score_calculation": {"final_score": 3.0}, "rubric_scores": [{"name": "C", "score": 2}]}, + "Judge-Claude": {"score_calculation": {"final_score": 3.1}, "rubric_scores": [{"name": "C", "score": 5}]}, + "Judge-Gemini": {"score_calculation": {"final_score": 3.0}, "rubric_scores": [{"name": "C", "score": 3}]}, + } + converged, _, _ = _check_consensus(reports) + assert converged is False + + +def test_check_consensus_no_overalls_returns_false(): + reports = {n: {} for n in JUDGE_NAMES} + converged, verdict, avg = _check_consensus(reports) + assert converged is False + assert avg is None + + +# --------------------------------------------------------------------------- +# Comment builders +# --------------------------------------------------------------------------- + +def test_meta_judge_mention_contains_mention_and_description(): + body = _build_meta_judge_mention("agent-meta", "Title", "Description line 1\nDescription line 2") + assert "mention://agent/agent-meta" in body + assert META_JUDGE_NAME in body + assert "Description line 1" in body + assert "Description line 2" in body + + +def test_judge_mention_contains_all_three_mentions_and_rubric(): + judge_ids = {"Judge-GPT": "a", "Judge-Claude": "b", "Judge-Gemini": "c"} + body = _build_judge_mention_comment(judge_ids, "Title", "Desc", "https://example.com/commit/abc", "rubric: yes") + for n in JUDGE_NAMES: + assert n in body + for agent_id in ("a", "b", "c"): + assert f"mention://agent/{agent_id}" in body + assert "rubric: yes" in body + assert "https://example.com/commit/abc" in body + + +def test_debate_round_comment_quotes_all_prior_reports(): + judge_ids = {n: f"id-{n}" for n in JUDGE_NAMES} + prior = {n: f"REPORT FROM {n}" for n in JUDGE_NAMES} + body = _build_debate_round_comment(judge_ids, 1, prior) + assert "Debate round 1" in body + for n in JUDGE_NAMES: + assert n in body + assert f"REPORT FROM {n}" in body + + +def 
test_retrigger_comment_has_anchor_and_no_drift_instructions(): + body = _build_retrigger_comment("Worker", "agent-worker", "Original desc line.", "VERDICT: REJECT", "r1") + assert "mention://agent/agent-worker" in body + assert "ANCHOR" in body + assert "Original desc line." in body + assert REWORK_INSTRUCTIONS in body + + +def test_coordinator_note_no_agent_has_no_mention(): + body = _build_coordinator_note_no_agent("r1", "no assignee set") + assert "mention://" not in body + assert "Manual follow-up required" in body # --------------------------------------------------------------------------- # _find_commit_url # --------------------------------------------------------------------------- -def test_find_commit_url_found(): +def test_find_commit_url_picks_latest(): comments = [ - {"content": "Watcher implemented. Commit: https://git.example.com/multica/foo/commit/abc123"}, + {"content": "older https://git.example/commit/abc123", "created_at": "2026-01-01T00:00:00Z"}, + {"content": "newer https://git.example/commit/def456", "created_at": "2026-01-02T00:00:00Z"}, ] - assert _find_commit_url(comments) == "https://git.example.com/multica/foo/commit/abc123" - - -def test_find_commit_url_returns_last(): - comments = [ - {"content": "Commit: https://git.example.com/multica/foo/commit/aaa111"}, - {"content": "Follow-up commit: https://git.example.com/multica/foo/commit/bbb222"}, - ] - assert _find_commit_url(comments) == "https://git.example.com/multica/foo/commit/bbb222" + assert _find_commit_url(comments) == "https://git.example/commit/def456" def test_find_commit_url_empty_when_absent(): - assert _find_commit_url([{"content": "No link here."}]) == "" + assert _find_commit_url([{"content": "no urls"}]) == "" # --------------------------------------------------------------------------- -# _collect_debater_replies +# _find_reply_by_agent # --------------------------------------------------------------------------- -def _make_comment( - cid: str, - content: str, - 
author_id: str = "anon", - ts: str = "", -) -> dict[str, Any]: - return { - "id": cid, - "content": content, - "author_id": author_id, - "author_type": "agent", - "created_at": ts or _utcnow(), - } - - -def test_collect_replies_by_agent_id(): - cutoff_ts = _past(10) - agent_map = {"Senior Developer": "agent-sd"} +def test_find_reply_by_agent_respects_cutoff(): comments = [ - {"id": "coord", "content": "Debate opened", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - _make_comment("c1", "Evidence here.", author_id="agent-sd", - ts=_utcnow()), + {"id": "before", "author_id": "agent-x", "content": "early", "created_at": _past(100)}, + {"id": "cutoff", "author_id": "coord", "content": "mention", "created_at": _past(50)}, + {"id": "after", "author_id": "agent-x", "content": "late", "created_at": _past(10)}, ] - replies = _collect_debater_replies(comments, agent_map, "coord") - assert "Senior Developer" in replies + found = _find_reply_by_agent(comments, "agent-x", "cutoff") + assert found == ("after", "late") -def test_collect_replies_content_fallback(): - """Content-based marker accepted when agent ID not in map.""" - cutoff_ts = _past(10) - agent_map = {} - comments = [ - {"id": "coord", "content": "Debate opened", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - _make_comment( - "c1", - "[Senior Developer response]: grep output here.", - author_id="someone-else", - ts=_utcnow(), - ), - ] - replies = _collect_debater_replies(comments, agent_map, "coord") - assert "Senior Developer" in replies - - -def test_collect_replies_skips_before_cutoff(): - """Comments before coordinator's mention are ignored.""" - early = _past(20) - cutoff_ts = _past(10) - agent_map = {"Senior Developer": "agent-sd"} - comments = [ - _make_comment("early", "Early reply", author_id="agent-sd", ts=early), - {"id": "coord", "content": "Debate opened", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - ] - replies = 
_collect_debater_replies(comments, agent_map, "coord") - assert "Senior Developer" not in replies - - -def test_collect_replies_no_duplicate_per_debater(): - cutoff_ts = _past(10) - agent_map = {"Senior Developer": "agent-sd"} - comments = [ - {"id": "coord", "content": "x", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - _make_comment("c1", "First reply", author_id="agent-sd", ts=_utcnow()), - _make_comment("c2", "Second reply", author_id="agent-sd", ts=_utcnow()), - ] - replies = _collect_debater_replies(comments, agent_map, "coord") - assert replies["Senior Developer"] == "First reply" - - -# --------------------------------------------------------------------------- -# _collect_judge_reply -# --------------------------------------------------------------------------- - -def test_collect_judge_reply_by_agent_id(): - cutoff_ts = _past(10) - comments = [ - {"id": "jc", "content": "Verdict requested", "author_id": "coord", - "created_at": cutoff_ts}, - _make_comment("j1", "VERDICT: ACCEPT\n\nGreat.", author_id="agent-judge", - ts=_utcnow()), - ] - result = _collect_judge_reply(comments, "agent-judge", "jc") - assert result is not None - assert "ACCEPT" in result - - -def test_collect_judge_reply_content_fallback(): - cutoff_ts = _past(10) - comments = [ - {"id": "jc", "content": "x", "author_id": "coord", "created_at": cutoff_ts}, - _make_comment("j1", "VERDICT: REJECT\n\nMissing tests.", author_id="anyone", - ts=_utcnow()), - ] - result = _collect_judge_reply(comments, "", "jc") - assert result is not None - assert "REJECT" in result - - -def test_collect_judge_reply_none_when_absent(): - cutoff_ts = _past(10) - comments = [ - {"id": "jc", "content": "x", "author_id": "coord", "created_at": cutoff_ts}, - ] - assert _collect_judge_reply(comments, "agent-judge", "jc") is None +def test_find_reply_by_agent_none_when_no_match(): + comments = [{"id": "x", "author_id": "other", "content": "hi", "created_at": _utcnow()}] + assert 
_find_reply_by_agent(comments, "agent-x", "somewhere") is None # --------------------------------------------------------------------------- # _start_round # --------------------------------------------------------------------------- -def test_start_round_marks_running_before_post(tmp_path): - """Status must be 'running' before any API call (early guard against double-enqueue).""" - call_order: list[str] = [] - - class TrackingQueue(DebateQueue): - def update_status(self, round_id, status): - call_order.append(f"update_status:{status}") - super().update_status(round_id, status) - - class TrackingClient(FakeClient): - def post_comment(self, issue_id, content): - call_order.append("post_comment") - return super().post_comment(issue_id, content) - - r = _make_round() - q = TrackingQueue.load(tmp_path / "queue.json") - q.rounds.append(r) - q.save() - - _start_round(r, TrackingClient(), q, FakeConfig(), _logger) - - # update_status("running") must precede post_comment - running_idx = next(i for i, v in enumerate(call_order) if v == "update_status:running") - post_idx = next(i for i, v in enumerate(call_order) if v == "post_comment") - assert running_idx < post_idx - - -def test_start_round_posts_debater_comment(tmp_path): - r = _make_round() - q = _make_queue(tmp_path, r) +def test_start_round_posts_meta_judge_mention_and_sets_phase(tmp_path): + r, q = _make_round(tmp_path, phase="convened", status="pending") client = FakeClient() - _start_round(r, client, q, FakeConfig(), _logger) + assert r.status == "running" assert len(client.posted_comments) == 1 - comment = client.posted_comments[0] - assert "Debate round opened" in comment - for name in DEBATER_NAMES: - assert name in comment + body = client.posted_comments[0] + assert "mention://agent/agent-meta" in body + assert r.phase == "awaiting_rubric" + assert r.meta_judge_comment_id != "" -def test_start_round_sets_phase_awaiting_debaters(tmp_path): - r = _make_round() - q = _make_queue(tmp_path, r) +def 
test_start_round_marks_error_on_api_failure(tmp_path): + r, q = _make_round(tmp_path, phase="convened", status="pending") - _start_round(r, FakeClient(), q, FakeConfig(), _logger) - - assert r.phase == "awaiting_debaters" - assert r.status == "running" - assert r.coordinator_comment_id # must be set - - -def test_start_round_error_on_api_failure(tmp_path): - class BrokenClient(FakeClient): - def post_comment(self, issue_id, content): - raise RuntimeError("API down") - - r = _make_round() - q = _make_queue(tmp_path, r) - - _start_round(r, BrokenClient(), q, FakeConfig(), _logger) + class FailingClient(FakeClient): + def get_issue(self, issue_id): + raise RuntimeError("no issue") + client = FailingClient() + _start_round(r, client, q, FakeConfig(), _logger) assert r.status == "error" # --------------------------------------------------------------------------- -# _advance_awaiting_debaters — debater replies +# _advance_awaiting_rubric # --------------------------------------------------------------------------- -def _debater_round(tmp_path: Path) -> tuple[Round, DebateQueue, FakeClient]: - """Return a round already in awaiting_debaters phase.""" - r = _make_round( - status="running", - phase="awaiting_debaters", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-comment", +def test_advance_awaiting_rubric_waits_when_meta_judge_silent(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_utcnow(), ) - q = _make_queue(tmp_path, r) - client = FakeClient() - # Seed coordinator comment in the comment list - client.comments.append({ - "id": "coord-comment", - "content": "Debate opened", - "author_id": "coord", - "author_type": "member", - "created_at": _past(5), - }) - # Seed debater agents - client.agents = [{"name": name, "id": f"agent-{i}"} for i, name in enumerate(DEBATER_NAMES)] - client.agents.append({"name": JUDGE_NAME, "id": "agent-judge"}) - return r, q, client + 
client.comments = [{"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}] + _advance_awaiting_rubric(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_rubric" + assert r.status == "running" -def test_advance_debaters_waits_when_not_all_replied(tmp_path): - r, q, client = _debater_round(tmp_path) - - _advance_awaiting_debaters(r, client, q, FakeConfig(), _logger) - - # Not enough replies → no judge comment posted - assert r.phase == "awaiting_debaters" - assert all("Verdict requested" not in c for c in client.posted_comments) - - -def test_advance_debaters_proceeds_when_all_replied(tmp_path): - r, q, client = _debater_round(tmp_path) - - # Add a reply from each debater - for i, name in enumerate(DEBATER_NAMES): - client.comments.append({ - "id": f"reply-{i}", - "content": f"[{name} response]: Evidence here.", - "author_id": f"agent-{i}", - "author_type": "agent", - "created_at": _utcnow(), - }) - - _advance_awaiting_debaters(r, client, q, FakeConfig(), _logger) - - assert r.phase == "awaiting_judge" - assert r.judge_comment_id - judge_comment = client.posted_comments[-1] - assert "Verdict requested" in judge_comment - for name in DEBATER_NAMES: - assert name in judge_comment - - -def test_advance_debaters_timeout_with_partial_evidence(tmp_path): - """After timeout, proceed with partial evidence (missing debaters noted in transcript).""" - r, q, client = _debater_round(tmp_path) - # Only one debater replies - client.comments.append({ - "id": "r1", - "content": f"[{DEBATER_NAMES[0]} response]: My evidence.", - "author_id": "agent-0", - "author_type": "agent", - "created_at": _utcnow(), - }) - # Simulate timeout by setting phase_entered_at far in the past - r.phase_entered_at = _past(700) - q.save() - - cfg = FakeConfig() - _advance_awaiting_debaters(r, client, q, cfg, _logger) - - assert r.phase == "awaiting_judge" - judge_comment = client.posted_comments[-1] - assert "timed out" in judge_comment or "no response" in 
judge_comment - - -# --------------------------------------------------------------------------- -# _advance_awaiting_judge — verdict handling -# --------------------------------------------------------------------------- - -def _judge_round(tmp_path: Path, *, phase_entered_at: str = "") -> tuple[Round, DebateQueue, FakeClient]: - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=phase_entered_at or _utcnow(), - coordinator_comment_id="coord-comment", - judge_comment_id="judge-comment", +def test_advance_awaiting_rubric_errors_on_malformed_yaml(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_utcnow(), ) - q = _make_queue(tmp_path, r) - client = FakeClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] client.comments = [ - {"id": "judge-comment", "content": "Verdict requested", - "author_id": "coord", "author_type": "member", "created_at": _past(5)}, + {"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + {"id": "reply", "author_id": "agent-meta", "content": "```yaml\nnot: valid: nested\n```", "created_at": _utcnow()}, ] - return r, q, client - - -def test_advance_judge_waits_when_no_verdict(tmp_path): - r, q, client = _judge_round(tmp_path) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.phase == "awaiting_judge" - assert r.status == "running" - assert not client.status_updates - - -def test_advance_judge_accept_updates_issue_status_to_done(tmp_path): - r, q, client = _judge_round(tmp_path) - client.comments.append({ - "id": "verdict1", - "content": "VERDICT: ACCEPT\n\nLooks good.", - "author_id": "agent-judge", - "author_type": "agent", - "created_at": _utcnow(), - }) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "accepted" - assert client.status_updates == [("issue-1", "done")] - - -def 
test_advance_judge_reject_updates_issue_status_to_in_progress(tmp_path): - r, q, client = _judge_round(tmp_path) - client.comments.append({ - "id": "verdict1", - "content": "VERDICT: REJECT\n\nMissing tests.", - "author_id": "agent-judge", - "author_type": "agent", - "created_at": _utcnow(), - }) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "rejected" - assert client.status_updates == [("issue-1", "in_progress")] - - -def test_advance_judge_timeout_marks_error(tmp_path): - """Judge timeout: round → error, issue left in_review for human escalation.""" - r, q, client = _judge_round(tmp_path, phase_entered_at=_past(700)) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "error" + _advance_awaiting_rubric(r, client, q, FakeConfig(), _logger) assert r.phase == "error" - # Issue status must NOT be changed — leave in_review for humans - assert not client.status_updates + assert r.status == "error" + + +def test_advance_awaiting_rubric_moves_to_awaiting_judges_on_valid_rubric(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.comments = [ + {"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + {"id": "reply", "author_id": "agent-meta", "content": f"```yaml\n{_rubric_yaml_sample()}\n```", "created_at": _utcnow()}, + ] + _advance_awaiting_rubric(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_judges" + assert r.rubric_yaml != "" + assert any("mention://agent/agent-gpt" in c for c in client.posted_comments) + assert any("mention://agent/agent-claude" in c for c in client.posted_comments) + assert any("mention://agent/agent-gemini" in c for c in client.posted_comments) + + +def test_advance_awaiting_rubric_errors_on_timeout_without_reply(tmp_path): + cfg = FakeConfig() + r, q = 
_make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_past(cfg.round_timeout_s + 5), + ) + client = FakeClient() + client.comments = [{"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(cfg.round_timeout_s + 5)}] + _advance_awaiting_rubric(r, client, q, cfg, _logger) + assert r.phase == "error" + assert r.status == "error" # --------------------------------------------------------------------------- -# Race condition: issue status before round done (CRITICAL) +# _advance_awaiting_judges # --------------------------------------------------------------------------- -def test_issue_status_updated_before_round_marked_done(tmp_path): - """RACE FIX: client.update_issue_status MUST precede queue.update_status('done').""" +def test_advance_awaiting_judges_waits_when_missing_reports(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(3.5, {"Correctness": 4})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_judges" + + +def test_advance_awaiting_judges_accepts_on_consensus(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(4.0, {"Correctness": 4})), + _reply_comment("agent-claude", _judge_report(4.1, {"Correctness": 4})), + _reply_comment("agent-gemini", _judge_report(4.2, {"Correctness": 4})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == 
"accepted" + assert r.status == "done" + assert client.issue["status"] == "done" + + +def test_advance_awaiting_judges_rejects_on_consensus_low(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.issue["assignee_type"] = "agent" + client.issue["assignee_id"] = "agent-worker" + client.agents.append({"name": "Worker", "id": "agent-worker"}) + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(2.0, {"Correctness": 2})), + _reply_comment("agent-claude", _judge_report(2.2, {"Correctness": 2})), + _reply_comment("agent-gemini", _judge_report(2.1, {"Correctness": 2})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "rejected" + assert r.status == "done" + assert client.issue["status"] == "in_progress" + assert any("mention://agent/agent-worker" in c for c in client.posted_comments) + + +def test_advance_awaiting_judges_starts_debate_round_when_spread(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), debate_round=0, + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(2.0, {"Correctness": 2})), + _reply_comment("agent-claude", _judge_report(4.0, {"Correctness": 4})), + _reply_comment("agent-gemini", _judge_report(3.0, {"Correctness": 3})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_debate" + assert r.debate_round == 1 + + +def test_advance_awaiting_judges_errors_when_no_parseable(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", 
phase_entered_at=_past(FakeConfig.round_timeout_s + 5), + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(FakeConfig.round_timeout_s + 5)}, + _reply_comment("agent-gpt", "not yaml at all"), + _reply_comment("agent-claude", "also not yaml"), + _reply_comment("agent-gemini", "garbage"), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "error" + assert r.status == "error" + + +# --------------------------------------------------------------------------- +# Debate round cap +# --------------------------------------------------------------------------- + +def test_awaiting_debate_errors_out_at_cap(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_debate", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + debate_round=MAX_DEBATE_ROUNDS, + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(2.0, {"Correctness": 2})), + _reply_comment("agent-claude", _judge_report(4.0, {"Correctness": 4})), + _reply_comment("agent-gemini", _judge_report(3.0, {"Correctness": 3})), + ] + _advance_awaiting_debate(r, client, q, FakeConfig(), _logger) + assert r.phase == "error" + assert r.status == "error" + + +# --------------------------------------------------------------------------- +# Race fix: issue status moves BEFORE round marked done +# --------------------------------------------------------------------------- + +def test_apply_verdict_updates_issue_before_marking_round_done(tmp_path): call_order: list[str] = [] class TrackingClient(FakeClient): @@ -551,440 +617,78 @@ def test_issue_status_updated_before_round_marked_done(tmp_path): call_order.append(f"round:{status}") super().update_status(round_id, status) - r = _make_round( - status="running", - phase="awaiting_judge", + r = Round( + 
round_id="r1", issue_id="issue-1", identifier="WYL-X", title="t", + enqueued_at=_utcnow(), status="running", phase="awaiting_judges", phase_entered_at=_utcnow(), - coordinator_comment_id="coord-comment", - judge_comment_id="judge-comment", ) q = TrackingQueue.load(tmp_path / "queue.json") q.rounds.append(r) q.save() - client = TrackingClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] - client.comments = [ - {"id": "judge-comment", "content": "x", "author_id": "coord", - "created_at": _past(5)}, - {"id": "v1", "content": "VERDICT: ACCEPT\n\nAll good.", - "author_id": "agent-judge", "author_type": "agent", "created_at": _utcnow()}, - ] - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) + _apply_verdict(r, client, q, "ACCEPT", "VERDICT: ACCEPT\nScore 4.0", _logger) - # Both calls must have happened assert "issue:done" in call_order assert "round:done" in call_order - # Issue status MUST precede round-done - assert call_order.index("issue:done") < call_order.index("round:done"), ( - f"Expected issue:done before round:done, got order: {call_order}" - ) - - -def test_round_not_marked_done_if_issue_update_fails(tmp_path): - """If issue status update fails, don't mark round done (retry next cycle).""" - - class FailingClient(FakeClient): - def update_issue_status(self, issue_id, status): - raise RuntimeError("network error") - - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-comment", - judge_comment_id="judge-comment", - ) - q = _make_queue(tmp_path, r) - - client = FailingClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] - client.comments = [ - {"id": "judge-comment", "content": "x", "author_id": "coord", "created_at": _past(5)}, - {"id": "v1", "content": "VERDICT: ACCEPT", "author_id": "agent-judge", - "created_at": _utcnow()}, - ] - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - # Round must remain running for retry - 
assert r.status == "running" + assert call_order.index("issue:done") < call_order.index("round:done") # --------------------------------------------------------------------------- -# Phase transitions: full cycle +# _post_rejection_retrigger corner cases # --------------------------------------------------------------------------- -def test_full_phase_cycle(tmp_path): - """pending → running/awaiting_debaters → awaiting_judge → accepted/done.""" - r = _make_round() - q = _make_queue(tmp_path, r) - - # Step 1: start_round +def test_reject_retrigger_skipped_for_member_assignee(tmp_path): + r, q = _make_round(tmp_path) client = FakeClient() - client.agents = [ - *[{"name": n, "id": f"agent-{i}"} for i, n in enumerate(DEBATER_NAMES)], - {"name": JUDGE_NAME, "id": "agent-judge"}, - ] - _start_round(r, client, q, FakeConfig(), _logger) - assert r.status == "running" - assert r.phase == "awaiting_debaters" - coord_cid = r.coordinator_comment_id - - # Step 2: debaters reply - for i, name in enumerate(DEBATER_NAMES): - client.comments.append({ - "id": f"reply-{i}", "content": f"[{name} response]: Evidence.", - "author_id": f"agent-{i}", "author_type": "agent", "created_at": _utcnow(), - }) - _advance_awaiting_debaters(r, client, q, FakeConfig(), _logger) - assert r.phase == "awaiting_judge" - judge_cid = r.judge_comment_id - - # Step 3: judge replies - client.comments.append({ - "id": "verdict1", "content": "VERDICT: ACCEPT\n\nShipped.", - "author_id": "agent-judge", "author_type": "agent", "created_at": _utcnow(), - }) - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - assert r.phase == "accepted" - assert r.status == "done" - assert client.issue["status"] == "done" + client.issue["assignee_type"] = "member" + client.issue["assignee_id"] = "user-1" + cid = _post_rejection_retrigger(r, client, client.issue, "VERDICT: REJECT", _logger) + assert cid is None + assert len(client.posted_comments) == 1 + assert "mention://" not in client.posted_comments[0] + assert 
"Manual follow-up" in client.posted_comments[0] -# --------------------------------------------------------------------------- -# Restart resume: in-flight rounds resume from correct phase -# --------------------------------------------------------------------------- - -def test_restart_resumes_awaiting_debaters(tmp_path): - """On restart, a running/awaiting_debaters round picks up without re-posting comment.""" - r = _make_round( - status="running", - phase="awaiting_debaters", - phase_entered_at=_utcnow(), - coordinator_comment_id="existing-coord-comment", - ) - q = _make_queue(tmp_path, r) - +def test_reject_retrigger_skipped_when_no_assignee(tmp_path): + r, q = _make_round(tmp_path) client = FakeClient() - client.comments = [ - {"id": "existing-coord-comment", "content": "Debate opened", - "author_id": "coord", "created_at": _past(60)}, - ] - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - # Must NOT post another debater comment - assert all("Debate round opened" not in c for c in client.posted_comments) - # Phase should still be awaiting_debaters (no replies) - assert r.phase == "awaiting_debaters" + client.issue["assignee_type"] = None + client.issue["assignee_id"] = None + cid = _post_rejection_retrigger(r, client, client.issue, "VERDICT: REJECT", _logger) + assert cid is None -def test_restart_resumes_awaiting_judge(tmp_path): - """On restart, a running/awaiting_judge round resumes without re-posting judge comment.""" - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-c", - judge_comment_id="judge-c", - ) - q = _make_queue(tmp_path, r) - +def test_reject_retrigger_includes_verbatim_description(tmp_path): + r, q = _make_round(tmp_path) client = FakeClient() - client.comments = [ - {"id": "judge-c", "content": "Verdict requested", - "author_id": "coord", "created_at": _past(30)}, - ] - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - # Must NOT 
post another judge comment - assert all("Verdict requested" not in c for c in client.posted_comments) - assert r.phase == "awaiting_judge" - - -# --------------------------------------------------------------------------- -# orchestrate_pending: pending rounds are started, running rounds advanced -# --------------------------------------------------------------------------- - -def test_orchestrate_pending_starts_pending(tmp_path): - r = _make_round() - q = _make_queue(tmp_path, r) - client = FakeClient() - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - assert r.status == "running" - assert any("Debate round opened" in c for c in client.posted_comments) - - -def test_orchestrate_pending_advances_running(tmp_path): - """Running/awaiting_judge round with a verdict is completed.""" - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-c", - judge_comment_id="judge-c", - ) - q = _make_queue(tmp_path, r) - - client = FakeClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] - client.comments = [ - {"id": "judge-c", "content": "x", "author_id": "coord", "created_at": _past(10)}, - {"id": "v1", "content": "VERDICT: REJECT\n\nNeeds work.", - "author_id": "agent-judge", "created_at": _utcnow()}, - ] - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - assert r.status == "done" - assert r.phase == "rejected" - - -# --------------------------------------------------------------------------- -# _advance_awaiting_judge — REJECT retrigger + anchor (WYL-51) -# --------------------------------------------------------------------------- - -def _judge_round_with_agent_assignee( - tmp_path: Path, - *, - assignee_id: str = "agent-worker", - assignee_name: str = "Senior Developer", - description: str = "Test issue description.", -) -> tuple[Round, DebateQueue, FakeClient]: - """_judge_round helper pre-configured with an agent assignee.""" - r, q, client = 
_judge_round(tmp_path) client.issue["assignee_type"] = "agent" - client.issue["assignee_id"] = assignee_id - client.issue["description"] = description - client.agents.append({"name": assignee_name, "id": assignee_id}) - return r, q, client - - -def _verdict_comment(content: str = "VERDICT: REJECT\n\nDoes not satisfy requirements.") -> dict: - return { - "id": "verdict1", - "content": content, - "author_id": "agent-judge", - "author_type": "agent", - "created_at": _utcnow(), - } - - -# D1 -def test_reject_posts_retrigger_with_assignee_mention(tmp_path): - """REJECT path: a retrigger comment is posted that @-mentions the agent assignee.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "rejected" - retrigger = next( - (c for c in client.posted_comments if "mention://agent/agent-worker" in c), - None, - ) - assert retrigger is not None, "Expected retrigger comment with @-mention of assignee" - assert "Senior Developer" in retrigger - assert "REJECT" in retrigger - - -# D2 -def test_reject_retrigger_includes_verbatim_original_description(tmp_path): - """REJECT retrigger: full verbatim issue description appears in comment body.""" - desc = ( - "First paragraph of specific requirements.\n\n" - "Second paragraph with unique detail: abc-xyz-123.\n\n" - "Third paragraph with acceptance criteria: must-pass-qa-check." 
- ) - r, q, client = _judge_round_with_agent_assignee(tmp_path, description=desc) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - retrigger = next( - (c for c in client.posted_comments if "mention://agent/agent-worker" in c), - None, - ) - assert retrigger is not None - # Every non-empty line of the description must appear verbatim + client.issue["assignee_id"] = "agent-worker" + client.agents.append({"name": "Worker", "id": "agent-worker"}) + desc = "Line A.\n\nLine B with unique-marker-123.\n\nLine C." + client.issue["description"] = desc + cid = _post_rejection_retrigger(r, client, client.issue, "VERDICT: REJECT", _logger) + assert cid is not None + body = client.posted_comments[-1] for line in desc.splitlines(): if line.strip(): - assert line in retrigger, ( - f"Line {line!r} not found verbatim in retrigger comment" - ) - - -# D3 -def test_reject_retrigger_includes_no_drift_instruction(tmp_path): - """REJECT retrigger: REWORK_INSTRUCTIONS constant appears verbatim in comment.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - retrigger = next( - (c for c in client.posted_comments if "mention://agent/agent-worker" in c), - None, - ) - assert retrigger is not None - assert REWORK_INSTRUCTIONS in retrigger, ( - "REWORK_INSTRUCTIONS constant must appear verbatim in retrigger comment" - ) - assert "Do NOT redefine scope" in retrigger - - -# D4 -def test_reject_retrigger_skipped_if_assignee_is_member(tmp_path): - """REJECT: member assignee → non-mentioning coordinator comment, no @-mention.""" - r, q, client = _judge_round(tmp_path) - client.issue["assignee_type"] = "member" - client.issue["assignee_id"] = "user-123" - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == 
"rejected" - # A coordinator comment IS posted (for audit), but without any @-mention - assert len(client.posted_comments) == 1 - assert "mention://" not in client.posted_comments[0] - assert "Manual follow-up required" in client.posted_comments[0] - - -# D5 -def test_reject_retrigger_skipped_if_no_assignee(tmp_path): - """REJECT: no assignee → non-mentioning coordinator comment, no @-mention.""" - r, q, client = _judge_round(tmp_path) - # Default FakeClient issue has no assignee fields - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "rejected" - assert len(client.posted_comments) == 1 - assert "mention://" not in client.posted_comments[0] - assert "Manual follow-up required" in client.posted_comments[0] - - -# D6 -def test_accept_does_not_post_retrigger(tmp_path): - """ACCEPT path: no retrigger comment is posted.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment("VERDICT: ACCEPT\n\nExcellent work!")) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "accepted" - assert all("mention://agent/agent-worker" not in c for c in client.posted_comments) - - -# D7 -def test_retrigger_comment_id_persisted_on_round(tmp_path): - """After REJECT, round.retrigger_comment_id is persisted to queue.json.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.retrigger_comment_id, "retrigger_comment_id should be set on the round" - - # Reload from disk and verify persistence - q2 = DebateQueue.load(tmp_path / "queue.json") - r2 = q2.rounds[0] - assert r2.retrigger_comment_id == r.retrigger_comment_id, ( - "retrigger_comment_id must survive a queue reload" - ) - - -# D8 -def 
test_retrigger_happens_after_status_update_not_before(tmp_path): - """Race regression: issue:in_progress must precede the retrigger post_comment.""" - call_order: list[str] = [] - - class TrackingClient(FakeClient): - def update_issue_status(self, issue_id, status): - call_order.append(f"issue:{status}") - return super().update_issue_status(issue_id, status) - - def post_comment(self, issue_id, content): - call_order.append("post_comment") - return super().post_comment(issue_id, content) - - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-c", - judge_comment_id="judge-c", - ) - q = _make_queue(tmp_path, r) - - client = TrackingClient() - client.issue["assignee_type"] = "agent" - client.issue["assignee_id"] = "agent-worker" - client.agents = [ - {"name": JUDGE_NAME, "id": "agent-judge"}, - {"name": "Senior Developer", "id": "agent-worker"}, - ] - client.comments = [ - {"id": "judge-c", "content": "x", "author_id": "coord", "created_at": _past(5)}, - {"id": "v1", "content": "VERDICT: REJECT\n\nFailed.", "author_id": "agent-judge", - "author_type": "agent", "created_at": _utcnow()}, - ] - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert "issue:in_progress" in call_order, "issue status update must happen" - assert "post_comment" in call_order, "retrigger post_comment must happen" - assert call_order.index("issue:in_progress") < call_order.index("post_comment"), ( - f"issue:in_progress must precede post_comment, got: {call_order}" - ) + assert line in body + assert REWORK_INSTRUCTIONS in body # --------------------------------------------------------------------------- -# DEBATER_PROMPTS scope-swap clause (WYL-60) +# _advance_round dispatch # --------------------------------------------------------------------------- -def test_debater_prompts_contain_scope_swap_clause(): - """Every entry in DEBATER_PROMPTS must contain the scope-swap detection clause.""" - for name in 
DEBATER_NAMES: - assert name in DEBATER_PROMPTS, f"DEBATER_PROMPTS missing entry for {name!r}" - assert "SCOPE SWAP" in DEBATER_PROMPTS[name], ( - f"DEBATER_PROMPTS[{name!r}] is missing the scope-swap detection clause" - ) +def test_advance_round_dispatches_convened_to_start(tmp_path): + r, q = _make_round(tmp_path, phase="convened", status="pending") + client = FakeClient() + _advance_round(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_rubric" -# --------------------------------------------------------------------------- -# (end of WYL-60 tests) -# --------------------------------------------------------------------------- - -def test_reject_retrigger_failure_does_not_block_round_completion(tmp_path): - """REJECT retrigger: round completes even if posting the retrigger comment fails.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - - original_post = FakeClient.post_comment - call_count = [0] - - def post_with_failure(self, issue_id, content): - call_count[0] += 1 - if call_count[0] > 1: - raise RuntimeError("API error on retrigger") - return original_post(self, issue_id, content) - - FakeClient.post_comment = post_with_failure - try: - client.comments.append(_verdict_comment()) - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - assert r.status == "done", "Round must complete even if retrigger posting fails" - assert r.phase == "rejected" - finally: - FakeClient.post_comment = original_post +def test_advance_round_corrects_terminal_phase_with_wrong_status(tmp_path): + r, q = _make_round(tmp_path, phase="accepted", status="running") + client = FakeClient() + _advance_round(r, client, q, FakeConfig(), _logger) + assert r.status == "done"