Files
sim-package/sim-engine/src/ledger.rs
T

441 lines
16 KiB
Rust

use rusqlite::{Connection, params};
use crate::types::*;
use crate::error::SimError;
use std::collections::HashMap;
pub struct Ledger {
conn: Connection,
}
impl Ledger {
pub fn open(path: &str) -> Result<Self, SimError> {
let conn = Connection::open(path)?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
let mut ledger = Self { conn };
ledger.init_schema()?;
Ok(ledger)
}
fn init_schema(&mut self) -> Result<(), SimError> {
self.conn.execute_batch("
CREATE TABLE IF NOT EXISTS agents (
agent_id TEXT PRIMARY KEY,
data TEXT NOT NULL -- JSON blob of AgentState
);
CREATE TABLE IF NOT EXISTS transactions (
tx_id TEXT PRIMARY KEY,
turn INTEGER NOT NULL,
data TEXT NOT NULL,
block_id TEXT, -- null if still in mempool
FOREIGN KEY (block_id) REFERENCES blocks(block_id)
);
CREATE TABLE IF NOT EXISTS blocks (
block_id TEXT PRIMARY KEY,
prev_block_id TEXT NOT NULL,
turn INTEGER NOT NULL,
height INTEGER NOT NULL,
cumulative_weight REAL NOT NULL,
data TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS contracts (
contract_id TEXT PRIMARY KEY,
status TEXT NOT NULL,
data TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS core_shares (
core_id TEXT NOT NULL,
owner TEXT NOT NULL,
proportion REAL NOT NULL,
PRIMARY KEY (core_id, owner)
);
CREATE TABLE IF NOT EXISTS world_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS speech_log (
turn INTEGER NOT NULL,
agent_id TEXT NOT NULL,
message TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tx_turn ON transactions(turn);
CREATE INDEX IF NOT EXISTS idx_tx_block ON transactions(block_id);
CREATE INDEX IF NOT EXISTS idx_blocks_height ON blocks(height);
CREATE INDEX IF NOT EXISTS idx_contracts_status ON contracts(status);
")?;
Ok(())
}
// ─── Agent ───────────────────────────────────────────────────────────────
pub fn upsert_agent(&self, agent: &AgentState) -> Result<(), SimError> {
let data = serde_json::to_string(agent)?;
self.conn.execute(
"INSERT OR REPLACE INTO agents (agent_id, data) VALUES (?1, ?2)",
params![agent.agent_id, data],
)?;
Ok(())
}
pub fn get_agent(&self, agent_id: &str) -> Result<Option<AgentState>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM agents WHERE agent_id = ?1"
)?;
let mut rows = stmt.query(params![agent_id])?;
if let Some(row) = rows.next()? {
let data: String = row.get(0)?;
Ok(Some(serde_json::from_str(&data)?))
} else {
Ok(None)
}
}
pub fn get_all_agents(&self) -> Result<Vec<AgentState>, SimError> {
let mut stmt = self.conn.prepare("SELECT data FROM agents")?;
let rows = stmt.query_map([], |row| {
let data: String = row.get(0)?;
Ok(data)
})?;
let mut agents = Vec::new();
for data in rows {
agents.push(serde_json::from_str(&data?)?);
}
Ok(agents)
}
// ─── Transactions ─────────────────────────────────────────────────────────
pub fn insert_tx(&self, tx: &Transaction) -> Result<(), SimError> {
let data = serde_json::to_string(tx)?;
self.conn.execute(
"INSERT OR IGNORE INTO transactions (tx_id, turn, data, block_id)
VALUES (?1, ?2, ?3, NULL)",
params![tx.tx_id, tx.turn, data],
)?;
Ok(())
}
pub fn get_mempool(&self) -> Result<Vec<Transaction>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM transactions WHERE block_id IS NULL ORDER BY turn ASC"
)?;
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
let mut txs = Vec::new();
for d in rows {
txs.push(serde_json::from_str::<Transaction>(&d?)?);
}
Ok(txs)
}
pub fn get_tx(&self, tx_id: &str) -> Result<Option<Transaction>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM transactions WHERE tx_id = ?1"
)?;
let mut rows = stmt.query(params![tx_id])?;
if let Some(row) = rows.next()? {
let data: String = row.get(0)?;
Ok(Some(serde_json::from_str(&data)?))
} else {
Ok(None)
}
}
// mark a set of tx_ids as included in a block
pub fn finalize_transactions(
&self,
tx_ids: &[TxId],
block_id: &str,
) -> Result<(), SimError> {
for tx_id in tx_ids {
self.conn.execute(
"UPDATE transactions SET block_id = ?1 WHERE tx_id = ?2",
params![block_id, tx_id],
)?;
}
Ok(())
}
// get all burn transactions older than `min_age` turns, for PoB proof
pub fn get_mature_burn_txs(
&self,
agent_id: &str,
current_turn: u64,
min_age: u64,
) -> Result<Vec<Transaction>, SimError> {
let max_turn = current_turn.saturating_sub(min_age);
let mut stmt = self.conn.prepare(
"SELECT data FROM transactions
WHERE block_id IS NOT NULL
AND turn <= ?1
AND json_extract(data, '$.sender') = ?2
AND json_extract(data, '$.tx_type') = 'burn'"
)?;
let rows = stmt.query_map(params![max_turn, agent_id], |row| {
row.get::<_, String>(0)
})?;
let mut txs = Vec::new();
for d in rows {
txs.push(serde_json::from_str::<Transaction>(&d?)?);
}
Ok(txs)
}
// ─── Blocks ───────────────────────────────────────────────────────────────
pub fn insert_block(
&self,
block: &Block,
cumulative_weight: f64,
) -> Result<(), SimError> {
let data = serde_json::to_string(block)?;
self.conn.execute(
"INSERT OR IGNORE INTO blocks
(block_id, prev_block_id, turn, height, cumulative_weight, data)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
block.block_id,
block.prev_block_id,
block.turn,
block.height,
cumulative_weight,
data,
],
)?;
Ok(())
}
pub fn get_chain_tip(&self) -> Result<Option<ChainTip>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT block_id, height, cumulative_weight
FROM blocks ORDER BY cumulative_weight DESC, height DESC LIMIT 1"
)?;
let mut rows = stmt.query([])?;
if let Some(row) = rows.next()? {
Ok(Some(ChainTip {
block_id: row.get(0)?,
height: row.get(1)?,
cumulative_weight: row.get(2)?,
}))
} else {
Ok(None)
}
}
pub fn get_block(&self, block_id: &str) -> Result<Option<Block>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM blocks WHERE block_id = ?1"
)?;
let mut rows = stmt.query(params![block_id])?;
if let Some(row) = rows.next()? {
let data: String = row.get(0)?;
Ok(Some(serde_json::from_str(&data)?))
} else {
Ok(None)
}
}
// get last N blocks in height order — used for burn decay calculation
pub fn get_recent_mine_blocks(&self, since_turn: u64) -> Result<Vec<Block>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM blocks
WHERE turn >= ?1
AND json_extract(data, '$.validator_type') = 'mine'
ORDER BY height ASC"
)?;
let rows = stmt.query_map(params![since_turn], |row| row.get::<_, String>(0))?;
let mut blocks = Vec::new();
for d in rows {
blocks.push(serde_json::from_str::<Block>(&d?)?);
}
Ok(blocks)
}
// check if an agent has signed conflicting blocks (duplicate stake detection)
pub fn has_duplicate_validation(
&self,
agent_id: &str,
turn: u64,
) -> Result<bool, SimError> {
let count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM blocks
WHERE turn = ?1
AND json_extract(data, '$.validator_id') = ?2",
params![turn, agent_id],
|row| row.get(0),
)?;
Ok(count > 1)
}
// ─── Contracts ────────────────────────────────────────────────────────────
pub fn upsert_contract(&self, contract: &Contract) -> Result<(), SimError> {
let data = serde_json::to_string(contract)?;
let status = format!("{:?}", contract.status).to_lowercase();
self.conn.execute(
"INSERT OR REPLACE INTO contracts (contract_id, status, data)
VALUES (?1, ?2, ?3)",
params![contract.contract_id, status, data],
)?;
Ok(())
}
pub fn get_contract(&self, contract_id: &str) -> Result<Option<Contract>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM contracts WHERE contract_id = ?1"
)?;
let mut rows = stmt.query(params![contract_id])?;
if let Some(row) = rows.next()? {
let data: String = row.get(0)?;
Ok(Some(serde_json::from_str(&data)?))
} else {
Ok(None)
}
}
pub fn get_active_contracts(&self) -> Result<Vec<Contract>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM contracts WHERE status IN ('active', 'disputed')"
)?;
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
let mut contracts = Vec::new();
for d in rows {
contracts.push(serde_json::from_str::<Contract>(&d?)?);
}
Ok(contracts)
}
pub fn get_contracts_due(&self, turn: u64) -> Result<Vec<Contract>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT data FROM contracts
WHERE status = 'active'
AND json_extract(data, '$.terms.delivery_turn') <= ?1"
)?;
let rows = stmt.query_map(params![turn], |row| row.get::<_, String>(0))?;
let mut contracts = Vec::new();
for d in rows {
contracts.push(serde_json::from_str::<Contract>(&d?)?);
}
Ok(contracts)
}
// ─── Core shares ──────────────────────────────────────────────────────────
pub fn upsert_core_share(&self, share: &CoreShare) -> Result<(), SimError> {
self.conn.execute(
"INSERT OR REPLACE INTO core_shares (core_id, owner, proportion)
VALUES (?1, ?2, ?3)",
params![share.core_id, share.owner, share.proportion],
)?;
Ok(())
}
pub fn get_core_shares(&self) -> Result<Vec<CoreShare>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT core_id, owner, proportion FROM core_shares"
)?;
let rows = stmt.query_map([], |row| {
Ok(CoreShare {
core_id: row.get(0)?,
owner: row.get(1)?,
proportion: row.get(2)?,
})
})?;
let mut shares = Vec::new();
for s in rows {
shares.push(s?);
}
Ok(shares)
}
// total shares owned per agent across all cores, weighted by proportion
pub fn get_inference_share_per_agent(&self) -> Result<HashMap<AgentId, f64>, SimError> {
let shares = self.get_core_shares()?;
let num_cores = {
let mut stmt = self.conn.prepare(
"SELECT COUNT(DISTINCT core_id) FROM core_shares"
)?;
stmt.query_row([], |row| row.get::<_, i64>(0))?
} as f64;
let mut result: HashMap<AgentId, f64> = HashMap::new();
for share in shares {
*result.entry(share.owner).or_insert(0.0) += share.proportion / num_cores;
}
Ok(result)
}
// ─── Speech log ───────────────────────────────────────────────────────────
pub fn log_speech(&self, turn: u64, agent_id: &str, message: &str) -> Result<(), SimError> {
self.conn.execute(
"INSERT INTO speech_log (turn, agent_id, message) VALUES (?1, ?2, ?3)",
params![turn, agent_id, message],
)?;
Ok(())
}
pub fn get_speech_log(&self, turn: u64) -> Result<Vec<(AgentId, String)>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT agent_id, message FROM speech_log WHERE turn = ?1 ORDER BY rowid ASC"
)?;
let rows = stmt.query_map(params![turn], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
})?;
let mut log = Vec::new();
for r in rows {
log.push(r?);
}
Ok(log)
}
// ─── Config ───────────────────────────────────────────────────────────────
pub fn save_config(&self, config: &WorldConfig) -> Result<(), SimError> {
let data = serde_json::to_string(config)?;
self.conn.execute(
"INSERT OR REPLACE INTO world_config (key, value) VALUES ('config', ?1)",
params![data],
)?;
Ok(())
}
pub fn load_config(&self) -> Result<Option<WorldConfig>, SimError> {
let mut stmt = self.conn.prepare(
"SELECT value FROM world_config WHERE key = 'config'"
)?;
let mut rows = stmt.query([])?;
if let Some(row) = rows.next()? {
let data: String = row.get(0)?;
Ok(Some(serde_json::from_str(&data)?))
} else {
Ok(None)
}
}
// ─── Analytics helpers ────────────────────────────────────────────────────
pub fn get_token_supply(&self) -> Result<Tokens, SimError> {
// sum of all agent balances + staked + locked in contracts
// burned tokens are subtracted (they don't appear in any balance)
let balance_sum: i64 = self.conn.query_row(
"SELECT COALESCE(SUM(CAST(json_extract(data, '$.balance') AS INTEGER)), 0)
FROM agents",
[],
|row| row.get(0),
)?;
let staked_sum: i64 = self.conn.query_row(
"SELECT COALESCE(SUM(CAST(json_extract(data, '$.staked') AS INTEGER)), 0)
FROM agents",
[],
|row| row.get(0),
)?;
Ok(balance_sum + staked_sum)
}
}