diff --git a/pyproject.toml b/pyproject.toml index 7437cfc..d1380dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,9 +5,10 @@ description = "Tool-MAD middle-management layer for multica. Watches for in_revi readme = "README.md" requires-python = ">=3.11" dependencies = [ - # v1 intentionally depends only on stdlib + requests. No async, no frameworks. - # If this list grows past 3 items before a working v1 is shipped, something is wrong. + # v1 intentionally depends only on stdlib + requests + pyyaml. No async, no frameworks. + # If this list grows past 4 items before a working v1 is shipped, something is wrong. "requests>=2.32", + "pyyaml>=6.0", ] [project.scripts] diff --git a/src/coordinator/orchestrator.py b/src/coordinator/orchestrator.py index 887ebfc..925d6fb 100644 --- a/src/coordinator/orchestrator.py +++ b/src/coordinator/orchestrator.py @@ -1,25 +1,45 @@ -"""Debate round orchestration (WYL-45). +"""Review-round orchestration — CEK meta-judge + 3-judge consensus with optional debate rounds. + +The pipeline translates CEK's ``sadd:judge-with-debate`` shape to multica-native execution. +All prescriptive content (how meta-judge writes rubrics, how judges score) lives in the +meta-judge.md and judge.md files uploaded as each agent's ``instructions`` field. This +module only plumbs the flow: posting mentions, collecting replies, parsing structured +output, and actioning verdicts. Round lifecycle --------------- - pending → _start_round() → running / phase=awaiting_debaters - awaiting_debaters → _advance_awaiting_debaters() - all replied (or timeout) → phase=awaiting_judge - awaiting_judge → _advance_awaiting_judge() - verdict received → issue status updated FIRST, - then round marked done/phase=accepted|rejected - timeout → round marked error, issue left in_review + pending → _start_round() + posts meta-judge mention comment → phase=awaiting_rubric -Restart safety --------------- - DebateQueue persists phase + comment IDs on every mutation. 
On restart, - running rounds resume from the correct phase without re-posting comments. + awaiting_rubric → _advance_awaiting_rubric() + meta-judge replied → extract YAML rubric → post 3 judge mention comments + → phase=awaiting_judges (debate_round=0) -Race fix (WYL-44 note) ----------------------- - In _advance_awaiting_judge, client.update_issue_status() is called BEFORE - queue.update_status("done"). This guarantees poll_once cannot see - (is_issue_pending_or_running=False AND issue=in_review) at the same time. + awaiting_judges → _advance_awaiting_judges() + all 3 judge reports in (or timeout with partial) → parse scores + → consensus check + converged + avg≥3 → _apply_verdict(ACCEPT) + converged + avg<3 → _apply_verdict(REJECT) + not converged → post debate round mention comment + → phase=awaiting_debate (debate_round=1) + + awaiting_debate → _advance_awaiting_debate() + all 3 revised reports in → consensus check again + converged → _apply_verdict + not converged AND debate_round < MAX_DEBATE_ROUNDS + → post next debate round → debate_round+=1 + not converged AND debate_round == MAX_DEBATE_ROUNDS + → phase=error, leave issue in_review for human + + On any parse failure (meta-judge YAML malformed, judge reports all unparseable): + → phase=error (per higher management direction: we do not deal with a + model that cannot perform YAML tasks; we escalate) + +Race fix invariant +------------------ + _apply_verdict calls client.update_issue_status() BEFORE queue.update_status("done"), + and retrigger comments are posted AFTER the issue status has moved out of in_review. + This guarantees poll_once cannot see (round done AND issue=in_review) simultaneously. 
""" from __future__ import annotations @@ -28,111 +48,32 @@ import re from datetime import datetime, timezone from typing import Any +import yaml + from coordinator.config import Config from coordinator.multica_client import MulticaClient from coordinator.queue import DebateQueue, Round + # --------------------------------------------------------------------------- -# Hardcoded v1 constants +# Hardcoded pipeline constants # --------------------------------------------------------------------------- -DEBATER_NAMES: list[str] = [ - "Senior Developer", - "Code Reviewer", - "AI Engineer", - "Project Manager Senior", -] +META_JUDGE_NAME: str = "Meta-Judge" +JUDGE_NAMES: list[str] = ["Judge-GPT", "Judge-Claude", "Judge-Gemini"] -JUDGE_NAME: str = "Reality Checker" +# Consensus math (from CEK judge-with-debate/SKILL.md lines 264-270): +# "all three overall scores within 0.5 of each other AND each per-criterion score +# within 1.0 across judges → consensus" +CONSENSUS_OVERALL_THRESHOLD: float = 0.5 +CONSENSUS_CRITERION_THRESHOLD: float = 1.0 -# Per-debater evidence prompts. -# Placeholders: {issue_title}, {issue_description}, {commit_url} -# Each prompt tells the debater exactly which tool output to ground its argument in. -_NO_MENTION_RULE: str = ( - "**Important:** Do not @-mention any other agent or person in your reply. " - "If you need to refer to another debater, use their plain name only " - "(e.g. \"Code Reviewer\" not \"@Code Reviewer\"). " - "Mentions cause unwanted cascade tasks." -) +# Accept if consensus average is ≥ 3.0 out of 5 (CEK default). +ACCEPT_MIN_SCORE: float = 3.0 -_SCOPE_SWAP_CLAUSE: str = ( - "Then, independently of your answer above, re-read the original issue description " - "(provided in this comment under **Description (excerpt)**) and judge: does the " - "committed work actually address what was asked, or did it answer a different " - "(easier) question? Specifically look for substitutions — e.g. 
the original asks " - "for X but the code delivers X-prime, a simpler version. If you detect a scope " - "swap, state it clearly: SCOPE SWAP DETECTED — [one sentence explaining what was " - "asked vs what was delivered]. Recommend REJECT on this basis alone, regardless " - "of how well the delivered work satisfies the substituted scope." -) +# Debate rounds cap (CEK default). +MAX_DEBATE_ROUNDS: int = 3 -DEBATER_PROMPTS: dict[str, str] = { - "Senior Developer": ( - "**Your task:** grep the committed code for LLM API calls " - "(`openai`, `anthropic`, `requests.post` to AI endpoints, etc.). Report:\n" - "1. Yes/no — does the code make LLM API calls?\n" - "2. The exact grep command you ran.\n" - "3. The full grep output (or 'no matches').\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), - "Code Reviewer": ( - "**Your task:** diff the committed implementation method-by-method against " - "every requirement in the issue description. For each requirement: state it, " - "then label MET or MISSING with one line of evidence from the diff. " - "Overall confidence: high / medium / low.\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), - "AI Engineer": ( - "**Your task:** ground-check every factual claim in the issue description " - "against the committed code. For each claim: state it, then verify or refute " - "with a direct code reference. Label each VERIFIED or UNVERIFIED.\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), - "Project Manager Senior": ( - "**Your task:** check the committed work against the original acceptance " - "criteria. For each criterion: satisfied (with evidence) or not met. 
" - "State your recommendation: ACCEPT or REJECT.\n\n" - "Max 300 words.\n\n" - + _SCOPE_SWAP_CLAUSE + "\n\n" - + _NO_MENTION_RULE - ), -} - -JUDGE_PROMPT_TEMPLATE: str = """\ -**Verdict requested — {issue_title}** - -**Original issue:** -{issue_description} - -**Commit:** {commit_url} - ---- -## Debater evidence - -{transcript} ---- - -Deliver your verdict. Begin your reply with exactly one of: -- `VERDICT: ACCEPT` — implementation satisfies all critical acceptance criteria -- `VERDICT: REJECT` — it does not - -Follow with reasoning (max 400 words). - -**Important:** Do not @-mention any agent or person in your reply. Use plain names only.\ -""" - -_COMMIT_URL_RE = re.compile( - r"https?://\S+/commit/[0-9a-f]+", re.IGNORECASE -) -_VERDICT_RE = re.compile( - r"^VERDICT:\s*(ACCEPT|REJECT)", re.MULTILINE | re.IGNORECASE -) # --------------------------------------------------------------------------- # Requirement anchoring — the single most important coordinator property. @@ -157,7 +98,15 @@ REWORK_INSTRUCTIONS = ( # --------------------------------------------------------------------------- -# Internal helpers +# Regexes +# --------------------------------------------------------------------------- + +_COMMIT_URL_RE = re.compile(r"https?://\S+/commit/[0-9a-f]+", re.IGNORECASE) +_YAML_FENCE_RE = re.compile(r"```(?:yaml|yml)?\s*\n(.*?)\n```", re.DOTALL | re.IGNORECASE) + + +# --------------------------------------------------------------------------- +# Small helpers # --------------------------------------------------------------------------- def _utcnow() -> str: @@ -187,10 +136,9 @@ def _find_commit_url(comments: list[dict[str, Any]]) -> str: return "" -def _parse_verdict(content: str) -> str | None: - """Return 'ACCEPT', 'REJECT', or None.""" - m = _VERDICT_RE.search(content) - return m.group(1).upper() if m else None +def _to_blockquote(text: str) -> str: + """Wrap each line of text with markdown blockquote prefix.""" + return "\n".join(f"> {line}" if line else 
">" for line in text.split("\n")) def _cutoff_iso(comments: list[dict[str, Any]], after_comment_id: str) -> str: @@ -201,115 +149,282 @@ def _cutoff_iso(comments: list[dict[str, Any]], after_comment_id: str) -> str: return "" -def _collect_debater_replies( +def _find_reply_by_agent( comments: list[dict[str, Any]], - agent_map: dict[str, str], # name → agent_id + agent_id: str, after_comment_id: str, -) -> dict[str, str]: - """Return {debater_name: reply_content} for debaters who replied. - - Detection (in priority order): - 1. comment.author_id matches a debater's agent_id - 2. comment content starts with ``[{name} response]:`` (fallback for testing) - - Only comments posted *after* after_comment_id are considered. - """ - id_to_name: dict[str, str] = {v: k for k, v in agent_map.items()} +) -> tuple[str, str] | None: + """Return (comment_id, content) of the first reply by ``agent_id`` posted + strictly after ``after_comment_id``. None if no such reply exists.""" + if not agent_id: + return None cutoff = _cutoff_iso(comments, after_comment_id) - replies: dict[str, str] = {} - - for comment in comments: - # Skip the coordinator's own mention comment and anything before it. 
- if comment.get("id") == after_comment_id: - continue - created = comment.get("created_at", "") - if cutoff and created < cutoff: - continue - - content = comment.get("content", "") - author_id = comment.get("author_id", "") - - # Primary: agent ID match - if author_id in id_to_name: - name = id_to_name[author_id] - if name not in replies: - replies[name] = content - continue - - # Fallback: content-based marker (enables integration / smoke tests) - for name in DEBATER_NAMES: - marker = f"[{name} response]:".lower() - if content.strip().lower().startswith(marker) and name not in replies: - replies[name] = content - - return replies - - -def _collect_judge_reply( - comments: list[dict[str, Any]], - judge_agent_id: str, - after_comment_id: str, -) -> str | None: - """Return judge's comment content if found after after_comment_id, else None. - - Fallback: any comment containing ``VERDICT: ACCEPT|REJECT`` is accepted - (enables smoke / integration tests where the judge is impersonated). - """ - cutoff = _cutoff_iso(comments, after_comment_id) - for comment in comments: if comment.get("id") == after_comment_id: continue created = comment.get("created_at", "") if cutoff and created < cutoff: continue - - content = comment.get("content", "") - author_id = comment.get("author_id", "") - - if judge_agent_id and author_id == judge_agent_id: - return content - - # Fallback: content-based verdict detection - if _parse_verdict(content) is not None: - return content - + if comment.get("author_id") == agent_id: + return (comment.get("id", ""), comment.get("content", "")) return None -def _build_debater_comment( - agent_map: dict[str, str], +# --------------------------------------------------------------------------- +# YAML extraction and parsing +# --------------------------------------------------------------------------- + +def _extract_yaml(content: str) -> str: + """Pull the YAML block out of a reply. Prefers fenced code blocks. 
+ + Returns the YAML text (without fences), or the original content if no fence + is found. The caller is responsible for deciding whether the raw content + is parseable. + """ + m = _YAML_FENCE_RE.search(content) + if m: + return m.group(1).strip() + return content.strip() + + +def _parse_rubric(content: str) -> dict[str, Any] | None: + """Parse meta-judge's reply into the rubric+checklist dict. + + Returns None on parse failure or if the parsed object does not have the + expected shape (at minimum a ``checklist`` or ``rubric_dimensions`` key). + """ + yaml_text = _extract_yaml(content) + try: + parsed = yaml.safe_load(yaml_text) + except yaml.YAMLError: + return None + if not isinstance(parsed, dict): + return None + # Accept either a wrapped form with evaluation_specification or a flat form + spec = parsed.get("evaluation_specification", parsed) + if not isinstance(spec, dict): + return None + if "checklist" not in spec and "rubric_dimensions" not in spec: + return None + return spec + + +def _parse_judge_report(content: str) -> dict[str, Any] | None: + """Parse a judge's evaluation report YAML into a dict. + + Returns None on parse failure or if the expected scoring keys are absent. + Expected shape (from CEK judge.md Expected Output section) includes either + ``evaluation_report`` wrapping or a flat form with ``score_calculation`` and + ``rubric_scores``. 
+ """ + yaml_text = _extract_yaml(content) + try: + parsed = yaml.safe_load(yaml_text) + except yaml.YAMLError: + return None + if not isinstance(parsed, dict): + return None + report = parsed.get("evaluation_report", parsed) + if not isinstance(report, dict): + return None + # Accept the report if it contains at least final_score OR a scored + # rubric_scores list we can average from + score_calc = report.get("score_calculation", {}) or {} + if isinstance(score_calc, dict) and "final_score" in score_calc: + return report + rubric_scores = report.get("rubric_scores", []) or [] + if isinstance(rubric_scores, list) and any(isinstance(s, dict) and "score" in s for s in rubric_scores): + return report + return None + + +def _overall_score(report: dict[str, Any]) -> float | None: + """Extract the overall 0-5 score from a parsed judge report. + + Prefers ``score_calculation.final_score``; falls back to weighted average of + rubric_scores if final_score is missing. Returns None if neither source is + available. 
+ """ + sc = report.get("score_calculation") or {} + if isinstance(sc, dict): + fs = sc.get("final_score") + if isinstance(fs, (int, float)): + return float(fs) + rubric_scores = report.get("rubric_scores") or [] + if isinstance(rubric_scores, list) and rubric_scores: + # Weighted average if weights are present, else plain average + total_weight = 0.0 + weighted = 0.0 + plain = [] + for item in rubric_scores: + if not isinstance(item, dict): + continue + s = item.get("score") + if not isinstance(s, (int, float)): + continue + w = item.get("weight") + if isinstance(w, (int, float)) and w > 0: + weighted += float(s) * float(w) + total_weight += float(w) + plain.append(float(s)) + if total_weight > 0: + return weighted / total_weight + if plain: + return sum(plain) / len(plain) + return None + + +def _criterion_scores(report: dict[str, Any]) -> dict[str, float]: + """Return {criterion_name: score} from a parsed judge report.""" + out: dict[str, float] = {} + for item in report.get("rubric_scores") or []: + if not isinstance(item, dict): + continue + name = item.get("name") or item.get("criterion_name") + s = item.get("score") + if isinstance(name, str) and isinstance(s, (int, float)): + out[name] = float(s) + return out + + +def _check_consensus( + reports: dict[str, dict[str, Any]], +) -> tuple[bool, str | None, float | None]: + """Given parsed reports keyed by judge name, check consensus. + + Returns (converged, verdict, avg_overall). + - converged: True if all overall scores are within CONSENSUS_OVERALL_THRESHOLD + and each per-criterion score is within CONSENSUS_CRITERION_THRESHOLD across + judges. Only criteria present in ALL reports participate in the per-criterion + check. + - verdict: "ACCEPT" if converged and avg_overall >= ACCEPT_MIN_SCORE, else + "REJECT" if converged, else None. + - avg_overall: mean of overall scores across judges with a parseable overall, + or None if no overall could be extracted. 
+ """ + overalls: list[float] = [] + criterion_by_judge: dict[str, dict[str, float]] = {} + for name, report in reports.items(): + ov = _overall_score(report) + if ov is not None: + overalls.append(ov) + criterion_by_judge[name] = _criterion_scores(report) + + if not overalls: + return (False, None, None) + + avg = sum(overalls) / len(overalls) + + overall_converged = (max(overalls) - min(overalls)) <= CONSENSUS_OVERALL_THRESHOLD + + # Per-criterion check: only criteria present in every judge's report + common_criteria: set[str] | None = None + for scores in criterion_by_judge.values(): + keys = set(scores.keys()) + common_criteria = keys if common_criteria is None else (common_criteria & keys) + common_criteria = common_criteria or set() + + criterion_converged = True + for crit in common_criteria: + vals = [criterion_by_judge[name][crit] for name in criterion_by_judge if crit in criterion_by_judge[name]] + if vals and (max(vals) - min(vals)) > CONSENSUS_CRITERION_THRESHOLD: + criterion_converged = False + break + + converged = overall_converged and criterion_converged + if not converged: + return (False, None, avg) + verdict = "ACCEPT" if avg >= ACCEPT_MIN_SCORE else "REJECT" + return (True, verdict, avg) + + +# --------------------------------------------------------------------------- +# Comment builders (no prose authored by the coordinator — just plumbing) +# --------------------------------------------------------------------------- + +def _build_meta_judge_mention( + meta_judge_agent_id: str, + issue_title: str, + issue_description: str, +) -> str: + """Post the initial meta-judge mention comment. + + Contains only the @-mention, the task context that CEK meta-judge expects + (user prompt + artifact type hint), and no direction about WHAT to produce — + meta-judge's own instructions tell it that. 
+ """ + mention = ( + f"[@{META_JUDGE_NAME}](mention://agent/{meta_judge_agent_id})" + if meta_judge_agent_id + else f"@{META_JUDGE_NAME}" + ) + desc_bq = _to_blockquote(issue_description.strip()) + return "\n\n".join([ + mention, + f"**User Prompt:** {issue_title}", + "**Description:**", + desc_bq, + "**Artifact Type:** code + documentation (review-only — a delivered commit will be evaluated against your specification by downstream judges).", + ]) + + +def _build_judge_mention_comment( + judge_agent_ids: dict[str, str], issue_title: str, issue_description: str, commit_url: str, + rubric_yaml: str, ) -> str: - commit_ref = commit_url or "(no commit link found in comments)" + """Post a mention comment that fires all 3 judges in parallel. + + Each judge sees: mentions (of all three), task, commit link, rubric YAML. + No debater prose, no scope-swap clause, no no-mention rule — judge.md + instructions cover those. + """ + mentions = " ".join( + f"[@{name}](mention://agent/{judge_agent_ids[name]})" + if judge_agent_ids.get(name) + else f"@{name}" + for name in JUDGE_NAMES + ) + desc_bq = _to_blockquote(issue_description.strip()) + return "\n\n".join([ + mentions, + f"**Original Task:** {issue_title}", + "**Description:**", + desc_bq, + f"**Commit under review:** {commit_url or '(no commit link found in delivery)'}", + "**Evaluation Specification (from meta-judge):**", + "```yaml", + rubric_yaml.strip(), + "```", + "Apply the specification to the commit. 
Post your `evaluation_report` YAML as your reply.", + ]) + + +def _build_debate_round_comment( + judge_agent_ids: dict[str, str], + round_num: int, + prior_reports: dict[str, str], +) -> str: + """Post a debate-round mention comment showing all 3 judges each others' prior reports.""" + mentions = " ".join( + f"[@{name}](mention://agent/{judge_agent_ids[name]})" + if judge_agent_ids.get(name) + else f"@{name}" + for name in JUDGE_NAMES + ) parts = [ - f"**Debate round opened — {issue_title}**\n", - f"Commit under review: {commit_ref}\n", - "---", + mentions, + f"**Debate round {round_num}** — your prior reports disagree beyond the consensus threshold.", + "Each of you: read the others' reports below, re-examine the commit, then post a REVISED `evaluation_report` YAML. You may hold your position if you have new evidence; you may move if you find the other reasoning more grounded. Do not split the difference to compromise.", ] - for name in DEBATER_NAMES: - agent_id = agent_map.get(name, "") - mention = ( - f"[@{name}](mention://agent/{agent_id})" if agent_id else f"@{name}" - ) - prompt = DEBATER_PROMPTS.get(name, "Please provide your evidence analysis.") - snippet = issue_description[:800].strip() - parts.append( - f"\n{mention}\n\n" - f"**Issue:** {issue_title}\n" - f"**Description (excerpt):** {snippet}\n" - f"**Commit:** {commit_ref}\n\n" - f"{prompt}" - ) - parts.append("\n---") - return "\n".join(parts) - - -def _to_blockquote(text: str) -> str: - """Wrap each line of text in a Markdown blockquote (``> `` prefix).""" - return "\n".join(f"> {line}" if line else ">" for line in text.splitlines()) + for name in JUDGE_NAMES: + content = prior_reports.get(name, "*(no prior report)*") + parts.append(f"---\n\n### {name} — prior report\n\n{content}") + parts.append("---") + parts.append("Post your revised report as a NEW reply. 
Do not edit prior reports.") + return "\n\n".join(parts) def _build_retrigger_comment( @@ -319,20 +434,10 @@ def _build_retrigger_comment( verdict_content: str, round_id: str, ) -> str: - """Build the full anchor-bearing retrigger comment body. - - Structure (verbatim — do not rearrange or summarise any section): - 1. @-mention to fire the agent - 2. One-line verdict summary - 3. ## ANCHOR — Original requirements (verbatim description, blockquoted) - 4. ## Why this was rejected (verbatim verdict, blockquoted) - 5. ## Instructions for rework (REWORK_INSTRUCTIONS constant) - 6. Trailing audit line - """ + """Anchor-bearing retrigger comment body — unchanged from the old pipeline.""" mention = f"[@{assignee_name}](mention://agent/{assignee_agent_id})" desc_bq = _to_blockquote(issue_description.strip()) verdict_bq = _to_blockquote(verdict_content.strip()) - return "\n\n".join([ mention, f"Verdict: REJECT (round {round_id})", @@ -344,13 +449,397 @@ def _build_retrigger_comment( def _build_coordinator_note_no_agent(round_id: str, reason: str) -> str: - """Coordinator-only note (no @-mention) when the assignee is not an agent.""" + """Coordinator-only note (no @-mention) when retrigger cannot resolve an assignee.""" return ( f"Verdict: REJECT (round {round_id}) \u2014 {reason}. " "Manual follow-up required." ) +def _build_error_note(round_id: str, reason: str) -> str: + """Posted when the round cannot complete cleanly (malformed output, timeout).""" + return ( + f"Review round {round_id} could not complete: {reason}. " + "Issue remains in `in_review`; human follow-up required. 
" + "— Coordinator" + ) + + +# --------------------------------------------------------------------------- +# Phase handlers +# --------------------------------------------------------------------------- + +def _start_round( + round_: Round, + client: MulticaClient, + queue: DebateQueue, + cfg: Config, + logger: logging.Logger, +) -> None: + """pending → running, post meta-judge mention, phase → awaiting_rubric.""" + logger.info("starting round %s for %s", round_.round_id, round_.identifier) + queue.update_status(round_.round_id, "running") + + try: + issue = client.get_issue(round_.issue_id) + except Exception as exc: + logger.error("round %s: cannot fetch issue %s: %s", round_.round_id, round_.issue_id, exc) + queue.update_status(round_.round_id, "error") + return + + issue_title = issue.get("title", round_.title) + issue_description = issue.get("description", "") or "" + + try: + agent_map = client.find_agents_by_name([META_JUDGE_NAME]) + except Exception as exc: + logger.error("round %s: cannot look up %s: %s", round_.round_id, META_JUDGE_NAME, exc) + queue.update_status(round_.round_id, "error") + return + + meta_id = agent_map.get(META_JUDGE_NAME, "") + body = _build_meta_judge_mention(meta_id, issue_title, issue_description) + try: + posted = client.post_comment(round_.issue_id, body) + meta_cid = posted.get("id", "") + except Exception as exc: + logger.error("round %s: cannot post meta-judge mention: %s", round_.round_id, exc) + queue.update_status(round_.round_id, "error") + return + + queue.update_phase( + round_.round_id, + "awaiting_rubric", + phase_entered_at=_utcnow(), + meta_judge_comment_id=meta_cid, + ) + logger.info("round %s: meta-judge mention posted id=%s", round_.round_id, meta_cid) + + +def _advance_awaiting_rubric( + round_: Round, + client: MulticaClient, + queue: DebateQueue, + cfg: Config, + logger: logging.Logger, +) -> None: + """Poll for meta-judge reply, extract rubric YAML, post judge mentions.""" + elapsed = 
_elapsed_s(round_.phase_entered_at) + timed_out = elapsed >= cfg.round_timeout_s + + try: + comments = client.list_comments(round_.issue_id) + except Exception as exc: + logger.error("round %s: cannot list comments: %s", round_.round_id, exc) + return + + try: + meta_map = client.find_agents_by_name([META_JUDGE_NAME]) + except Exception as exc: + logger.warning("round %s: cannot look up meta-judge agent: %s", round_.round_id, exc) + meta_map = {} + meta_id = meta_map.get(META_JUDGE_NAME, "") + + reply = _find_reply_by_agent(comments, meta_id, round_.meta_judge_comment_id) + if reply is None: + if timed_out: + logger.error( + "round %s: meta-judge did not reply within %ds — marking error", + round_.round_id, cfg.round_timeout_s, + ) + _post_error_note(round_, client, "meta-judge did not reply within timeout", logger) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + else: + logger.debug( + "round %s: awaiting meta-judge reply (%.0fs/%ds)", + round_.round_id, elapsed, cfg.round_timeout_s, + ) + return + + _meta_cid_reply, meta_content = reply + rubric_spec = _parse_rubric(meta_content) + if rubric_spec is None: + logger.error( + "round %s: meta-judge reply did not contain a parseable rubric — marking error (per higher-management direction: no retries on YAML failure)", + round_.round_id, + ) + _post_error_note( + round_, client, + "meta-judge reply did not contain parseable YAML rubric — a model that cannot produce YAML is not acceptable for this role", + logger, + ) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + return + + # Serialize the rubric spec back to YAML for the judge mention comment. 
+ rubric_yaml = yaml.safe_dump(rubric_spec, sort_keys=False, default_flow_style=False) + + # Fetch issue for title/description and find commit URL from comments + try: + issue = client.get_issue(round_.issue_id) + except Exception as exc: + logger.error("round %s: cannot re-fetch issue: %s", round_.round_id, exc) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + return + + issue_title = issue.get("title", round_.title) + issue_description = issue.get("description", "") or "" + commit_url = _find_commit_url(comments) + + try: + judge_map = client.find_agents_by_name(JUDGE_NAMES) + except Exception as exc: + logger.warning("round %s: cannot look up judge agents: %s", round_.round_id, exc) + judge_map = {} + + body = _build_judge_mention_comment(judge_map, issue_title, issue_description, commit_url, rubric_yaml) + try: + posted = client.post_comment(round_.issue_id, body) + judge_mention_cid = posted.get("id", "") + except Exception as exc: + logger.error("round %s: cannot post judge mention: %s", round_.round_id, exc) + queue.update_phase(round_.round_id, "error") + queue.update_status(round_.round_id, "error") + return + + queue.update_phase( + round_.round_id, + "awaiting_judges", + phase_entered_at=_utcnow(), + rubric_yaml=rubric_yaml, + judge_mention_comment_id=judge_mention_cid, + debate_round=0, + ) + logger.info( + "round %s: judges mentioned (id=%s), awaiting 3 reports", + round_.round_id, judge_mention_cid, + ) + + +def _collect_judge_reports( + round_: Round, + client: MulticaClient, + logger: logging.Logger, +) -> tuple[dict[str, str], dict[str, str], dict[str, str]]: + """Return (raw_contents, parsed_ok_contents, judge_agent_ids) for judges who replied after + round_.judge_mention_comment_id (or the most recent debate-round mention). + + raw_contents and parsed_ok_contents are keyed by judge name. parsed_ok_contents + includes only judges whose report parsed successfully. 
def _advance_awaiting_judges(
    round_: Round,
    client: MulticaClient,
    queue: DebateQueue,
    cfg: Config,
    logger: logging.Logger,
) -> None:
    """Advance a round that is waiting on judge reports.

    Used for the ``awaiting_judges`` phase and, through
    ``_advance_awaiting_debate``, for every debate round.

    Flow:
      1. Collect judge replies.  While some judge is missing and the phase has
         not exceeded ``cfg.round_timeout_s``, do nothing (poll again later).
      2. Best-effort: record the comment IDs of the collected reports.
      3. No parseable report -> post an error note, mark the round ``error``.
      4. Consensus -> hand the verdict to ``_apply_verdict``.
      5. No consensus -> if ``round_.debate_round`` has reached
         ``MAX_DEBATE_ROUNDS`` mark the round ``error``; otherwise post the
         next debate-round mention and move to ``awaiting_debate``.
    """
    raw, parsed, judge_map = _collect_judge_reports(round_, client, logger)

    missing = [name for name in JUDGE_NAMES if name not in raw]
    elapsed = _elapsed_s(round_.phase_entered_at)
    timed_out = elapsed >= cfg.round_timeout_s

    if missing and not timed_out:
        logger.debug(
            "round %s: awaiting judges %s (%.0fs/%ds)",
            round_.round_id, missing, elapsed, cfg.round_timeout_s,
        )
        return

    # Audit trail: map each collected report back to its comment ID by
    # (author, content).  Failure here must not block the verdict, but it is
    # surfaced at warning level rather than hidden at debug level.
    report_ids = dict(round_.judge_report_comment_ids)
    try:
        comments = client.list_comments(round_.issue_id)
        for name, content in raw.items():
            agent_id = judge_map.get(name, "")
            for comment in comments:
                if comment.get("author_id") == agent_id and comment.get("content") == content:
                    report_ids[name] = comment.get("id", "")
                    break
    except Exception as exc:
        logger.warning("round %s: could not persist report IDs: %s", round_.round_id, exc)

    # Parse each report exactly once; drop anything that fails to parse.
    reports: dict[str, dict[str, Any]] = {}
    for name, content in parsed.items():
        report = _parse_judge_report(content)
        if report is not None:
            reports[name] = report

    if not reports:
        logger.error(
            "round %s: no parseable judge reports (0 of %d) — marking error",
            round_.round_id, len(raw),
        )
        _post_error_note(round_, client, "no judge produced a parseable evaluation_report YAML", logger)
        queue.update_phase(round_.round_id, "error", judge_report_comment_ids=report_ids)
        queue.update_status(round_.round_id, "error")
        return

    converged, verdict, avg = _check_consensus(reports)

    if converged and verdict is not None:
        # This handler also serves debate rounds, so name the actual stage.
        stage = (
            "initial judges"
            if round_.debate_round == 0
            else f"debate round {round_.debate_round}"
        )
        logger.info(
            "round %s: consensus reached after %s — verdict=%s avg=%.2f",
            round_.round_id, stage, verdict, avg or 0.0,
        )
        verdict_summary = _format_verdict_summary(verdict, avg, reports)
        _apply_verdict(round_, client, queue, verdict, verdict_summary, logger)
        return

    # Not converged — escalate once the debate cap is reached.
    if round_.debate_round >= MAX_DEBATE_ROUNDS:
        logger.error(
            "round %s: max debate rounds reached without consensus — marking error",
            round_.round_id,
        )
        _post_error_note(round_, client, f"no consensus after {MAX_DEBATE_ROUNDS} debate rounds", logger)
        queue.update_phase(round_.round_id, "error", judge_report_comment_ids=report_ids)
        queue.update_status(round_.round_id, "error")
        return

    # Otherwise open the next debate round, showing every judge the raw
    # reports collected in this one.
    next_round = round_.debate_round + 1
    logger.info(
        "round %s: no consensus (avg=%.2f, %d parseable) — starting debate round %d",
        round_.round_id, avg or 0.0, len(parsed), next_round,
    )
    body = _build_debate_round_comment(judge_map, next_round, raw)
    try:
        posted = client.post_comment(round_.issue_id, body)
        debate_cid = posted.get("id", "")
    except Exception as exc:
        logger.error("round %s: cannot post debate round %d mention: %s", round_.round_id, next_round, exc)
        queue.update_phase(round_.round_id, "error", judge_report_comment_ids=report_ids)
        queue.update_status(round_.round_id, "error")
        return

    # The debate mention becomes the new cutoff for collecting replies.
    queue.update_phase(
        round_.round_id,
        "awaiting_debate",
        phase_entered_at=_utcnow(),
        judge_mention_comment_id=debate_cid,
        judge_report_comment_ids=report_ids,
        debate_round=next_round,
    )
+ """ + new_issue_status = "done" if verdict == "ACCEPT" else "in_progress" + logger.info( + "round %s: verdict=%s → issue status=%s", + round_.round_id, verdict, new_issue_status, + ) + try: + client.update_issue_status(round_.issue_id, new_issue_status) + except Exception as exc: + logger.error( + "round %s: cannot update issue status — will retry next cycle: %s", + round_.round_id, exc, + ) + return + + # Post verdict summary comment + try: + client.post_comment(round_.issue_id, verdict_summary) + except Exception as exc: + logger.warning("round %s: cannot post verdict summary: %s", round_.round_id, exc) + + # On REJECT: post anchor retrigger + if verdict == "REJECT": + try: + issue = client.get_issue(round_.issue_id) + except Exception as exc: + logger.warning("round %s: cannot re-fetch issue for retrigger: %s", round_.round_id, exc) + issue = {} + retrigger_cid = _post_rejection_retrigger(round_, client, issue, verdict_summary, logger) + if retrigger_cid: + try: + queue.set_retrigger_comment_id(round_.round_id, retrigger_cid) + except Exception as exc: + logger.warning("round %s: cannot persist retrigger_comment_id: %s", round_.round_id, exc) + + phase = "accepted" if verdict == "ACCEPT" else "rejected" + queue.update_phase(round_.round_id, phase) + queue.update_status(round_.round_id, "done") + logger.info("round %s: complete (phase=%s)", round_.round_id, phase) + + def _post_rejection_retrigger( round_: Round, client: MulticaClient, @@ -358,16 +847,7 @@ def _post_rejection_retrigger( verdict_comment_content: str, logger: logging.Logger, ) -> str | None: - """Post the retrigger comment after a REJECT verdict. - - Returns the comment ID of the retrigger comment when the assignee is an - agent (caller should persist it), or ``None`` when a non-mentioning - coordinator note was posted instead (member/no-assignee) or when posting - failed entirely. - - Any failure is logged and swallowed — round completion must not depend on - this side-effect. 
- """ + """Post the anchored retrigger comment. Same behaviour as before.""" assignee_type = issue.get("assignee_type", "") assignee_id = issue.get("assignee_id", "") or "" @@ -387,17 +867,13 @@ def _post_rejection_retrigger( _build_coordinator_note_no_agent(round_.round_id, reason), ) except Exception as exc: - logger.warning( - "round %s: cannot post coordinator note: %s", - round_.round_id, exc, - ) + logger.warning("round %s: cannot post coordinator note: %s", round_.round_id, exc) return None - # Agent path — look up name (calls list_agents once) assignee_name = client.get_agent_name(assignee_id) if not assignee_name: logger.warning( - "round %s: assignee agent %s not found in workspace — posting note without mention", + "round %s: assignee %s not found in workspace — posting note without mention", round_.round_id, assignee_id, ) try: @@ -409,10 +885,7 @@ def _post_rejection_retrigger( ), ) except Exception as exc: - logger.warning( - "round %s: cannot post coordinator note: %s", - round_.round_id, exc, - ) + logger.warning("round %s: cannot post coordinator note: %s", round_.round_id, exc) return None issue_description = issue.get("description", "") or "" @@ -423,306 +896,26 @@ def _post_rejection_retrigger( posted = client.post_comment(round_.issue_id, body) cid = posted.get("id", "") logger.info( - "round %s: retrigger comment posted to @%s (id=%s)", + "round %s: retrigger posted to @%s (id=%s)", round_.round_id, assignee_name, cid, ) return cid except Exception as exc: - logger.warning( - "round %s: cannot post retrigger comment: %s", - round_.round_id, exc, - ) + logger.warning("round %s: cannot post retrigger: %s", round_.round_id, exc) return None -def _build_judge_comment( - judge_agent_id: str, - issue_title: str, - issue_description: str, - commit_url: str, - transcript: str, -) -> str: - mention = ( - f"[@{JUDGE_NAME}](mention://agent/{judge_agent_id})" - if judge_agent_id - else f"@{JUDGE_NAME}" - ) - body = JUDGE_PROMPT_TEMPLATE.format( - 
issue_title=issue_title, - issue_description=issue_description[:2000].strip(), - commit_url=commit_url or "N/A", - transcript=transcript, - ) - return f"{mention}\n\n{body}" - - -# --------------------------------------------------------------------------- -# Phase handlers -# --------------------------------------------------------------------------- - -def _start_round( +def _post_error_note( round_: Round, client: MulticaClient, - queue: DebateQueue, - cfg: Config, + reason: str, logger: logging.Logger, ) -> None: - """pending → running, post debater comment, phase → awaiting_debaters.""" - logger.info("starting round %s for %s", round_.round_id, round_.identifier) - - # Mark running immediately so poll_once won't double-enqueue on a bump. - queue.update_status(round_.round_id, "running") - + """Post a coordinator-only note explaining the round errored out.""" try: - issue = client.get_issue(round_.issue_id) + client.post_comment(round_.issue_id, _build_error_note(round_.round_id, reason)) except Exception as exc: - logger.error("round %s: cannot fetch issue %s: %s", round_.round_id, round_.issue_id, exc, exc_info=True) - queue.update_status(round_.round_id, "error") - return - - issue_title = issue.get("title", round_.title) - issue_description = issue.get("description", "") or "" - - try: - all_comments = client.list_comments(round_.issue_id) - except Exception as exc: - logger.warning("round %s: cannot list comments: %s", round_.round_id, exc) - all_comments = [] - - commit_url = _find_commit_url(all_comments) - - try: - agent_map = client.find_agents_by_name(DEBATER_NAMES) - except Exception as exc: - logger.warning("round %s: cannot look up debater agents: %s", round_.round_id, exc) - agent_map = {} - - body = _build_debater_comment(agent_map, issue_title, issue_description, commit_url) - try: - posted = client.post_comment(round_.issue_id, body) - coord_cid = posted.get("id", "") - except Exception as exc: - logger.error("round %s: cannot post debater 
mention comment: %s", round_.round_id, exc, exc_info=True) - queue.update_status(round_.round_id, "error") - return - - queue.update_phase( - round_.round_id, - "awaiting_debaters", - phase_entered_at=_utcnow(), - coordinator_comment_id=coord_cid, - ) - logger.info( - "round %s: debater comment posted (id=%s), awaiting debaters", - round_.round_id, coord_cid, - ) - - -def _advance_awaiting_debaters( - round_: Round, - client: MulticaClient, - queue: DebateQueue, - cfg: Config, - logger: logging.Logger, -) -> None: - """Poll debater replies → on all-replied or timeout, post judge comment.""" - elapsed = _elapsed_s(round_.phase_entered_at) - timed_out = elapsed >= cfg.round_timeout_s - - try: - comments = client.list_comments(round_.issue_id) - except Exception as exc: - logger.error("round %s: cannot list comments: %s", round_.round_id, exc, exc_info=True) - return - - try: - issue = client.get_issue(round_.issue_id) - except Exception as exc: - logger.warning("round %s: cannot fetch issue: %s", round_.round_id, exc) - issue = {} - - issue_title = issue.get("title", round_.title) - issue_description = issue.get("description", "") or "" - commit_url = _find_commit_url(comments) - - try: - agent_map = client.find_agents_by_name(DEBATER_NAMES) - except Exception as exc: - logger.warning("round %s: cannot look up debater agents: %s", round_.round_id, exc) - agent_map = {} - - replies = _collect_debater_replies( - comments, agent_map, round_.coordinator_comment_id - ) - missing = [n for n in DEBATER_NAMES if n not in replies] - - if missing and not timed_out: - if elapsed >= cfg.round_timeout_s * 0.8: - logger.warning( - "round %s: debaters approaching timeout (%.0fs / %ds), still missing: %s", - round_.round_id, elapsed, cfg.round_timeout_s, missing, - ) - else: - logger.debug( - "round %s: still waiting for debaters %s (%.0fs / %ds)", - round_.round_id, missing, elapsed, cfg.round_timeout_s, - ) - return - - if missing: - logger.warning( - "round %s: debater timeout 
after %.0fs — proceeding with partial evidence " - "(missing: %s)", - round_.round_id, elapsed, missing, - ) - else: - logger.info( - "round %s: all %d debaters replied, assembling transcript", - round_.round_id, len(DEBATER_NAMES), - ) - - # Assemble transcript - parts = [] - for name in DEBATER_NAMES: - if name in replies: - parts.append(f"### {name}\n\n{replies[name]}") - else: - parts.append(f"### {name}\n\n*(no response — timed out)*") - transcript = "\n\n".join(parts) - - try: - judge_map = client.find_agents_by_name([JUDGE_NAME]) - judge_agent_id = judge_map.get(JUDGE_NAME, "") - except Exception as exc: - logger.warning("round %s: cannot look up judge agent: %s", round_.round_id, exc) - judge_agent_id = "" - - judge_body = _build_judge_comment( - judge_agent_id, issue_title, issue_description, commit_url, transcript - ) - try: - posted = client.post_comment(round_.issue_id, judge_body) - judge_cid = posted.get("id", "") - except Exception as exc: - logger.error("round %s: cannot post judge comment: %s", round_.round_id, exc, exc_info=True) - return - - queue.update_phase( - round_.round_id, - "awaiting_judge", - phase_entered_at=_utcnow(), - judge_comment_id=judge_cid, - ) - logger.info( - "round %s: judge comment posted (id=%s), awaiting verdict", - round_.round_id, judge_cid, - ) - - -def _advance_awaiting_judge( - round_: Round, - client: MulticaClient, - queue: DebateQueue, - cfg: Config, - logger: logging.Logger, -) -> None: - """Poll for judge verdict. - - On verdict: update issue status FIRST (race fix), then mark round done. - On timeout: mark round error, leave issue in_review for human escalation. 
- """ - elapsed = _elapsed_s(round_.phase_entered_at) - if elapsed >= cfg.round_timeout_s: - logger.error( - "round %s: judge timeout after %.0fs — leaving issue in_review for human escalation", - round_.round_id, elapsed, - ) - queue.update_phase(round_.round_id, "error") - queue.update_status(round_.round_id, "error") - return - - try: - comments = client.list_comments(round_.issue_id) - except Exception as exc: - logger.error("round %s: cannot list comments: %s", round_.round_id, exc) - return - - try: - judge_map = client.find_agents_by_name([JUDGE_NAME]) - judge_agent_id = judge_map.get(JUDGE_NAME, "") - except Exception as exc: - logger.warning("round %s: cannot look up judge agent: %s", round_.round_id, exc) - judge_agent_id = "" - - judge_reply = _collect_judge_reply( - comments, judge_agent_id, round_.judge_comment_id - ) - - if judge_reply is None: - if elapsed >= cfg.round_timeout_s * 0.8: - logger.warning( - "round %s: judge approaching timeout (%.0fs / %ds) — no verdict yet", - round_.round_id, elapsed, cfg.round_timeout_s, - ) - else: - logger.debug( - "round %s: waiting for judge verdict (%.0fs / %ds)", - round_.round_id, elapsed, cfg.round_timeout_s, - ) - return - - verdict = _parse_verdict(judge_reply) - if verdict is None: - logger.warning( - "round %s: judge replied but no VERDICT: ACCEPT/REJECT found — treating as REJECT", - round_.round_id, - ) - verdict = "REJECT" - - new_issue_status = "done" if verdict == "ACCEPT" else "in_progress" - logger.info( - "round %s: verdict=%s → issue status=%s", - round_.round_id, verdict, new_issue_status, - ) - - # RACE FIX: issue status MUST be updated before marking round done. - # If poll_once runs after this line, the issue is no longer in_review, - # so no double-enqueue is possible regardless of round status. 
- try: - client.update_issue_status(round_.issue_id, new_issue_status) - except Exception as exc: - logger.error( - "round %s: cannot update issue status — will retry next cycle: %s", - round_.round_id, exc, exc_info=True, - ) - return # do NOT mark done; retry next cycle - - # On REJECT: fetch issue and post the full anchor retrigger comment. - if verdict == "REJECT": - try: - issue = client.get_issue(round_.issue_id) - except Exception as exc: - logger.warning( - "round %s: cannot fetch issue for retrigger — using empty dict: %s", - round_.round_id, exc, - ) - issue = {} - retrigger_cid = _post_rejection_retrigger( - round_, client, issue, judge_reply, logger, - ) - if retrigger_cid: - try: - queue.set_retrigger_comment_id(round_.round_id, retrigger_cid) - except Exception as exc: - logger.warning( - "round %s: cannot persist retrigger_comment_id: %s", - round_.round_id, exc, - ) - - phase = "accepted" if verdict == "ACCEPT" else "rejected" - queue.update_phase(round_.round_id, phase) - queue.update_status(round_.round_id, "done") - logger.info("round %s: complete (phase=%s)", round_.round_id, phase) + logger.warning("round %s: cannot post error note: %s", round_.round_id, exc) def _advance_round( @@ -734,14 +927,14 @@ def _advance_round( ) -> None: """Dispatch to the correct phase handler for a running round.""" if round_.phase == "convened": - # Should be caught by _start_round, but handle gracefully on restart. 
_start_round(round_, client, queue, cfg, logger) - elif round_.phase == "awaiting_debaters": - _advance_awaiting_debaters(round_, client, queue, cfg, logger) - elif round_.phase == "awaiting_judge": - _advance_awaiting_judge(round_, client, queue, cfg, logger) + elif round_.phase == "awaiting_rubric": + _advance_awaiting_rubric(round_, client, queue, cfg, logger) + elif round_.phase == "awaiting_judges": + _advance_awaiting_judges(round_, client, queue, cfg, logger) + elif round_.phase == "awaiting_debate": + _advance_awaiting_debate(round_, client, queue, cfg, logger) elif round_.phase in ("accepted", "rejected", "error"): - # Terminal phase but status still running — fix it. logger.warning( "round %s is in terminal phase %r but status=%r — correcting", round_.round_id, round_.phase, round_.status, @@ -762,25 +955,20 @@ def orchestrate_pending( logger: logging.Logger, client: MulticaClient | None = None, ) -> None: - """Process pending and in-flight debate rounds. + """Process pending and in-flight review rounds. Called from watch_loop after each poll_once. Idempotent and restart-safe: running rounds resume from their persisted phase. - - ``client`` may be injected for testing; if omitted a real client is created - from cfg. """ if client is None: client = MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token) - # Start any pending rounds (pending → running/awaiting_debaters). for round_ in list(queue.pending()): try: _start_round(round_, client, queue, cfg, logger) except Exception as exc: logger.error("error starting round %s: %s", round_.round_id, exc, exc_info=True) - # Advance all running rounds (covers newly-started and in-flight). 
for round_ in list(queue.running()): try: _advance_round(round_, client, queue, cfg, logger) diff --git a/src/coordinator/queue.py b/src/coordinator/queue.py index 197237d..dac53c3 100644 --- a/src/coordinator/queue.py +++ b/src/coordinator/queue.py @@ -6,23 +6,27 @@ Schema (~/.coordinator/queue.json): { "rounds": [ { - "round_id": "", - "issue_id": "", - "identifier": "WYL-42", - "title": "...", - "enqueued_at": "", - "status": "pending", - "phase": "convened", - "phase_entered_at": "", - "coordinator_comment_id": "", - "judge_comment_id": "" + "round_id": "", + "issue_id": "", + "identifier": "WYL-42", + "title": "...", + "enqueued_at": "", + "status": "pending", + "phase": "convened", + "phase_entered_at": "", + "meta_judge_comment_id": "", + "rubric_yaml": "", + "judge_mention_comment_id":"", + "judge_report_comment_ids":{}, + "debate_round": 0, + "retrigger_comment_id": "" }, ... ] } status: pending | running | done | error -phase: convened | awaiting_debaters | awaiting_judge | accepted | rejected | error +phase: convened | awaiting_rubric | awaiting_judges | awaiting_debate | accepted | rejected | error Only the watcher appends; only the orchestrator updates status/phase. Completed/errored rounds stay in the file so it stays fully auditable. 
@@ -52,11 +56,14 @@ class Round: title: str enqueued_at: str status: str = "pending" # pending | running | done | error - phase: str = "convened" # convened | awaiting_debaters | awaiting_judge | accepted | rejected | error - phase_entered_at: str = "" # ISO8601 when current phase was entered (for timeout) - coordinator_comment_id: str = "" # ID of debater-mention comment posted by coordinator - judge_comment_id: str = "" # ID of judge-mention comment posted by coordinator - retrigger_comment_id: str = "" # ID of retrigger comment posted after REJECT (audit) + phase: str = "convened" # convened | awaiting_rubric | awaiting_judges | awaiting_debate | accepted | rejected | error + phase_entered_at: str = "" # ISO8601 when current phase was entered (for timeout) + meta_judge_comment_id: str = "" # coordinator comment mentioning meta-judge + rubric_yaml: str = "" # verbatim YAML returned by meta-judge + judge_mention_comment_id: str = "" # coordinator comment mentioning the 3 judges (or the debate-round comment on round N>0) + judge_report_comment_ids: dict[str, str] = field(default_factory=dict) # {judge_name: comment_id} + debate_round: int = 0 # 0 = initial round; 1..3 = debate rounds + retrigger_comment_id: str = "" # ID of retrigger comment posted after REJECT (audit) def to_dict(self) -> dict[str, Any]: return asdict(self) @@ -72,8 +79,11 @@ class Round: status=d.get("status", "pending"), phase=d.get("phase", "convened"), phase_entered_at=d.get("phase_entered_at", ""), - coordinator_comment_id=d.get("coordinator_comment_id", ""), - judge_comment_id=d.get("judge_comment_id", ""), + meta_judge_comment_id=d.get("meta_judge_comment_id", ""), + rubric_yaml=d.get("rubric_yaml", ""), + judge_mention_comment_id=d.get("judge_mention_comment_id", ""), + judge_report_comment_ids=dict(d.get("judge_report_comment_ids", {}) or {}), + debate_round=int(d.get("debate_round", 0)), retrigger_comment_id=d.get("retrigger_comment_id", ""), ) diff --git a/tests/test_integration.py 
b/tests/test_integration.py deleted file mode 100644 index 82019e2..0000000 --- a/tests/test_integration.py +++ /dev/null @@ -1,354 +0,0 @@ -"""Integration tests for the debate round orchestration. - -These tests hit the REAL multica API. They require the coordinator env vars: - - COORDINATOR_SERVER_URL e.g. https://multica.example.com - COORDINATOR_WORKSPACE_ID - COORDINATOR_TOKEN - -Run with: - python -m pytest tests/test_integration.py -m integration -v - -The test creates a scratch issue, exercises the full orchestration lifecycle -with simulated debater/judge responses (posted via the coordinator's own token -using the content-based fallback detector), verifies the result, then cleans up. -""" -from __future__ import annotations - -import html -import logging -import os -import time -from pathlib import Path - -import pytest - -pytestmark = pytest.mark.integration - - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - -@pytest.fixture(scope="module") -def cfg(): - """Load Config from env. Skip module if vars missing.""" - required = [ - "COORDINATOR_SERVER_URL", - "COORDINATOR_WORKSPACE_ID", - "COORDINATOR_TOKEN", - ] - missing = [k for k in required if not os.environ.get(k)] - if missing: - pytest.skip( - f"Integration test requires env vars: {', '.join(missing)}\n" - "Put them in ~/.coordinator/env or export them." 
- ) - # Load env file if present (same as Config.from_env) - from coordinator.config import load_env_file - load_env_file() - from coordinator.config import Config - return Config.from_env() - - -@pytest.fixture(scope="module") -def client(cfg): - from coordinator.multica_client import MulticaClient - return MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _post_fake_debater_replies(client, issue_id: str): - """Post content-marker comments for each debater via the coordinator token. - - The orchestrator's content-based fallback detector accepts these as valid - debater evidence replies. - """ - from coordinator.orchestrator import DEBATER_NAMES - for name in DEBATER_NAMES: - client.post_comment( - issue_id, - f"[{name} response]: Integration-test evidence for {name}. " - "This comment simulates the agent's evidence analysis.", - ) - - -def _post_fake_judge_verdict(client, issue_id: str, verdict: str = "ACCEPT"): - client.post_comment( - issue_id, - f"VERDICT: {verdict}\n\n" - "Integration test verdict. The implementation satisfies the acceptance criteria.", - ) - - -# --------------------------------------------------------------------------- -# Integration test: full round lifecycle -# --------------------------------------------------------------------------- - -@pytest.mark.integration -def test_full_debate_round_lifecycle(cfg, client, tmp_path): - """End-to-end debate round lifecycle against the real API. - - Steps: - 1. Create scratch issue, set to in_review - 2. poll_once → verify round enqueued - 3. _start_round → verify coordinator comment posted on real issue - 4. Post simulated debater responses (content-marker fallback) - 5. _advance_awaiting_debaters → verify judge comment posted - 6. Post simulated judge verdict - 7. 
_advance_awaiting_judge → verify issue status updated (RACE FIX verified) - 8. Cleanup - """ - from coordinator.__main__ import poll_once - from coordinator.orchestrator import ( - DEBATER_NAMES, - _advance_awaiting_debaters, - _advance_awaiting_judge, - _start_round, - ) - from coordinator.queue import DebateQueue - from coordinator.state import SeenState - - logger = logging.getLogger("test.integration") - issue_id = None - - try: - # --- 1. Create scratch issue --- - issue = client.create_issue( - title="[DEBATE-SMOKE-TEST] Integration test — auto-delete", - description=( - "This issue is created by the coordinator integration test. " - "It will be cleaned up automatically.\n\n" - "Commit: https://git.wylab.me/multica/coordinator/commit/test000" - ), - ) - issue_id = issue["id"] - logger.info("created scratch issue %s", issue_id) - - # Set to in_review so the watcher picks it up - client.update_issue_status(issue_id, "in_review") - - # --- 2. poll_once → enqueue round --- - state = SeenState.load(tmp_path / "seen.json") - queue = DebateQueue.load(tmp_path / "queue.json") - poll_once(client, state, queue, logger) - - assert queue.pending(), "Expected round enqueued after poll_once" - round_ = queue.pending()[0] - assert round_.issue_id == issue_id - assert round_.status == "pending" - - # --- 3. 
_start_round → debater comment posted --- - _start_round(round_, client, queue, cfg, logger) - - assert round_.status == "running", f"Expected running, got {round_.status}" - assert round_.phase == "awaiting_debaters", f"Expected awaiting_debaters, got {round_.phase}" - assert round_.coordinator_comment_id, "coordinator_comment_id must be set" - - # Verify comment appeared on the real issue - real_comments = client.list_comments(issue_id) - coord_comment = next( - (c for c in real_comments if c.get("id") == round_.coordinator_comment_id), - None, - ) - assert coord_comment is not None, "Coordinator comment not found in issue comments" - for name in DEBATER_NAMES: - assert name in coord_comment["content"], f"{name} not mentioned in coordinator comment" - - # --- 4. Post simulated debater responses --- - _post_fake_debater_replies(client, issue_id) - - # --- 5. _advance_awaiting_debaters → judge comment posted --- - _advance_awaiting_debaters(round_, client, queue, cfg, logger) - - assert round_.phase == "awaiting_judge", f"Expected awaiting_judge, got {round_.phase}" - assert round_.judge_comment_id, "judge_comment_id must be set" - - real_comments = client.list_comments(issue_id) - judge_comment = next( - (c for c in real_comments if c.get("id") == round_.judge_comment_id), - None, - ) - assert judge_comment is not None, "Judge comment not found in issue comments" - assert "Verdict requested" in judge_comment["content"] - - # --- 6. Post simulated judge verdict --- - _post_fake_judge_verdict(client, issue_id, verdict="ACCEPT") - - # --- 7. 
_advance_awaiting_judge → issue status updated --- - _advance_awaiting_judge(round_, client, queue, cfg, logger) - - assert round_.status == "done", f"Expected done, got {round_.status}" - assert round_.phase == "accepted", f"Expected accepted, got {round_.phase}" - - # Verify issue status via real API (not just in-memory) - refreshed = client.get_issue(issue_id) - assert refreshed.get("status") == "done", ( - f"Issue status should be 'done' after ACCEPT verdict, " - f"got {refreshed.get('status')!r}" - ) - - # RACE FIX: after _advance_awaiting_judge completes, issue is no longer in_review, - # so poll_once cannot double-enqueue this round. - # Note: other in_review issues in the workspace may also get enqueued — that's OK. - poll_once(client, state, queue, logger) - our_rounds = [r for r in queue.rounds if r.issue_id == issue_id] - assert len(our_rounds) == 1, ( - f"poll_once created a second round for issue {issue_id} (double-enqueue)" - ) - - logger.info("VERDICT OK — full lifecycle passed") - - finally: - # --- 8. Cleanup --- - if issue_id: - try: - client.update_issue_status(issue_id, "done") - logger.info("cleanup: issue %s set to done", issue_id) - except Exception as exc: - logger.warning("cleanup failed for issue %s: %s", issue_id, exc) - - -# --------------------------------------------------------------------------- -# Integration test: REJECT retrigger + anchor (WYL-51) -# --------------------------------------------------------------------------- - -@pytest.mark.integration -def test_retrigger_on_reject_end_to_end(cfg, client, tmp_path): - """End-to-end test: REJECT verdict triggers an anchor retrigger comment. - - Steps: - 1. Create scratch issue with multi-paragraph description + an agent assignee - 2. Manually enqueue a Round at phase=awaiting_judge - 3. Inject a VERDICT: REJECT comment via the content-marker fallback - 4. Run _advance_awaiting_judge - 5. 
Assert: issue=in_progress, retrigger comment exists with @mention, - verbatim description, and no-drift instruction - 6. Cleanup - """ - from coordinator.orchestrator import ( - REWORK_INSTRUCTIONS, - _advance_awaiting_judge, - ) - from coordinator.queue import DebateQueue, Round - - logger = logging.getLogger("test.integration.retrigger") - issue_id = None - - # Multi-paragraph description with unique sentinel values for verbatim check - DESCRIPTION = ( - "Integration test anchor paragraph one. Sentinel: ANCHOR-INTEG-PARA1.\n\n" - "Integration test anchor paragraph two. Sentinel: ANCHOR-INTEG-PARA2.\n\n" - "Acceptance criteria: must contain ANCHOR-INTEG-CRITERIA." - ) - - try: - # --- 1. Create scratch issue --- - issue = client.create_issue( - title="[DEBATE-SMOKE-TEST] WYL-51 retrigger smoke — auto-delete", - description=DESCRIPTION, - status="in_review", - ) - issue_id = issue["id"] - logger.info("created scratch issue %s", issue_id) - - # Set an agent assignee so retrigger fires with a mention. - # Pick the first available agent in the workspace. - agents = client.list_agents() - assignee = agents[0] if agents else None - if assignee: - client.update_issue( - issue_id, - assignee_id=assignee["id"], - assignee_type="agent", - ) - logger.info("set assignee to %s (%s)", assignee.get("name"), assignee["id"]) - - # --- 2. Manually enqueue a Round at phase=awaiting_judge --- - queue = DebateQueue.load(tmp_path / "queue.json") - round_ = queue.enqueue(issue_id, "WYL-51-SMOKE", "Retrigger smoke test") - queue.update_status(round_.round_id, "running") - queue.update_phase( - round_.round_id, - "awaiting_judge", - phase_entered_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - coordinator_comment_id="", - ) - - # --- 3. Inject VERDICT: REJECT comment --- - verdict_body = ( - "VERDICT: REJECT\n\n" - "Integration test rejection. The smoke test intentionally rejects this " - "delivery to verify the retrigger mechanism fires correctly." 
- ) - client.post_comment(issue_id, verdict_body) - - # Set judge_comment_id to empty so content-fallback triggers - round_.judge_comment_id = "" - queue.save() - - # --- 4. Run _advance_awaiting_judge --- - _advance_awaiting_judge(round_, client, queue, cfg, logger) - - # --- 5. Assertions --- - assert round_.status == "done", f"Expected round done, got {round_.status}" - assert round_.phase == "rejected", f"Expected rejected, got {round_.phase}" - - # (a) Issue status must be in_progress - refreshed = client.get_issue(issue_id) - assert refreshed.get("status") == "in_progress", ( - f"Issue should be in_progress after REJECT, got {refreshed.get('status')!r}" - ) - - # (b) A retrigger comment must exist on the issue - real_comments = client.list_comments(issue_id) - retrigger_cid = round_.retrigger_comment_id - if assignee: - # Agent path: retrigger_comment_id must be set - assert retrigger_cid, "retrigger_comment_id must be persisted on the round" - retrigger_comment = next( - (c for c in real_comments if c.get("id") == retrigger_cid), - None, - ) - assert retrigger_comment is not None, ( - f"Retrigger comment {retrigger_cid} not found in issue comments" - ) - # The API HTML-encodes some chars (e.g. " → ", > → >) in returned - # content. Decode before comparing so assertions match plain-string constants. 
- body = html.unescape(retrigger_comment["content"]) - - # (b) @-mention present - assert assignee["id"] in body, ( - "Retrigger comment must contain the assignee agent ID" - ) - - # (c) Full verbatim description present - for line in DESCRIPTION.splitlines(): - if line.strip(): - assert line in body, ( - f"Description line {line!r} not found verbatim in retrigger comment" - ) - - # (d) No-drift instruction present - assert REWORK_INSTRUCTIONS in body, ( - "REWORK_INSTRUCTIONS must appear verbatim in retrigger comment" - ) - - logger.info( - "retrigger comment (first 400 chars):\n%s", - body[:400], - ) - - logger.info("VERDICT OK — retrigger end-to-end passed") - - finally: - # --- 6. Cleanup --- - if issue_id: - try: - client.update_issue_status(issue_id, "done") - logger.info("cleanup: issue %s set to done", issue_id) - except Exception as exc: - logger.warning("cleanup failed for issue %s: %s", issue_id, exc) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 35e1186..f97005c 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -1,544 +1,610 @@ -"""Unit tests for coordinator.orchestrator (WYL-45).""" +"""Tests for the CEK-native review pipeline (meta-judge + 3 judges + consensus).""" from __future__ import annotations import logging -import uuid -from dataclasses import dataclass, field -from datetime import datetime, timezone, timedelta -from pathlib import Path +import pathlib +from datetime import datetime, timezone from typing import Any import pytest +import yaml +from coordinator.queue import DebateQueue, Round from coordinator.orchestrator import ( - DEBATER_NAMES, - DEBATER_PROMPTS, - JUDGE_NAME, + ACCEPT_MIN_SCORE, + CONSENSUS_CRITERION_THRESHOLD, + CONSENSUS_OVERALL_THRESHOLD, + JUDGE_NAMES, + MAX_DEBATE_ROUNDS, + META_JUDGE_NAME, REWORK_INSTRUCTIONS, - _advance_awaiting_debaters, - _advance_awaiting_judge, - _collect_debater_replies, - _collect_judge_reply, + _advance_awaiting_debate, + 
_advance_awaiting_judges, + _advance_awaiting_rubric, + _advance_round, + _apply_verdict, + _build_coordinator_note_no_agent, + _build_debate_round_comment, + _build_judge_mention_comment, + _build_meta_judge_mention, + _build_retrigger_comment, + _check_consensus, + _criterion_scores, + _extract_yaml, _find_commit_url, - _parse_verdict, + _find_reply_by_agent, + _overall_score, + _parse_judge_report, + _parse_rubric, _post_rejection_retrigger, _start_round, - orchestrate_pending, + _utcnow, ) -from coordinator.queue import DebateQueue, Round + +_logger = logging.getLogger("test.orchestrator") # --------------------------------------------------------------------------- -# Helpers +# Fakes # --------------------------------------------------------------------------- -def _utcnow() -> str: - return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - - -def _past(seconds: float) -> str: - t = datetime.now(timezone.utc) - timedelta(seconds=seconds) - return t.strftime("%Y-%m-%dT%H:%M:%SZ") - - -@dataclass class FakeConfig: - server_url: str = "http://fake" - workspace_id: str = "ws-1" - token: str = "tok" - poll_interval_s: int = 30 - round_timeout_s: int = 600 - max_concurrent_rounds: int = 3 - seen_file: Path = Path("/tmp/seen.json") - queue_file: Path = Path("/tmp/queue.json") - log_file: Path = Path("/tmp/coordinator.log") + server_url = "http://x" + workspace_id = "wid" + token = "tok" + poll_interval_s = 30 + round_timeout_s = 600 + max_concurrent_rounds = 3 -@dataclass class FakeClient: - """Controllable stand-in for MulticaClient.""" - - issue: dict[str, Any] = field(default_factory=lambda: { - "id": "issue-1", - "title": "Test issue", - "description": "Test description. 
Commit: https://git.example.com/commit/abc123", - "status": "in_review", - }) - comments: list[dict[str, Any]] = field(default_factory=list) - agents: list[dict[str, Any]] = field(default_factory=list) - posted_comments: list[str] = field(default_factory=list) - status_updates: list[tuple[str, str]] = field(default_factory=list) - post_comment_returns_id: str = "comment-coord" + def __init__(self) -> None: + self.issue: dict[str, Any] = { + "id": "issue-1", + "title": "Do the thing", + "description": "Please do the thing clearly.", + "status": "in_review", + "assignee_type": None, + "assignee_id": None, + } + self.comments: list[dict[str, Any]] = [] + self.posted_comments: list[str] = [] + self.agents: list[dict[str, Any]] = [ + {"name": META_JUDGE_NAME, "id": "agent-meta"}, + {"name": "Judge-GPT", "id": "agent-gpt"}, + {"name": "Judge-Claude", "id": "agent-claude"}, + {"name": "Judge-Gemini", "id": "agent-gemini"}, + ] + self._next_comment_id = 1000 def get_issue(self, issue_id: str) -> dict[str, Any]: - return self.issue + return dict(self.issue) + + def update_issue_status(self, issue_id: str, status: str) -> dict[str, Any]: + self.issue["status"] = status + return {"id": issue_id, "status": status} def list_comments(self, issue_id: str) -> list[dict[str, Any]]: return list(self.comments) def post_comment(self, issue_id: str, content: str) -> dict[str, Any]: self.posted_comments.append(content) - cid = self.post_comment_returns_id - # Advance the ID for subsequent calls - self.post_comment_returns_id = "comment-" + str(len(self.posted_comments)) - ts = _utcnow() - new_comment = {"id": cid, "content": content, "created_at": ts, - "author_id": "coord", "author_type": "member"} - self.comments.append(new_comment) - return new_comment + cid = f"posted-{self._next_comment_id}" + self._next_comment_id += 1 + created = _utcnow() + self.comments.append({ + "id": cid, + "content": content, + "author_id": "coord-user", + "created_at": created, + }) + return {"id": cid, 
"created_at": created} def list_agents(self) -> list[dict[str, Any]]: return list(self.agents) - def find_agents_by_name(self, names) -> dict[str, str]: - wanted = set(names) - return {a["name"]: a["id"] for a in self.agents if a["name"] in wanted} + def find_agents_by_name(self, names): + want = set(names) + return {a["name"]: a["id"] for a in self.agents if a["name"] in want} def get_agent_name(self, agent_id: str) -> str | None: for a in self.agents: - if a.get("id") == agent_id: - return a.get("name") + if a["id"] == agent_id: + return a["name"] return None - def update_issue_status(self, issue_id: str, status: str) -> dict[str, Any]: - self.status_updates.append((issue_id, status)) - self.issue["status"] = status - return self.issue - def update_issue(self, issue_id: str, **fields: Any) -> dict[str, Any]: - self.issue.update(fields) - return self.issue +def _past(seconds_ago: int) -> str: + from datetime import timedelta + t = datetime.now(timezone.utc) - timedelta(seconds=seconds_ago) + return t.strftime("%Y-%m-%dT%H:%M:%SZ") -def _make_round( - *, - status: str = "pending", - phase: str = "convened", - phase_entered_at: str = "", - coordinator_comment_id: str = "", - judge_comment_id: str = "", -) -> Round: - return Round( - round_id=str(uuid.uuid4()), +def _reply_comment(agent_id: str, content: str, created: str | None = None) -> dict[str, Any]: + return { + "id": f"reply-{agent_id}", + "author_id": agent_id, + "content": content, + "created_at": created or _utcnow(), + } + + +def _rubric_yaml_sample() -> str: + spec = { + "checklist": [ + {"question": "Does the code compile?", "category": "hard_rule", "importance": "essential", "rationale": "basic"}, + {"question": "Is documentation present?", "category": "principle", "importance": "important", "rationale": "quality"}, + ], + "rubric_dimensions": [ + {"name": "Correctness", "description": "Does the code work", "scale": "1-5", "weight": 0.6, "score_definitions": {1: "no", 5: "perfect"}}, + {"name": 
"Clarity", "description": "Readability", "scale": "1-5", "weight": 0.4, "score_definitions": {1: "opaque", 5: "crystal"}}, + ], + } + return yaml.safe_dump(spec, sort_keys=False) + + +def _judge_report(final_score: float, criteria: dict[str, float]) -> str: + report = { + "evaluation_report": { + "score_calculation": {"final_score": final_score}, + "rubric_scores": [ + {"name": k, "score": v, "weight": 0.5} for k, v in criteria.items() + ], + "executive_summary": f"score {final_score}", + } + } + return "```yaml\n" + yaml.safe_dump(report, sort_keys=False) + "\n```" + + +def _make_round(tmp_path: pathlib.Path, **overrides) -> tuple[Round, DebateQueue]: + r = Round( + round_id="r1", issue_id="issue-1", - identifier="WYL-99", - title="Test issue", + identifier="WYL-X", + title="Do the thing", enqueued_at=_utcnow(), - status=status, - phase=phase, - phase_entered_at=phase_entered_at or _utcnow(), - coordinator_comment_id=coordinator_comment_id, - judge_comment_id=judge_comment_id, + status="running", + phase="convened", + phase_entered_at=_utcnow(), ) - - -def _make_queue(tmp_path: Path, *rounds: Round) -> DebateQueue: + for k, v in overrides.items(): + setattr(r, k, v) q = DebateQueue.load(tmp_path / "queue.json") - for r in rounds: - q.rounds.append(r) + q.rounds.append(r) q.save() - return q - - -_logger = logging.getLogger("test.orchestrator") + return r, q # --------------------------------------------------------------------------- -# _parse_verdict +# Pure helpers # --------------------------------------------------------------------------- -def test_parse_verdict_accept(): - assert _parse_verdict("VERDICT: ACCEPT\n\nGreat work.") == "ACCEPT" +def test_extract_yaml_from_fenced_block(): + content = "Here is the rubric:\n\n```yaml\nchecklist:\n - question: foo\n```\n\nDone." 
+ y = _extract_yaml(content) + assert y.startswith("checklist:") + assert "question: foo" in y -def test_parse_verdict_reject(): - assert _parse_verdict("VERDICT: REJECT\n\nMissing tests.") == "REJECT" +def test_extract_yaml_from_unfenced_content(): + content = "checklist:\n - question: foo" + y = _extract_yaml(content) + assert y == content.strip() -def test_parse_verdict_case_insensitive(): - assert _parse_verdict("verdict: accept") == "ACCEPT" +def test_parse_rubric_valid_flat(): + spec = _parse_rubric(f"```yaml\n{_rubric_yaml_sample()}\n```") + assert spec is not None + assert "checklist" in spec or "rubric_dimensions" in spec -def test_parse_verdict_none_when_absent(): - assert _parse_verdict("No verdict here.") is None +def test_parse_rubric_valid_wrapped(): + wrapped = yaml.safe_dump({"evaluation_specification": yaml.safe_load(_rubric_yaml_sample())}) + spec = _parse_rubric(f"```yaml\n{wrapped}\n```") + assert spec is not None + assert "rubric_dimensions" in spec + + +def test_parse_rubric_rejects_malformed_yaml(): + assert _parse_rubric("```yaml\nnot: valid: nested: without: quotes\n```") is None + + +def test_parse_rubric_rejects_yaml_without_expected_keys(): + assert _parse_rubric("```yaml\njust_some: random\n```") is None + + +def test_parse_judge_report_valid_with_final_score(): + content = _judge_report(3.7, {"Correctness": 4.0, "Clarity": 3.5}) + r = _parse_judge_report(content) + assert r is not None + assert r["score_calculation"]["final_score"] == 3.7 + + +def test_parse_judge_report_valid_without_final_score_but_rubric_scores(): + report = { + "evaluation_report": { + "rubric_scores": [ + {"name": "Correctness", "score": 4.0, "weight": 1.0}, + ] + } + } + content = "```yaml\n" + yaml.safe_dump(report) + "\n```" + assert _parse_judge_report(content) is not None + + +def test_parse_judge_report_rejects_empty(): + assert _parse_judge_report("```yaml\nnothing: here\n```") is None + + +def test_overall_score_prefers_final_score(): + r = 
{"score_calculation": {"final_score": 2.8}, "rubric_scores": [{"name": "x", "score": 5, "weight": 1}]} + assert _overall_score(r) == 2.8 + + +def test_overall_score_falls_back_to_weighted_average(): + r = {"rubric_scores": [ + {"name": "a", "score": 4.0, "weight": 0.6}, + {"name": "b", "score": 2.0, "weight": 0.4}, + ]} + assert _overall_score(r) == pytest.approx(3.2) + + +def test_overall_score_none_when_nothing_to_extract(): + assert _overall_score({}) is None + + +def test_criterion_scores_extracts_names_and_scores(): + r = {"rubric_scores": [ + {"name": "Correctness", "score": 4.0}, + {"name": "Clarity", "score": 3.0}, + ]} + s = _criterion_scores(r) + assert s == {"Correctness": 4.0, "Clarity": 3.0} + + +def test_check_consensus_converged_accept(): + reports = { + "Judge-GPT": {"score_calculation": {"final_score": 4.0}, "rubric_scores": [{"name": "C", "score": 4}]}, + "Judge-Claude": {"score_calculation": {"final_score": 4.2}, "rubric_scores": [{"name": "C", "score": 4}]}, + "Judge-Gemini": {"score_calculation": {"final_score": 4.1}, "rubric_scores": [{"name": "C", "score": 4}]}, + } + converged, verdict, avg = _check_consensus(reports) + assert converged is True + assert verdict == "ACCEPT" + assert avg == pytest.approx((4.0 + 4.2 + 4.1) / 3) + + +def test_check_consensus_converged_reject_low_score(): + reports = { + n: {"score_calculation": {"final_score": 2.5}, "rubric_scores": [{"name": "C", "score": 2}]} + for n in JUDGE_NAMES + } + converged, verdict, avg = _check_consensus(reports) + assert converged is True + assert verdict == "REJECT" + + +def test_check_consensus_not_converged_overall_spread(): + reports = { + "Judge-GPT": {"score_calculation": {"final_score": 2.0}}, + "Judge-Claude": {"score_calculation": {"final_score": 4.0}}, + "Judge-Gemini": {"score_calculation": {"final_score": 3.0}}, + } + converged, verdict, avg = _check_consensus(reports) + assert converged is False + assert verdict is None + assert avg == pytest.approx(3.0) + + +def 
test_check_consensus_not_converged_criterion_spread(): + reports = { + "Judge-GPT": {"score_calculation": {"final_score": 3.0}, "rubric_scores": [{"name": "C", "score": 2}]}, + "Judge-Claude": {"score_calculation": {"final_score": 3.1}, "rubric_scores": [{"name": "C", "score": 5}]}, + "Judge-Gemini": {"score_calculation": {"final_score": 3.0}, "rubric_scores": [{"name": "C", "score": 3}]}, + } + converged, _, _ = _check_consensus(reports) + assert converged is False + + +def test_check_consensus_no_overalls_returns_false(): + reports = {n: {} for n in JUDGE_NAMES} + converged, verdict, avg = _check_consensus(reports) + assert converged is False + assert avg is None + + +# --------------------------------------------------------------------------- +# Comment builders +# --------------------------------------------------------------------------- + +def test_meta_judge_mention_contains_mention_and_description(): + body = _build_meta_judge_mention("agent-meta", "Title", "Description line 1\nDescription line 2") + assert "mention://agent/agent-meta" in body + assert META_JUDGE_NAME in body + assert "Description line 1" in body + assert "Description line 2" in body + + +def test_judge_mention_contains_all_three_mentions_and_rubric(): + judge_ids = {"Judge-GPT": "a", "Judge-Claude": "b", "Judge-Gemini": "c"} + body = _build_judge_mention_comment(judge_ids, "Title", "Desc", "https://example.com/commit/abc", "rubric: yes") + for n in JUDGE_NAMES: + assert n in body + for agent_id in ("a", "b", "c"): + assert f"mention://agent/{agent_id}" in body + assert "rubric: yes" in body + assert "https://example.com/commit/abc" in body + + +def test_debate_round_comment_quotes_all_prior_reports(): + judge_ids = {n: f"id-{n}" for n in JUDGE_NAMES} + prior = {n: f"REPORT FROM {n}" for n in JUDGE_NAMES} + body = _build_debate_round_comment(judge_ids, 1, prior) + assert "Debate round 1" in body + for n in JUDGE_NAMES: + assert n in body + assert f"REPORT FROM {n}" in body + + +def 
test_retrigger_comment_has_anchor_and_no_drift_instructions(): + body = _build_retrigger_comment("Worker", "agent-worker", "Original desc line.", "VERDICT: REJECT", "r1") + assert "mention://agent/agent-worker" in body + assert "ANCHOR" in body + assert "Original desc line." in body + assert REWORK_INSTRUCTIONS in body + + +def test_coordinator_note_no_agent_has_no_mention(): + body = _build_coordinator_note_no_agent("r1", "no assignee set") + assert "mention://" not in body + assert "Manual follow-up required" in body # --------------------------------------------------------------------------- # _find_commit_url # --------------------------------------------------------------------------- -def test_find_commit_url_found(): +def test_find_commit_url_picks_latest(): comments = [ - {"content": "Watcher implemented. Commit: https://git.example.com/multica/foo/commit/abc123"}, + {"content": "older https://git.example/commit/abc123", "created_at": "2026-01-01T00:00:00Z"}, + {"content": "newer https://git.example/commit/def456", "created_at": "2026-01-02T00:00:00Z"}, ] - assert _find_commit_url(comments) == "https://git.example.com/multica/foo/commit/abc123" - - -def test_find_commit_url_returns_last(): - comments = [ - {"content": "Commit: https://git.example.com/multica/foo/commit/aaa111"}, - {"content": "Follow-up commit: https://git.example.com/multica/foo/commit/bbb222"}, - ] - assert _find_commit_url(comments) == "https://git.example.com/multica/foo/commit/bbb222" + assert _find_commit_url(comments) == "https://git.example/commit/def456" def test_find_commit_url_empty_when_absent(): - assert _find_commit_url([{"content": "No link here."}]) == "" + assert _find_commit_url([{"content": "no urls"}]) == "" # --------------------------------------------------------------------------- -# _collect_debater_replies +# _find_reply_by_agent # --------------------------------------------------------------------------- -def _make_comment( - cid: str, - content: str, - 
author_id: str = "anon", - ts: str = "", -) -> dict[str, Any]: - return { - "id": cid, - "content": content, - "author_id": author_id, - "author_type": "agent", - "created_at": ts or _utcnow(), - } - - -def test_collect_replies_by_agent_id(): - cutoff_ts = _past(10) - agent_map = {"Senior Developer": "agent-sd"} +def test_find_reply_by_agent_respects_cutoff(): comments = [ - {"id": "coord", "content": "Debate opened", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - _make_comment("c1", "Evidence here.", author_id="agent-sd", - ts=_utcnow()), + {"id": "before", "author_id": "agent-x", "content": "early", "created_at": _past(100)}, + {"id": "cutoff", "author_id": "coord", "content": "mention", "created_at": _past(50)}, + {"id": "after", "author_id": "agent-x", "content": "late", "created_at": _past(10)}, ] - replies = _collect_debater_replies(comments, agent_map, "coord") - assert "Senior Developer" in replies + found = _find_reply_by_agent(comments, "agent-x", "cutoff") + assert found == ("after", "late") -def test_collect_replies_content_fallback(): - """Content-based marker accepted when agent ID not in map.""" - cutoff_ts = _past(10) - agent_map = {} - comments = [ - {"id": "coord", "content": "Debate opened", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - _make_comment( - "c1", - "[Senior Developer response]: grep output here.", - author_id="someone-else", - ts=_utcnow(), - ), - ] - replies = _collect_debater_replies(comments, agent_map, "coord") - assert "Senior Developer" in replies - - -def test_collect_replies_skips_before_cutoff(): - """Comments before coordinator's mention are ignored.""" - early = _past(20) - cutoff_ts = _past(10) - agent_map = {"Senior Developer": "agent-sd"} - comments = [ - _make_comment("early", "Early reply", author_id="agent-sd", ts=early), - {"id": "coord", "content": "Debate opened", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - ] - replies = 
_collect_debater_replies(comments, agent_map, "coord") - assert "Senior Developer" not in replies - - -def test_collect_replies_no_duplicate_per_debater(): - cutoff_ts = _past(10) - agent_map = {"Senior Developer": "agent-sd"} - comments = [ - {"id": "coord", "content": "x", "author_id": "coord", - "author_type": "member", "created_at": cutoff_ts}, - _make_comment("c1", "First reply", author_id="agent-sd", ts=_utcnow()), - _make_comment("c2", "Second reply", author_id="agent-sd", ts=_utcnow()), - ] - replies = _collect_debater_replies(comments, agent_map, "coord") - assert replies["Senior Developer"] == "First reply" - - -# --------------------------------------------------------------------------- -# _collect_judge_reply -# --------------------------------------------------------------------------- - -def test_collect_judge_reply_by_agent_id(): - cutoff_ts = _past(10) - comments = [ - {"id": "jc", "content": "Verdict requested", "author_id": "coord", - "created_at": cutoff_ts}, - _make_comment("j1", "VERDICT: ACCEPT\n\nGreat.", author_id="agent-judge", - ts=_utcnow()), - ] - result = _collect_judge_reply(comments, "agent-judge", "jc") - assert result is not None - assert "ACCEPT" in result - - -def test_collect_judge_reply_content_fallback(): - cutoff_ts = _past(10) - comments = [ - {"id": "jc", "content": "x", "author_id": "coord", "created_at": cutoff_ts}, - _make_comment("j1", "VERDICT: REJECT\n\nMissing tests.", author_id="anyone", - ts=_utcnow()), - ] - result = _collect_judge_reply(comments, "", "jc") - assert result is not None - assert "REJECT" in result - - -def test_collect_judge_reply_none_when_absent(): - cutoff_ts = _past(10) - comments = [ - {"id": "jc", "content": "x", "author_id": "coord", "created_at": cutoff_ts}, - ] - assert _collect_judge_reply(comments, "agent-judge", "jc") is None +def test_find_reply_by_agent_none_when_no_match(): + comments = [{"id": "x", "author_id": "other", "content": "hi", "created_at": _utcnow()}] + assert 
_find_reply_by_agent(comments, "agent-x", "somewhere") is None # --------------------------------------------------------------------------- # _start_round # --------------------------------------------------------------------------- -def test_start_round_marks_running_before_post(tmp_path): - """Status must be 'running' before any API call (early guard against double-enqueue).""" - call_order: list[str] = [] - - class TrackingQueue(DebateQueue): - def update_status(self, round_id, status): - call_order.append(f"update_status:{status}") - super().update_status(round_id, status) - - class TrackingClient(FakeClient): - def post_comment(self, issue_id, content): - call_order.append("post_comment") - return super().post_comment(issue_id, content) - - r = _make_round() - q = TrackingQueue.load(tmp_path / "queue.json") - q.rounds.append(r) - q.save() - - _start_round(r, TrackingClient(), q, FakeConfig(), _logger) - - # update_status("running") must precede post_comment - running_idx = next(i for i, v in enumerate(call_order) if v == "update_status:running") - post_idx = next(i for i, v in enumerate(call_order) if v == "post_comment") - assert running_idx < post_idx - - -def test_start_round_posts_debater_comment(tmp_path): - r = _make_round() - q = _make_queue(tmp_path, r) +def test_start_round_posts_meta_judge_mention_and_sets_phase(tmp_path): + r, q = _make_round(tmp_path, phase="convened", status="pending") client = FakeClient() - _start_round(r, client, q, FakeConfig(), _logger) + assert r.status == "running" assert len(client.posted_comments) == 1 - comment = client.posted_comments[0] - assert "Debate round opened" in comment - for name in DEBATER_NAMES: - assert name in comment + body = client.posted_comments[0] + assert "mention://agent/agent-meta" in body + assert r.phase == "awaiting_rubric" + assert r.meta_judge_comment_id != "" -def test_start_round_sets_phase_awaiting_debaters(tmp_path): - r = _make_round() - q = _make_queue(tmp_path, r) +def 
test_start_round_marks_error_on_api_failure(tmp_path): + r, q = _make_round(tmp_path, phase="convened", status="pending") - _start_round(r, FakeClient(), q, FakeConfig(), _logger) - - assert r.phase == "awaiting_debaters" - assert r.status == "running" - assert r.coordinator_comment_id # must be set - - -def test_start_round_error_on_api_failure(tmp_path): - class BrokenClient(FakeClient): - def post_comment(self, issue_id, content): - raise RuntimeError("API down") - - r = _make_round() - q = _make_queue(tmp_path, r) - - _start_round(r, BrokenClient(), q, FakeConfig(), _logger) + class FailingClient(FakeClient): + def get_issue(self, issue_id): + raise RuntimeError("no issue") + client = FailingClient() + _start_round(r, client, q, FakeConfig(), _logger) assert r.status == "error" # --------------------------------------------------------------------------- -# _advance_awaiting_debaters — debater replies +# _advance_awaiting_rubric # --------------------------------------------------------------------------- -def _debater_round(tmp_path: Path) -> tuple[Round, DebateQueue, FakeClient]: - """Return a round already in awaiting_debaters phase.""" - r = _make_round( - status="running", - phase="awaiting_debaters", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-comment", +def test_advance_awaiting_rubric_waits_when_meta_judge_silent(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_utcnow(), ) - q = _make_queue(tmp_path, r) - client = FakeClient() - # Seed coordinator comment in the comment list - client.comments.append({ - "id": "coord-comment", - "content": "Debate opened", - "author_id": "coord", - "author_type": "member", - "created_at": _past(5), - }) - # Seed debater agents - client.agents = [{"name": name, "id": f"agent-{i}"} for i, name in enumerate(DEBATER_NAMES)] - client.agents.append({"name": JUDGE_NAME, "id": "agent-judge"}) - return r, q, client + 
client.comments = [{"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}] + _advance_awaiting_rubric(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_rubric" + assert r.status == "running" -def test_advance_debaters_waits_when_not_all_replied(tmp_path): - r, q, client = _debater_round(tmp_path) - - _advance_awaiting_debaters(r, client, q, FakeConfig(), _logger) - - # Not enough replies → no judge comment posted - assert r.phase == "awaiting_debaters" - assert all("Verdict requested" not in c for c in client.posted_comments) - - -def test_advance_debaters_proceeds_when_all_replied(tmp_path): - r, q, client = _debater_round(tmp_path) - - # Add a reply from each debater - for i, name in enumerate(DEBATER_NAMES): - client.comments.append({ - "id": f"reply-{i}", - "content": f"[{name} response]: Evidence here.", - "author_id": f"agent-{i}", - "author_type": "agent", - "created_at": _utcnow(), - }) - - _advance_awaiting_debaters(r, client, q, FakeConfig(), _logger) - - assert r.phase == "awaiting_judge" - assert r.judge_comment_id - judge_comment = client.posted_comments[-1] - assert "Verdict requested" in judge_comment - for name in DEBATER_NAMES: - assert name in judge_comment - - -def test_advance_debaters_timeout_with_partial_evidence(tmp_path): - """After timeout, proceed with partial evidence (missing debaters noted in transcript).""" - r, q, client = _debater_round(tmp_path) - # Only one debater replies - client.comments.append({ - "id": "r1", - "content": f"[{DEBATER_NAMES[0]} response]: My evidence.", - "author_id": "agent-0", - "author_type": "agent", - "created_at": _utcnow(), - }) - # Simulate timeout by setting phase_entered_at far in the past - r.phase_entered_at = _past(700) - q.save() - - cfg = FakeConfig() - _advance_awaiting_debaters(r, client, q, cfg, _logger) - - assert r.phase == "awaiting_judge" - judge_comment = client.posted_comments[-1] - assert "timed out" in judge_comment or "no response" in 
judge_comment - - -# --------------------------------------------------------------------------- -# _advance_awaiting_judge — verdict handling -# --------------------------------------------------------------------------- - -def _judge_round(tmp_path: Path, *, phase_entered_at: str = "") -> tuple[Round, DebateQueue, FakeClient]: - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=phase_entered_at or _utcnow(), - coordinator_comment_id="coord-comment", - judge_comment_id="judge-comment", +def test_advance_awaiting_rubric_errors_on_malformed_yaml(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_utcnow(), ) - q = _make_queue(tmp_path, r) - client = FakeClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] client.comments = [ - {"id": "judge-comment", "content": "Verdict requested", - "author_id": "coord", "author_type": "member", "created_at": _past(5)}, + {"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + {"id": "reply", "author_id": "agent-meta", "content": "```yaml\nnot: valid: nested\n```", "created_at": _utcnow()}, ] - return r, q, client - - -def test_advance_judge_waits_when_no_verdict(tmp_path): - r, q, client = _judge_round(tmp_path) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.phase == "awaiting_judge" - assert r.status == "running" - assert not client.status_updates - - -def test_advance_judge_accept_updates_issue_status_to_done(tmp_path): - r, q, client = _judge_round(tmp_path) - client.comments.append({ - "id": "verdict1", - "content": "VERDICT: ACCEPT\n\nLooks good.", - "author_id": "agent-judge", - "author_type": "agent", - "created_at": _utcnow(), - }) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "accepted" - assert client.status_updates == [("issue-1", "done")] - - -def 
test_advance_judge_reject_updates_issue_status_to_in_progress(tmp_path): - r, q, client = _judge_round(tmp_path) - client.comments.append({ - "id": "verdict1", - "content": "VERDICT: REJECT\n\nMissing tests.", - "author_id": "agent-judge", - "author_type": "agent", - "created_at": _utcnow(), - }) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "rejected" - assert client.status_updates == [("issue-1", "in_progress")] - - -def test_advance_judge_timeout_marks_error(tmp_path): - """Judge timeout: round → error, issue left in_review for human escalation.""" - r, q, client = _judge_round(tmp_path, phase_entered_at=_past(700)) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "error" + _advance_awaiting_rubric(r, client, q, FakeConfig(), _logger) assert r.phase == "error" - # Issue status must NOT be changed — leave in_review for humans - assert not client.status_updates + assert r.status == "error" + + +def test_advance_awaiting_rubric_moves_to_awaiting_judges_on_valid_rubric(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.comments = [ + {"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + {"id": "reply", "author_id": "agent-meta", "content": f"```yaml\n{_rubric_yaml_sample()}\n```", "created_at": _utcnow()}, + ] + _advance_awaiting_rubric(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_judges" + assert r.rubric_yaml != "" + assert any("mention://agent/agent-gpt" in c for c in client.posted_comments) + assert any("mention://agent/agent-claude" in c for c in client.posted_comments) + assert any("mention://agent/agent-gemini" in c for c in client.posted_comments) + + +def test_advance_awaiting_rubric_errors_on_timeout_without_reply(tmp_path): + cfg = FakeConfig() + r, q = 
_make_round( + tmp_path, phase="awaiting_rubric", status="running", + meta_judge_comment_id="meta-c", phase_entered_at=_past(cfg.round_timeout_s + 5), + ) + client = FakeClient() + client.comments = [{"id": "meta-c", "author_id": "coord", "content": "mention", "created_at": _past(cfg.round_timeout_s + 5)}] + _advance_awaiting_rubric(r, client, q, cfg, _logger) + assert r.phase == "error" + assert r.status == "error" # --------------------------------------------------------------------------- -# Race condition: issue status before round done (CRITICAL) +# _advance_awaiting_judges # --------------------------------------------------------------------------- -def test_issue_status_updated_before_round_marked_done(tmp_path): - """RACE FIX: client.update_issue_status MUST precede queue.update_status('done').""" +def test_advance_awaiting_judges_waits_when_missing_reports(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(3.5, {"Correctness": 4})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_judges" + + +def test_advance_awaiting_judges_accepts_on_consensus(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(4.0, {"Correctness": 4})), + _reply_comment("agent-claude", _judge_report(4.1, {"Correctness": 4})), + _reply_comment("agent-gemini", _judge_report(4.2, {"Correctness": 4})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == 
"accepted" + assert r.status == "done" + assert client.issue["status"] == "done" + + +def test_advance_awaiting_judges_rejects_on_consensus_low(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + ) + client = FakeClient() + client.issue["assignee_type"] = "agent" + client.issue["assignee_id"] = "agent-worker" + client.agents.append({"name": "Worker", "id": "agent-worker"}) + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(2.0, {"Correctness": 2})), + _reply_comment("agent-claude", _judge_report(2.2, {"Correctness": 2})), + _reply_comment("agent-gemini", _judge_report(2.1, {"Correctness": 2})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "rejected" + assert r.status == "done" + assert client.issue["status"] == "in_progress" + assert any("mention://agent/agent-worker" in c for c in client.posted_comments) + + +def test_advance_awaiting_judges_starts_debate_round_when_spread(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), debate_round=0, + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(2.0, {"Correctness": 2})), + _reply_comment("agent-claude", _judge_report(4.0, {"Correctness": 4})), + _reply_comment("agent-gemini", _judge_report(3.0, {"Correctness": 3})), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_debate" + assert r.debate_round == 1 + + +def test_advance_awaiting_judges_errors_when_no_parseable(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_judges", status="running", + judge_mention_comment_id="jm-c", 
phase_entered_at=_past(FakeConfig.round_timeout_s + 5), + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(FakeConfig.round_timeout_s + 5)}, + _reply_comment("agent-gpt", "not yaml at all"), + _reply_comment("agent-claude", "also not yaml"), + _reply_comment("agent-gemini", "garbage"), + ] + _advance_awaiting_judges(r, client, q, FakeConfig(), _logger) + assert r.phase == "error" + assert r.status == "error" + + +# --------------------------------------------------------------------------- +# Debate round cap +# --------------------------------------------------------------------------- + +def test_awaiting_debate_errors_out_at_cap(tmp_path): + r, q = _make_round( + tmp_path, phase="awaiting_debate", status="running", + judge_mention_comment_id="jm-c", phase_entered_at=_utcnow(), + debate_round=MAX_DEBATE_ROUNDS, + ) + client = FakeClient() + client.comments = [ + {"id": "jm-c", "author_id": "coord", "content": "mention", "created_at": _past(5)}, + _reply_comment("agent-gpt", _judge_report(2.0, {"Correctness": 2})), + _reply_comment("agent-claude", _judge_report(4.0, {"Correctness": 4})), + _reply_comment("agent-gemini", _judge_report(3.0, {"Correctness": 3})), + ] + _advance_awaiting_debate(r, client, q, FakeConfig(), _logger) + assert r.phase == "error" + assert r.status == "error" + + +# --------------------------------------------------------------------------- +# Race fix: issue status moves BEFORE round marked done +# --------------------------------------------------------------------------- + +def test_apply_verdict_updates_issue_before_marking_round_done(tmp_path): call_order: list[str] = [] class TrackingClient(FakeClient): @@ -551,440 +617,78 @@ def test_issue_status_updated_before_round_marked_done(tmp_path): call_order.append(f"round:{status}") super().update_status(round_id, status) - r = _make_round( - status="running", - phase="awaiting_judge", + r = Round( + 
round_id="r1", issue_id="issue-1", identifier="WYL-X", title="t", + enqueued_at=_utcnow(), status="running", phase="awaiting_judges", phase_entered_at=_utcnow(), - coordinator_comment_id="coord-comment", - judge_comment_id="judge-comment", ) q = TrackingQueue.load(tmp_path / "queue.json") q.rounds.append(r) q.save() - client = TrackingClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] - client.comments = [ - {"id": "judge-comment", "content": "x", "author_id": "coord", - "created_at": _past(5)}, - {"id": "v1", "content": "VERDICT: ACCEPT\n\nAll good.", - "author_id": "agent-judge", "author_type": "agent", "created_at": _utcnow()}, - ] - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) + _apply_verdict(r, client, q, "ACCEPT", "VERDICT: ACCEPT\nScore 4.0", _logger) - # Both calls must have happened assert "issue:done" in call_order assert "round:done" in call_order - # Issue status MUST precede round-done - assert call_order.index("issue:done") < call_order.index("round:done"), ( - f"Expected issue:done before round:done, got order: {call_order}" - ) - - -def test_round_not_marked_done_if_issue_update_fails(tmp_path): - """If issue status update fails, don't mark round done (retry next cycle).""" - - class FailingClient(FakeClient): - def update_issue_status(self, issue_id, status): - raise RuntimeError("network error") - - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-comment", - judge_comment_id="judge-comment", - ) - q = _make_queue(tmp_path, r) - - client = FailingClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] - client.comments = [ - {"id": "judge-comment", "content": "x", "author_id": "coord", "created_at": _past(5)}, - {"id": "v1", "content": "VERDICT: ACCEPT", "author_id": "agent-judge", - "created_at": _utcnow()}, - ] - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - # Round must remain running for retry - 
assert r.status == "running" + assert call_order.index("issue:done") < call_order.index("round:done") # --------------------------------------------------------------------------- -# Phase transitions: full cycle +# _post_rejection_retrigger corner cases # --------------------------------------------------------------------------- -def test_full_phase_cycle(tmp_path): - """pending → running/awaiting_debaters → awaiting_judge → accepted/done.""" - r = _make_round() - q = _make_queue(tmp_path, r) - - # Step 1: start_round +def test_reject_retrigger_skipped_for_member_assignee(tmp_path): + r, q = _make_round(tmp_path) client = FakeClient() - client.agents = [ - *[{"name": n, "id": f"agent-{i}"} for i, n in enumerate(DEBATER_NAMES)], - {"name": JUDGE_NAME, "id": "agent-judge"}, - ] - _start_round(r, client, q, FakeConfig(), _logger) - assert r.status == "running" - assert r.phase == "awaiting_debaters" - coord_cid = r.coordinator_comment_id - - # Step 2: debaters reply - for i, name in enumerate(DEBATER_NAMES): - client.comments.append({ - "id": f"reply-{i}", "content": f"[{name} response]: Evidence.", - "author_id": f"agent-{i}", "author_type": "agent", "created_at": _utcnow(), - }) - _advance_awaiting_debaters(r, client, q, FakeConfig(), _logger) - assert r.phase == "awaiting_judge" - judge_cid = r.judge_comment_id - - # Step 3: judge replies - client.comments.append({ - "id": "verdict1", "content": "VERDICT: ACCEPT\n\nShipped.", - "author_id": "agent-judge", "author_type": "agent", "created_at": _utcnow(), - }) - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - assert r.phase == "accepted" - assert r.status == "done" - assert client.issue["status"] == "done" + client.issue["assignee_type"] = "member" + client.issue["assignee_id"] = "user-1" + cid = _post_rejection_retrigger(r, client, client.issue, "VERDICT: REJECT", _logger) + assert cid is None + assert len(client.posted_comments) == 1 + assert "mention://" not in client.posted_comments[0] + assert 
"Manual follow-up" in client.posted_comments[0] -# --------------------------------------------------------------------------- -# Restart resume: in-flight rounds resume from correct phase -# --------------------------------------------------------------------------- - -def test_restart_resumes_awaiting_debaters(tmp_path): - """On restart, a running/awaiting_debaters round picks up without re-posting comment.""" - r = _make_round( - status="running", - phase="awaiting_debaters", - phase_entered_at=_utcnow(), - coordinator_comment_id="existing-coord-comment", - ) - q = _make_queue(tmp_path, r) - +def test_reject_retrigger_skipped_when_no_assignee(tmp_path): + r, q = _make_round(tmp_path) client = FakeClient() - client.comments = [ - {"id": "existing-coord-comment", "content": "Debate opened", - "author_id": "coord", "created_at": _past(60)}, - ] - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - # Must NOT post another debater comment - assert all("Debate round opened" not in c for c in client.posted_comments) - # Phase should still be awaiting_debaters (no replies) - assert r.phase == "awaiting_debaters" + client.issue["assignee_type"] = None + client.issue["assignee_id"] = None + cid = _post_rejection_retrigger(r, client, client.issue, "VERDICT: REJECT", _logger) + assert cid is None -def test_restart_resumes_awaiting_judge(tmp_path): - """On restart, a running/awaiting_judge round resumes without re-posting judge comment.""" - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-c", - judge_comment_id="judge-c", - ) - q = _make_queue(tmp_path, r) - +def test_reject_retrigger_includes_verbatim_description(tmp_path): + r, q = _make_round(tmp_path) client = FakeClient() - client.comments = [ - {"id": "judge-c", "content": "Verdict requested", - "author_id": "coord", "created_at": _past(30)}, - ] - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - # Must NOT 
post another judge comment - assert all("Verdict requested" not in c for c in client.posted_comments) - assert r.phase == "awaiting_judge" - - -# --------------------------------------------------------------------------- -# orchestrate_pending: pending rounds are started, running rounds advanced -# --------------------------------------------------------------------------- - -def test_orchestrate_pending_starts_pending(tmp_path): - r = _make_round() - q = _make_queue(tmp_path, r) - client = FakeClient() - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - assert r.status == "running" - assert any("Debate round opened" in c for c in client.posted_comments) - - -def test_orchestrate_pending_advances_running(tmp_path): - """Running/awaiting_judge round with a verdict is completed.""" - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-c", - judge_comment_id="judge-c", - ) - q = _make_queue(tmp_path, r) - - client = FakeClient() - client.agents = [{"name": JUDGE_NAME, "id": "agent-judge"}] - client.comments = [ - {"id": "judge-c", "content": "x", "author_id": "coord", "created_at": _past(10)}, - {"id": "v1", "content": "VERDICT: REJECT\n\nNeeds work.", - "author_id": "agent-judge", "created_at": _utcnow()}, - ] - - orchestrate_pending(q, FakeConfig(), _logger, client=client) - - assert r.status == "done" - assert r.phase == "rejected" - - -# --------------------------------------------------------------------------- -# _advance_awaiting_judge — REJECT retrigger + anchor (WYL-51) -# --------------------------------------------------------------------------- - -def _judge_round_with_agent_assignee( - tmp_path: Path, - *, - assignee_id: str = "agent-worker", - assignee_name: str = "Senior Developer", - description: str = "Test issue description.", -) -> tuple[Round, DebateQueue, FakeClient]: - """_judge_round helper pre-configured with an agent assignee.""" - r, q, client = 
_judge_round(tmp_path) client.issue["assignee_type"] = "agent" - client.issue["assignee_id"] = assignee_id - client.issue["description"] = description - client.agents.append({"name": assignee_name, "id": assignee_id}) - return r, q, client - - -def _verdict_comment(content: str = "VERDICT: REJECT\n\nDoes not satisfy requirements.") -> dict: - return { - "id": "verdict1", - "content": content, - "author_id": "agent-judge", - "author_type": "agent", - "created_at": _utcnow(), - } - - -# D1 -def test_reject_posts_retrigger_with_assignee_mention(tmp_path): - """REJECT path: a retrigger comment is posted that @-mentions the agent assignee.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "rejected" - retrigger = next( - (c for c in client.posted_comments if "mention://agent/agent-worker" in c), - None, - ) - assert retrigger is not None, "Expected retrigger comment with @-mention of assignee" - assert "Senior Developer" in retrigger - assert "REJECT" in retrigger - - -# D2 -def test_reject_retrigger_includes_verbatim_original_description(tmp_path): - """REJECT retrigger: full verbatim issue description appears in comment body.""" - desc = ( - "First paragraph of specific requirements.\n\n" - "Second paragraph with unique detail: abc-xyz-123.\n\n" - "Third paragraph with acceptance criteria: must-pass-qa-check." 
- ) - r, q, client = _judge_round_with_agent_assignee(tmp_path, description=desc) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - retrigger = next( - (c for c in client.posted_comments if "mention://agent/agent-worker" in c), - None, - ) - assert retrigger is not None - # Every non-empty line of the description must appear verbatim + client.issue["assignee_id"] = "agent-worker" + client.agents.append({"name": "Worker", "id": "agent-worker"}) + desc = "Line A.\n\nLine B with unique-marker-123.\n\nLine C." + client.issue["description"] = desc + cid = _post_rejection_retrigger(r, client, client.issue, "VERDICT: REJECT", _logger) + assert cid is not None + body = client.posted_comments[-1] for line in desc.splitlines(): if line.strip(): - assert line in retrigger, ( - f"Line {line!r} not found verbatim in retrigger comment" - ) - - -# D3 -def test_reject_retrigger_includes_no_drift_instruction(tmp_path): - """REJECT retrigger: REWORK_INSTRUCTIONS constant appears verbatim in comment.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - retrigger = next( - (c for c in client.posted_comments if "mention://agent/agent-worker" in c), - None, - ) - assert retrigger is not None - assert REWORK_INSTRUCTIONS in retrigger, ( - "REWORK_INSTRUCTIONS constant must appear verbatim in retrigger comment" - ) - assert "Do NOT redefine scope" in retrigger - - -# D4 -def test_reject_retrigger_skipped_if_assignee_is_member(tmp_path): - """REJECT: member assignee → non-mentioning coordinator comment, no @-mention.""" - r, q, client = _judge_round(tmp_path) - client.issue["assignee_type"] = "member" - client.issue["assignee_id"] = "user-123" - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == 
"rejected" - # A coordinator comment IS posted (for audit), but without any @-mention - assert len(client.posted_comments) == 1 - assert "mention://" not in client.posted_comments[0] - assert "Manual follow-up required" in client.posted_comments[0] - - -# D5 -def test_reject_retrigger_skipped_if_no_assignee(tmp_path): - """REJECT: no assignee → non-mentioning coordinator comment, no @-mention.""" - r, q, client = _judge_round(tmp_path) - # Default FakeClient issue has no assignee fields - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "rejected" - assert len(client.posted_comments) == 1 - assert "mention://" not in client.posted_comments[0] - assert "Manual follow-up required" in client.posted_comments[0] - - -# D6 -def test_accept_does_not_post_retrigger(tmp_path): - """ACCEPT path: no retrigger comment is posted.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment("VERDICT: ACCEPT\n\nExcellent work!")) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.phase == "accepted" - assert all("mention://agent/agent-worker" not in c for c in client.posted_comments) - - -# D7 -def test_retrigger_comment_id_persisted_on_round(tmp_path): - """After REJECT, round.retrigger_comment_id is persisted to queue.json.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - client.comments.append(_verdict_comment()) - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert r.status == "done" - assert r.retrigger_comment_id, "retrigger_comment_id should be set on the round" - - # Reload from disk and verify persistence - q2 = DebateQueue.load(tmp_path / "queue.json") - r2 = q2.rounds[0] - assert r2.retrigger_comment_id == r.retrigger_comment_id, ( - "retrigger_comment_id must survive a queue reload" - ) - - -# D8 -def 
test_retrigger_happens_after_status_update_not_before(tmp_path): - """Race regression: issue:in_progress must precede the retrigger post_comment.""" - call_order: list[str] = [] - - class TrackingClient(FakeClient): - def update_issue_status(self, issue_id, status): - call_order.append(f"issue:{status}") - return super().update_issue_status(issue_id, status) - - def post_comment(self, issue_id, content): - call_order.append("post_comment") - return super().post_comment(issue_id, content) - - r = _make_round( - status="running", - phase="awaiting_judge", - phase_entered_at=_utcnow(), - coordinator_comment_id="coord-c", - judge_comment_id="judge-c", - ) - q = _make_queue(tmp_path, r) - - client = TrackingClient() - client.issue["assignee_type"] = "agent" - client.issue["assignee_id"] = "agent-worker" - client.agents = [ - {"name": JUDGE_NAME, "id": "agent-judge"}, - {"name": "Senior Developer", "id": "agent-worker"}, - ] - client.comments = [ - {"id": "judge-c", "content": "x", "author_id": "coord", "created_at": _past(5)}, - {"id": "v1", "content": "VERDICT: REJECT\n\nFailed.", "author_id": "agent-judge", - "author_type": "agent", "created_at": _utcnow()}, - ] - - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - - assert "issue:in_progress" in call_order, "issue status update must happen" - assert "post_comment" in call_order, "retrigger post_comment must happen" - assert call_order.index("issue:in_progress") < call_order.index("post_comment"), ( - f"issue:in_progress must precede post_comment, got: {call_order}" - ) + assert line in body + assert REWORK_INSTRUCTIONS in body # --------------------------------------------------------------------------- -# DEBATER_PROMPTS scope-swap clause (WYL-60) +# _advance_round dispatch # --------------------------------------------------------------------------- -def test_debater_prompts_contain_scope_swap_clause(): - """Every entry in DEBATER_PROMPTS must contain the scope-swap detection clause.""" - for name in 
DEBATER_NAMES: - assert name in DEBATER_PROMPTS, f"DEBATER_PROMPTS missing entry for {name!r}" - assert "SCOPE SWAP" in DEBATER_PROMPTS[name], ( - f"DEBATER_PROMPTS[{name!r}] is missing the scope-swap detection clause" - ) +def test_advance_round_dispatches_convened_to_start(tmp_path): + r, q = _make_round(tmp_path, phase="convened", status="pending") + client = FakeClient() + _advance_round(r, client, q, FakeConfig(), _logger) + assert r.phase == "awaiting_rubric" -# --------------------------------------------------------------------------- -# (end of WYL-60 tests) -# --------------------------------------------------------------------------- - -def test_reject_retrigger_failure_does_not_block_round_completion(tmp_path): - """REJECT retrigger: round completes even if posting the retrigger comment fails.""" - r, q, client = _judge_round_with_agent_assignee(tmp_path) - - original_post = FakeClient.post_comment - call_count = [0] - - def post_with_failure(self, issue_id, content): - call_count[0] += 1 - if call_count[0] > 1: - raise RuntimeError("API error on retrigger") - return original_post(self, issue_id, content) - - FakeClient.post_comment = post_with_failure - try: - client.comments.append(_verdict_comment()) - _advance_awaiting_judge(r, client, q, FakeConfig(), _logger) - assert r.status == "done", "Round must complete even if retrigger posting fails" - assert r.phase == "rejected" - finally: - FakeClient.post_comment = original_post +def test_advance_round_corrects_terminal_phase_with_wrong_status(tmp_path): + r, q = _make_round(tmp_path, phase="accepted", status="running") + client = FakeClient() + _advance_round(r, client, q, FakeConfig(), _logger) + assert r.status == "done"