Files
sim-package/research/run_experiments.py
T

582 lines
21 KiB
Python

import csv
import json
import math
import os
import random
import sqlite3
import subprocess
import time
import urllib.error
import urllib.request
from collections import Counter, defaultdict
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
ENGINE_DIR = ROOT / "sim-engine"
ENGINE_BIN = ENGINE_DIR / "target" / "release" / "sim-engine"
RESULTS_DIR = ROOT / "research" / "results"
RUNS_DIR = RESULTS_DIR / "runs"
def http_json(method: str, url: str, payload=None, timeout=20):
data = None
headers = {"Content-Type": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url=url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
def wait_for_engine(base_url: str, timeout_s: float = 15.0):
start = time.time()
while time.time() - start < timeout_s:
try:
http_json("GET", f"{base_url}/config", timeout=2)
return
except Exception:
time.sleep(0.2)
raise RuntimeError("engine did not become ready")
def gini(values):
xs = sorted(values)
n = len(xs)
if n == 0:
return 0.0
total = sum(xs)
if total == 0:
return 0.0
weighted = 0
for i, x in enumerate(xs, start=1):
weighted += i * x
return (2 * weighted) / (n * total) - (n + 1) / n
def choose_action(
rng,
turn,
max_turns,
agent_id,
state_by_agent,
pending_sign,
pending_arb_rulings,
due_confirmations,
regime,
):
me = state_by_agent[agent_id]
balance = me["balance"]
for c in pending_arb_rulings:
if c["arbitrator"] == agent_id and c["status"] == "disputed":
ruling_for = c["proposer"] if rng.random() < 0.55 else c["counterparty"]
return {
"action": "arbitrator_ruling",
"contract_id": c["contract_id"],
"ruling_for": ruling_for,
}, None
for c in pending_sign:
if c["counterparty"] == agent_id and c["status"] == "proposed":
if rng.random() < regime["sign_prob"]:
return {
"action": "sign_contract",
"contract_id": c["contract_id"],
"role": "counterparty",
}, None
if c["arbitrator"] == agent_id and c["status"] in {"proposed", "countersigned"}:
if rng.random() < regime["arb_sign_prob"]:
return {
"action": "sign_contract",
"contract_id": c["contract_id"],
"role": "arbitrator",
}, None
for c in due_confirmations:
if c["status"] != "active":
continue
if c["proposer"] == agent_id or c["counterparty"] == agent_id:
if rng.random() < regime["confirm_prob"]:
return {"action": "confirm_delivery", "contract_id": c["contract_id"]}, None
if rng.random() < regime["dispute_prob"]:
return {"action": "dispute_delivery", "contract_id": c["contract_id"]}, None
if turn == 1:
core_id = f"core_{rng.randrange(regime['num_cores'])}"
bid = max(1, int(abs(rng.gauss(regime["core_bid_mean"], regime["core_bid_std"]))))
bid = min(bid, max(1, int(balance * 0.35)))
return {"action": "bid_core", "core_id": core_id, "amount": bid}, None
if balance < 0:
if rng.random() < 0.6:
return {"action": "job"}, "taking job to cover debt"
return {"action": "mine"}, None
if balance < regime["low_balance_threshold"] and rng.random() < 0.35:
return {"action": "job"}, None
r = rng.random()
if r < regime["mine_prob"]:
return {"action": "mine"}, None
r -= regime["mine_prob"]
if r < regime["stake_prob"] and balance > 20:
amount = min(int(balance * rng.uniform(0.05, 0.25)), max(5, int(balance) - 1))
return {"action": "stake", "amount": max(1, amount)}, None
r -= regime["stake_prob"]
if r < regime["burn_prob"] and balance > 20:
amount = min(int(balance * rng.uniform(0.03, 0.2)), max(3, int(balance) - 1))
return {"action": "burn", "amount": max(1, amount)}, None
r -= regime["burn_prob"]
if r < regime["transfer_prob"] and balance > 15:
others = [a for a in state_by_agent if a != agent_id]
to = rng.choice(others)
amount = max(1, min(int(balance * rng.uniform(0.01, 0.08)), 30))
return {"action": "transfer", "to": to, "amount": amount, "fee": 1}, "small transfer"
r -= regime["transfer_prob"]
if r < regime["propose_prob"] and turn < max_turns - 2 and balance > 80:
others = [a for a in state_by_agent if a != agent_id]
counterparty = rng.choice(others)
arb = rng.choice([a for a in others if a != counterparty])
delivery = min(max_turns - 1, turn + rng.randint(2, 6))
price = rng.randint(15, 80)
p_lock = rng.randint(10, 40)
cp_lock = rng.randint(10, 40)
a_lock = rng.randint(5, 20)
contract_type = rng.choice(["service", "information_sale", "forward", "loan"])
return {
"action": "propose_contract",
"contract": {
"counterparty": counterparty,
"arbitrator": arb,
"contract_type": contract_type,
"terms": {
"description": f"auto-generated {contract_type} at turn {turn}",
"price": price,
"delivery_turn": delivery,
"condition": None,
"pool_members": None,
},
"collateral": {
"proposer_locked": p_lock,
"counterparty_locked": cp_lock,
"arbitrator_locked": a_lock,
},
"settlement_type": "attested",
"penalty": rng.randint(5, 20),
"arbitrator_fee": rng.randint(2, 8),
"payload": None,
},
}, "contract proposal"
if balance > 100 and rng.random() < regime["study_prob"]:
return {"action": "study"}, None
return {"action": "mine"}, None
def fetch_contracts(db_path: Path):
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute("SELECT contract_id, status, data FROM contracts").fetchall()
conn.close()
contracts = []
for row in rows:
data = json.loads(row["data"])
data["status"] = row["status"]
contracts.append(data)
return contracts
def assign_core_shares(db_path: Path, bids, num_cores):
conn = sqlite3.connect(db_path)
by_core = defaultdict(list)
for bid in bids:
by_core[bid["core_id"]].append((bid["agent_id"], int(bid["amount"])))
winners = {}
for i in range(num_cores):
core = f"core_{i}"
entries = by_core.get(core, [])
if entries:
winners[core] = max(entries, key=lambda x: x[1])[0]
if len(winners) < num_cores:
seen = set(winners.values())
all_agents = sorted({b["agent_id"] for b in bids})
fill = [a for a in all_agents if a not in seen] + all_agents
idx = 0
for i in range(num_cores):
core = f"core_{i}"
if core in winners:
continue
winners[core] = fill[idx % len(fill)]
idx += 1
conn.execute("DELETE FROM core_shares")
for core_id, owner in winners.items():
conn.execute(
"INSERT INTO core_shares(core_id, owner, proportion) VALUES (?, ?, ?)",
(core_id, owner, 1.0),
)
conn.commit()
conn.close()
return winners
def run_single(seed, regime_name, regime, turns=40, n_agents=8, port=3100):
rng = random.Random(seed)
agent_ids = [f"agent_{i}" for i in range(n_agents)]
db_path = RUNS_DIR / f"run_{regime_name}_{seed}.db"
if db_path.exists():
db_path.unlink()
env = os.environ.copy()
env["DB_PATH"] = str(db_path)
env["PORT"] = str(port)
proc = subprocess.Popen(
[str(ENGINE_BIN)],
cwd=str(ENGINE_DIR),
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
base_url = f"http://127.0.0.1:{port}"
try:
wait_for_engine(base_url)
cfg = {
"num_agents": n_agents,
"num_cores": regime["num_cores"],
"genesis_tokens_per_agent": regime["genesis_tokens_per_agent"],
"commons_threshold_per_turn": regime["commons_threshold_per_turn"],
"base_inference_rate": regime["base_inference_rate"],
"thinking_layer_discount": 0.1,
"mine_base_weight": regime["mine_base_weight"],
"stake_weight_per_token": regime["stake_weight_per_token"],
"burn_weight_per_token": regime["burn_weight_per_token"],
"burn_decay_rate": 0.02,
"burn_maturity_turns": 3,
"unstake_delay_turns": 5,
"interest_rate_per_turn": regime["interest_rate_per_turn"],
"signing_bonus": 50,
"block_threshold": regime["block_threshold"],
"attested_confirmation_window": 3,
"slash_both_on_timeout": True,
}
http_json("POST", f"{base_url}/init", {"config": cfg, "agent_ids": agent_ids})
turn_rows = []
action_counts = Counter()
winner_counts = Counter()
total_errors = 0
core_bids = []
for turn in range(1, turns + 1):
state = http_json("GET", f"{base_url}/state")
state_by_agent = {a["agent_id"]: a for a in state.get("agents", [])}
contracts = fetch_contracts(db_path)
pending_sign = [
c for c in contracts if c["status"] in {"proposed", "countersigned"}
]
due_confirm = [
c
for c in contracts
if c["status"] == "active" and int(c["terms"]["delivery_turn"]) <= turn
]
pending_rulings = [c for c in contracts if c["status"] == "disputed"]
inputs = []
for agent_id in agent_ids:
action, speech = choose_action(
rng,
turn,
turns,
agent_id,
state_by_agent,
pending_sign,
pending_rulings,
due_confirm,
regime,
)
action_counts[action["action"]] += 1
if action["action"] == "bid_core":
core_bids.append(
{
"turn": turn,
"agent_id": agent_id,
"core_id": action["core_id"],
"amount": action["amount"],
}
)
inputs.append(
{
"agent_id": agent_id,
"thinking": "",
"action": action,
"speech": speech,
"thinking_units": rng.randint(10, 100),
"output_units": rng.randint(20, 180),
}
)
out = http_json("POST", f"{base_url}/turn", {"inputs": inputs})
if not out.get("ok", False):
raise RuntimeError(out.get("error", "unknown turn error"))
data = out.get("data", {})
if turn == 1:
assign_core_shares(db_path, core_bids, regime["num_cores"])
winner = data.get("block_winner")
if winner:
winner_counts[winner] += 1
errs = data.get("errors", [])
total_errors += len(errs)
turn_rows.append(
{
"turn": turn,
"block_winner": winner or "",
"inference_fees": int(data.get("inference_fees_collected", 0)),
"contracts_settled": len(data.get("contracts_settled", [])),
"contracts_defaulted": len(data.get("contracts_defaulted", [])),
"errors": len(errs),
"dividends_total": int(sum(data.get("dividends_paid", {}).values())),
"interest_total": int(sum(data.get("interest_charged", {}).values())),
}
)
final_state = http_json("GET", f"{base_url}/state")
agents = final_state.get("agents", [])
balances = [int(a.get("balance", 0)) for a in agents]
staked = [int(a.get("staked", 0)) for a in agents]
wealth = [b + s for b, s in zip(balances, staked)]
contracts = fetch_contracts(db_path)
status_counts = Counter(c["status"] for c in contracts)
total_fees = sum(r["inference_fees"] for r in turn_rows)
total_interest = sum(r["interest_total"] for r in turn_rows)
total_dividends = sum(r["dividends_total"] for r in turn_rows)
total_settled = sum(r["contracts_settled"] for r in turn_rows)
total_defaulted = sum(r["contracts_defaulted"] for r in turn_rows)
hhi = 0.0
if winner_counts:
total_wins = sum(winner_counts.values())
hhi = sum((count / total_wins) ** 2 for count in winner_counts.values())
run_summary = {
"run_id": f"{regime_name}_{seed}",
"seed": seed,
"regime": regime_name,
"turns": turns,
"agents": n_agents,
"contracts_total": len(contracts),
"contracts_settled_total": total_settled,
"contracts_defaulted_total": total_defaulted,
"contracts_final_status": dict(status_counts),
"inference_fees_total": total_fees,
"interest_total": total_interest,
"dividends_total": total_dividends,
"token_supply_final": int(final_state.get("token_supply", 0)),
"mean_balance": sum(balances) / len(balances),
"median_balance": sorted(balances)[len(balances) // 2],
"mean_wealth": sum(wealth) / len(wealth),
"gini_balance": gini([max(0, b + 2000) for b in balances]),
"gini_wealth": gini([max(0, w + 2000) for w in wealth]),
"negative_balance_agents": sum(1 for b in balances if b < 0),
"winner_hhi": hhi,
"blocks_produced": sum(1 for r in turn_rows if r["block_winner"]),
"action_counts": dict(action_counts),
"winner_counts": dict(winner_counts),
"total_errors": total_errors,
"balances_final": {a["agent_id"]: int(a.get("balance", 0)) for a in agents},
"staked_final": {a["agent_id"]: int(a.get("staked", 0)) for a in agents},
"turn_rows": turn_rows,
}
return run_summary
finally:
proc.terminate()
try:
proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
def summarize_by_regime(runs):
grouped = defaultdict(list)
for r in runs:
grouped[r["regime"]].append(r)
rows = []
for regime, rs in grouped.items():
n = len(rs)
def mean(key):
return sum(x[key] for x in rs) / n
rows.append(
{
"regime": regime,
"n_runs": n,
"contracts_total_mean": mean("contracts_total"),
"contracts_settled_mean": mean("contracts_settled_total"),
"contracts_defaulted_mean": mean("contracts_defaulted_total"),
"fees_total_mean": mean("inference_fees_total"),
"interest_total_mean": mean("interest_total"),
"dividends_total_mean": mean("dividends_total"),
"token_supply_final_mean": mean("token_supply_final"),
"gini_balance_mean": mean("gini_balance"),
"gini_wealth_mean": mean("gini_wealth"),
"neg_balance_agents_mean": mean("negative_balance_agents"),
"winner_hhi_mean": mean("winner_hhi"),
"blocks_produced_mean": mean("blocks_produced"),
"errors_mean": mean("total_errors"),
}
)
return sorted(rows, key=lambda x: x["regime"])
def write_outputs(runs, regime_rows):
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
RUNS_DIR.mkdir(parents=True, exist_ok=True)
with open(RESULTS_DIR / "run_summaries.json", "w", encoding="utf-8") as f:
json.dump(runs, f, indent=2)
with open(RESULTS_DIR / "regime_summary.json", "w", encoding="utf-8") as f:
json.dump(regime_rows, f, indent=2)
with open(RESULTS_DIR / "run_summaries.csv", "w", newline="", encoding="utf-8") as f:
fields = [
"run_id",
"seed",
"regime",
"contracts_total",
"contracts_settled_total",
"contracts_defaulted_total",
"inference_fees_total",
"interest_total",
"dividends_total",
"token_supply_final",
"gini_balance",
"gini_wealth",
"negative_balance_agents",
"winner_hhi",
"blocks_produced",
"total_errors",
]
w = csv.DictWriter(f, fieldnames=fields)
w.writeheader()
for r in runs:
w.writerow({k: r[k] for k in fields})
with open(RESULTS_DIR / "regime_summary.csv", "w", newline="", encoding="utf-8") as f:
fields = list(regime_rows[0].keys()) if regime_rows else []
w = csv.DictWriter(f, fieldnames=fields)
w.writeheader()
for row in regime_rows:
w.writerow(row)
def main():
RUNS_DIR.mkdir(parents=True, exist_ok=True)
if not ENGINE_BIN.exists():
raise RuntimeError(f"engine binary not found at {ENGINE_BIN}")
regimes = {
"baseline": {
"num_cores": 4,
"genesis_tokens_per_agent": 1000,
"commons_threshold_per_turn": 100,
"base_inference_rate": 1,
"mine_base_weight": 10.0,
"stake_weight_per_token": 0.01,
"burn_weight_per_token": 0.05,
"block_threshold": 20.0,
"interest_rate_per_turn": 0.01,
"core_bid_mean": 120,
"core_bid_std": 55,
"low_balance_threshold": 120,
"mine_prob": 0.46,
"stake_prob": 0.14,
"burn_prob": 0.10,
"transfer_prob": 0.10,
"propose_prob": 0.12,
"study_prob": 0.08,
"sign_prob": 0.62,
"arb_sign_prob": 0.58,
"confirm_prob": 0.66,
"dispute_prob": 0.08,
},
"coordination_push": {
"num_cores": 4,
"genesis_tokens_per_agent": 1000,
"commons_threshold_per_turn": 100,
"base_inference_rate": 1,
"mine_base_weight": 10.0,
"stake_weight_per_token": 0.01,
"burn_weight_per_token": 0.05,
"block_threshold": 20.0,
"interest_rate_per_turn": 0.01,
"core_bid_mean": 150,
"core_bid_std": 60,
"low_balance_threshold": 100,
"mine_prob": 0.36,
"stake_prob": 0.15,
"burn_prob": 0.08,
"transfer_prob": 0.10,
"propose_prob": 0.22,
"study_prob": 0.09,
"sign_prob": 0.80,
"arb_sign_prob": 0.75,
"confirm_prob": 0.84,
"dispute_prob": 0.04,
},
"high_compute_cost": {
"num_cores": 4,
"genesis_tokens_per_agent": 1000,
"commons_threshold_per_turn": 60,
"base_inference_rate": 3,
"mine_base_weight": 10.0,
"stake_weight_per_token": 0.01,
"burn_weight_per_token": 0.05,
"block_threshold": 20.0,
"interest_rate_per_turn": 0.02,
"core_bid_mean": 100,
"core_bid_std": 45,
"low_balance_threshold": 220,
"mine_prob": 0.42,
"stake_prob": 0.12,
"burn_prob": 0.08,
"transfer_prob": 0.08,
"propose_prob": 0.10,
"study_prob": 0.05,
"sign_prob": 0.50,
"arb_sign_prob": 0.48,
"confirm_prob": 0.52,
"dispute_prob": 0.14,
},
}
runs = []
seeds = list(range(101, 113))
port = 3100
for regime_name, regime in regimes.items():
for seed in seeds:
run = run_single(seed=seed, regime_name=regime_name, regime=regime, turns=40, n_agents=8, port=port)
runs.append(run)
port += 1
regime_rows = summarize_by_regime(runs)
write_outputs(runs, regime_rows)
print(json.dumps({"runs": len(runs), "regimes": len(regime_rows)}, indent=2))
if __name__ == "__main__":
main()