#!/usr/bin/env python3 """Standalone smoke test for debate round orchestration (WYL-45). Runs one full round end-to-end against the real multica API, cleans up, and prints verdict OK or FAIL. Usage: python smoke_test.py Requires env vars (or ~/.coordinator/env): COORDINATOR_SERVER_URL COORDINATOR_WORKSPACE_ID COORDINATOR_TOKEN """ from __future__ import annotations import logging import os import sys import tempfile from pathlib import Path def main() -> int: logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", stream=sys.stderr, ) log = logging.getLogger("smoke_test") # ---- load coordinator package ------------------------------------------ src = Path(__file__).parent / "src" if src.is_dir(): sys.path.insert(0, str(src)) try: from coordinator.config import Config, load_env_file from coordinator.multica_client import MulticaClient from coordinator.queue import DebateQueue from coordinator.state import SeenState from coordinator.__main__ import poll_once from coordinator.orchestrator import ( DEBATER_NAMES, _advance_awaiting_debaters, _advance_awaiting_judge, _start_round, ) except ImportError as exc: print(f"FAIL: cannot import coordinator package: {exc}", file=sys.stderr) print(" Run 'pip install -e .' in the coordinator directory first.") return 1 load_env_file() required = ["COORDINATOR_SERVER_URL", "COORDINATOR_WORKSPACE_ID", "COORDINATOR_TOKEN"] missing = [k for k in required if not os.environ.get(k)] if missing: print(f"FAIL: missing env vars: {', '.join(missing)}", file=sys.stderr) return 1 try: cfg = Config.from_env() except SystemExit as exc: print(f"FAIL: {exc}", file=sys.stderr) return 1 client = MulticaClient(cfg.server_url, cfg.workspace_id, cfg.token) issue_id: str | None = None failures: list[str] = [] def check(cond: bool, msg: str) -> None: if not cond: failures.append(msg) log.error("ASSERTION FAILED: %s", msg) with tempfile.TemporaryDirectory() as td: tmp = Path(td) state = SeenState.load(tmp / "seen.json") queue = DebateQueue.load(tmp / "queue.json") try: # 1. Create scratch issue log.info("creating scratch issue...") issue = client.create_issue( title="[DEBATE-SMOKE-TEST] WYL-45 smoke — auto-delete", description=( "Smoke test issue for WYL-45. Auto-deleted after test.\n\n" "Commit: https://git.wylab.me/multica/coordinator/commit/smoke000" ), ) issue_id = issue["id"] log.info("created issue %s", issue_id) # 2. Set in_review and enqueue via poll_once client.update_issue_status(issue_id, "in_review") poll_once(client, state, queue, log) check(bool(queue.pending()), "round not enqueued after poll_once") if not queue.pending(): return 1 round_ = queue.pending()[0] check(round_.issue_id == issue_id, "wrong issue_id in round") # 3. Start the round log.info("starting round...") _start_round(round_, client, queue, cfg, log) check(round_.status == "running", f"status={round_.status!r} (want 'running')") check(round_.phase == "awaiting_debaters", f"phase={round_.phase!r} (want 'awaiting_debaters')") check(bool(round_.coordinator_comment_id), "coordinator_comment_id not set") # Verify comment on real issue real_comments = client.list_comments(issue_id) coord_comment = next( (c for c in real_comments if c.get("id") == round_.coordinator_comment_id), None, ) check(coord_comment is not None, "coordinator comment not found in issue") if coord_comment: for name in DEBATER_NAMES: check(name in coord_comment["content"], f"debater '{name}' not mentioned in coordinator comment") # 4. Simulate debater responses (content-marker fallback) log.info("posting fake debater responses...") for name in DEBATER_NAMES: client.post_comment( issue_id, f"[{name} response]: Smoke-test evidence. " f"This simulates {name}'s analysis.", ) # 5. Advance to awaiting_judge log.info("advancing past debaters...") _advance_awaiting_debaters(round_, client, queue, cfg, log) check(round_.phase == "awaiting_judge", f"phase={round_.phase!r} (want 'awaiting_judge')") check(bool(round_.judge_comment_id), "judge_comment_id not set") real_comments = client.list_comments(issue_id) judge_comment = next( (c for c in real_comments if c.get("id") == round_.judge_comment_id), None, ) check(judge_comment is not None, "judge comment not found in issue") if judge_comment: check("Verdict requested" in judge_comment["content"], "judge comment missing 'Verdict requested'") # 6. Simulate judge verdict log.info("posting fake judge verdict...") client.post_comment( issue_id, "VERDICT: ACCEPT\n\nSmoke test passed. Implementation is complete.", ) # 7. Advance past judge log.info("processing verdict...") _advance_awaiting_judge(round_, client, queue, cfg, log) check(round_.status == "done", f"status={round_.status!r} (want 'done')") check(round_.phase == "accepted", f"phase={round_.phase!r} (want 'accepted')") refreshed = client.get_issue(issue_id) check(refreshed.get("status") == "done", f"issue status={refreshed.get('status')!r} (want 'done')") # 8. Race-fix check: poll_once must not double-enqueue OUR issue. # (Other in_review issues may also be enqueued — that's expected.) rounds_before = len(queue.rounds) poll_once(client, state, queue, log) our_rounds_after = [ r for r in queue.rounds if r.issue_id == issue_id ] check( len(our_rounds_after) == 1, f"double-enqueue: issue {issue_id} appears {len(our_rounds_after)} time(s) " f"in queue (want 1)", ) except Exception as exc: log.exception("unexpected error: %s", exc) failures.append(f"unexpected exception: {exc}") finally: if issue_id: try: client.update_issue_status(issue_id, "done") log.info("cleanup: issue %s → done", issue_id) except Exception as exc: log.warning("cleanup failed: %s", exc) if failures: print(f"\nFAIL ({len(failures)} assertion(s)):") for f in failures: print(f" - {f}") return 1 print("\nverdict OK") return 0 if __name__ == "__main__": sys.exit(main())