From f7c23fb3253c9ae83f720365a242c535fba3eecb Mon Sep 17 00:00:00 2001 From: Wylabb <77673282+Wylabb@users.noreply.github.com> Date: Tue, 7 Apr 2026 23:44:57 +0200 Subject: [PATCH] Replace Rust worker with TS worker in single image Add ts-worker/ with the Bun/TypeScript worker that replaces claw-profile-worker. The Dockerfile now builds a single image containing both the Rust gateway (claw-telegram) and the TS worker. The image defaults to worker mode (bun run ts-worker/main.ts). The gateway Unraid XML overrides with --entrypoint claw-telegram. Worker containers use the same image with the default CMD. - Add ts-worker/ (12 files): HTTP/SSE server, Anthropic SDK engine, approval broker, event translator, state stores - Add package.json with @anthropic-ai/sdk dependency - Rewrite Dockerfile: three-stage build (Rust + Bun + runtime) - Revert CLAW_GATEWAY_WORKER_IMAGE to claw-telegram:latest - Remove image pull from docker_worker_manager (same image, already local) - Add ts-worker paths to CI trigger Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/build-telegram.yml | 6 + .gitignore | 1 + Dockerfile | 42 +- bun.lock | 87 +++ my-claw-telegram.xml | 2 +- package.json | 8 + .../assets/unraid/claw-telegram-gateway.xml | 2 +- .../src/docker_worker_manager.rs | 24 - .../src/unraid_template_manager.rs | 2 +- ts-worker/engine/TelegramWorkerEngine.ts | 163 ++++++ ts-worker/events/WorkerEventTranslator.ts | 257 +++++++++ ts-worker/main.ts | 59 +++ ts-worker/permissions/ApprovalBroker.ts | 211 ++++++++ ts-worker/protocol.ts | 203 +++++++ ts-worker/server.ts | 500 ++++++++++++++++++ ts-worker/state/AppLibraryStore.ts | 334 ++++++++++++ ts-worker/state/FeedStore.ts | 121 +++++ ts-worker/state/TaskStore.ts | 82 +++ ts-worker/state/TeamStore.ts | 59 +++ ts-worker/state/records.ts | 288 ++++++++++ ts-worker/turnManager.ts | 146 +++++ 21 files changed, 2562 insertions(+), 35 deletions(-) create mode 100644 bun.lock create mode 100644 package.json create mode 100644 ts-worker/engine/TelegramWorkerEngine.ts create mode 100644 ts-worker/events/WorkerEventTranslator.ts create mode 100644 ts-worker/main.ts create mode 100644 ts-worker/permissions/ApprovalBroker.ts create mode 100644 ts-worker/protocol.ts create mode 100644 ts-worker/server.ts create mode 100644 ts-worker/state/AppLibraryStore.ts create mode 100644 ts-worker/state/FeedStore.ts create mode 100644 ts-worker/state/TaskStore.ts create mode 100644 ts-worker/state/TeamStore.ts create mode 100644 ts-worker/state/records.ts create mode 100644 ts-worker/turnManager.ts diff --git a/.github/workflows/build-telegram.yml b/.github/workflows/build-telegram.yml index aa337f4..e54f046 100644 --- a/.github/workflows/build-telegram.yml +++ b/.github/workflows/build-telegram.yml @@ -9,12 +9,18 @@ on: - .github/workflows/build-telegram.yml - Dockerfile - rust/** + - ts-worker/** + - package.json + - bun.lock* pull_request: branches: [main] paths: - .github/workflows/build-telegram.yml - Dockerfile - rust/** + - ts-worker/** + - package.json + - bun.lock* workflow_dispatch: env: diff --git a/.gitignore b/.gitignore index d05fb9a..b310f74 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ archive/ # Claude Code local artifacts .claude/settings.local.json .claude/sessions/ +node_modules/ diff --git a/Dockerfile b/Dockerfile index 03eedd1..893342c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,21 +1,47 @@ -# Build stage -FROM rust:1.86-bookworm AS builder +# Stage 1: Rust build +FROM rust:1.86-bookworm AS rust-builder WORKDIR /build COPY rust/ . - RUN cargo build --release --bin claw-telegram -# Runtime stage +# Stage 2: Bun dependency install +FROM oven/bun:1.3 AS bun-builder + +WORKDIR /app +COPY package.json bun.lock* ./ +RUN bun install --frozen-lockfile || bun install + +# Stage 3: Runtime FROM debian:bookworm-slim RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ + curl \ + git \ && rm -rf /var/lib/apt/lists/* -COPY --from=builder /build/target/release/claw-telegram /usr/local/bin/claw-telegram +# Bun runtime +COPY --from=oven/bun:1.3 /usr/local/bin/bun /usr/local/bin/bun -WORKDIR /data +# Rust gateway binary +COPY --from=rust-builder /build/target/release/claw-telegram /usr/local/bin/claw-telegram -ENTRYPOINT ["claw-telegram"] -CMD ["serve"] +# Bun dependencies and TS worker source +COPY --from=bun-builder /app/node_modules /app/node_modules +COPY package.json /app/package.json +COPY ts-worker/ /app/ts-worker/ + +RUN mkdir -p /state/claude-home /state/claw /workspace /data + +ENV CLAW_WORKER_BIND_ADDR=0.0.0.0:8080 +ENV CLAW_WORKER_STATE_ROOT=/state +ENV CLAW_WORKER_DEFAULT_CWD=/workspace +ENV CLAW_WORKER_MODEL=claude-sonnet-4-6 +ENV CLAW_WORKER_PERMISSION_MODE=default +ENV CLAUDE_CONFIG_DIR=/state/claude-home + +WORKDIR /app + +# Default: worker mode. Gateway overrides via --entrypoint claw-telegram. +CMD ["bun", "run", "ts-worker/main.ts"] diff --git a/bun.lock b/bun.lock new file mode 100644 index 0000000..e039440 --- /dev/null +++ b/bun.lock @@ -0,0 +1,87 @@ +{ + "lockfileVersion": 1, + "configVersion": 1, + "workspaces": { + "": { + "name": "claw-code-parity", + "dependencies": { + "@anthropic-ai/sdk": "^0.39.0", + }, + }, + }, + "packages": { + "@anthropic-ai/sdk": ["@anthropic-ai/sdk@0.39.0", "", { "dependencies": { "@types/node": "^18.11.18", "@types/node-fetch": "^2.6.4", "abort-controller": "^3.0.0", "agentkeepalive": "^4.2.1", "form-data-encoder": "1.7.2", "formdata-node": "^4.3.2", "node-fetch": "^2.6.7" } }, "sha512-eMyDIPRZbt1CCLErRCi3exlAvNkBtRe+kW5vvJyef93PmNr/clstYgHhtvmkxN82nlKgzyGPCyGxrm0JQ1ZIdg=="], + + "@types/node": ["@types/node@18.19.130", "", { "dependencies": { "undici-types": "~5.26.4" } }, "sha512-GRaXQx6jGfL8sKfaIDD6OupbIHBr9jv7Jnaml9tB7l4v068PAOXqfcujMMo5PhbIs6ggR1XODELqahT2R8v0fg=="], + + "@types/node-fetch": ["@types/node-fetch@2.6.13", "", { "dependencies": { "@types/node": "*", "form-data": "^4.0.4" } }, "sha512-QGpRVpzSaUs30JBSGPjOg4Uveu384erbHBoT1zeONvyCfwQxIkUshLAOqN/k9EjGviPRmWTTe6aH2qySWKTVSw=="], + + "abort-controller": ["abort-controller@3.0.0", "", { "dependencies": { "event-target-shim": "^5.0.0" } }, "sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg=="], + + "agentkeepalive": ["agentkeepalive@4.6.0", "", { "dependencies": { "humanize-ms": "^1.2.1" } }, "sha512-kja8j7PjmncONqaTsB8fQ+wE2mSU2DJ9D4XKoJ5PFWIdRMa6SLSN1ff4mOr4jCbfRSsxR4keIiySJU0N9T5hIQ=="], + + "asynckit": ["asynckit@0.4.0", "", {}, "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="], + + "call-bind-apply-helpers": ["call-bind-apply-helpers@1.0.2", "", { "dependencies": { "es-errors": "^1.3.0", "function-bind": "^1.1.2" } }, "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ=="], + + "combined-stream": ["combined-stream@1.0.8", "", { "dependencies": { "delayed-stream": "~1.0.0" } }, "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg=="], + + "delayed-stream": ["delayed-stream@1.0.0", "", {}, "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ=="], + + "dunder-proto": ["dunder-proto@1.0.1", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.1", "es-errors": "^1.3.0", "gopd": "^1.2.0" } }, "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A=="], + + "es-define-property": ["es-define-property@1.0.1", "", {}, "sha512-e3nRfgfUZ4rNGL232gUgX06QNyyez04KdjFrF+LTRoOXmrOgFKDg4BCdsjW8EnT69eqdYGmRpJwiPVYNrCaW3g=="], + + "es-errors": ["es-errors@1.3.0", "", {}, "sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw=="], + + "es-object-atoms": ["es-object-atoms@1.1.1", "", { "dependencies": { "es-errors": "^1.3.0" } }, "sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA=="], + + "es-set-tostringtag": ["es-set-tostringtag@2.1.0", "", { "dependencies": { "es-errors": "^1.3.0", "get-intrinsic": "^1.2.6", "has-tostringtag": "^1.0.2", "hasown": "^2.0.2" } }, "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA=="], + + "event-target-shim": ["event-target-shim@5.0.1", "", {}, "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ=="], + + "form-data": ["form-data@4.0.5", "", { "dependencies": { "asynckit": "^0.4.0", "combined-stream": "^1.0.8", "es-set-tostringtag": "^2.1.0", "hasown": "^2.0.2", "mime-types": "^2.1.12" } }, "sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w=="], + + "form-data-encoder": ["form-data-encoder@1.7.2", "", {}, "sha512-qfqtYan3rxrnCk1VYaA4H+Ms9xdpPqvLZa6xmMgFvhO32x7/3J/ExcTd6qpxM0vH2GdMI+poehyBZvqfMTto8A=="], + + "formdata-node": ["formdata-node@4.4.1", "", { "dependencies": { "node-domexception": "1.0.0", "web-streams-polyfill": "4.0.0-beta.3" } }, "sha512-0iirZp3uVDjVGt9p49aTaqjk84TrglENEDuqfdlZQ1roC9CWlPk6Avf8EEnZNcAqPonwkG35x4n3ww/1THYAeQ=="], + + "function-bind": ["function-bind@1.1.2", "", {}, "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA=="], + + "get-intrinsic": ["get-intrinsic@1.3.0", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.2", "es-define-property": "^1.0.1", "es-errors": "^1.3.0", "es-object-atoms": "^1.1.1", "function-bind": "^1.1.2", "get-proto": "^1.0.1", "gopd": "^1.2.0", "has-symbols": "^1.1.0", "hasown": "^2.0.2", "math-intrinsics": "^1.1.0" } }, "sha512-9fSjSaos/fRIVIp+xSJlE6lfwhES7LNtKaCBIamHsjr2na1BiABJPo0mOjjz8GJDURarmCPGqaiVg5mfjb98CQ=="], + + "get-proto": ["get-proto@1.0.1", "", { "dependencies": { "dunder-proto": "^1.0.1", "es-object-atoms": "^1.0.0" } }, "sha512-sTSfBjoXBp89JvIKIefqw7U2CCebsc74kiY6awiGogKtoSGbgjYE/G/+l9sF3MWFPNc9IcoOC4ODfKHfxFmp0g=="], + + "gopd": ["gopd@1.2.0", "", {}, "sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg=="], + + "has-symbols": ["has-symbols@1.1.0", "", {}, "sha512-1cDNdwJ2Jaohmb3sg4OmKaMBwuC48sYni5HUw2DvsC8LjGTLK9h+eb1X6RyuOHe4hT0ULCW68iomhjUoKUqlPQ=="], + + "has-tostringtag": ["has-tostringtag@1.0.2", "", { "dependencies": { "has-symbols": "^1.0.3" } }, "sha512-NqADB8VjPFLM2V0VvHUewwwsw0ZWBaIdgo+ieHtK3hasLz4qeCRjYcqfB6AQrBggRKppKF8L52/VqdVsO47Dlw=="], + + "hasown": ["hasown@2.0.2", "", { "dependencies": { "function-bind": "^1.1.2" } }, "sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ=="], + + "humanize-ms": ["humanize-ms@1.2.1", "", { "dependencies": { "ms": "^2.0.0" } }, "sha512-Fl70vYtsAFb/C06PTS9dZBo7ihau+Tu/DNCk/OyHhea07S+aeMWpFFkUaXRa8fI+ScZbEI8dfSxwY7gxZ9SAVQ=="], + + "math-intrinsics": ["math-intrinsics@1.1.0", "", {}, "sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g=="], + + "mime-db": ["mime-db@1.52.0", "", {}, "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg=="], + + "mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], + + "ms": ["ms@2.1.3", "", {}, "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="], + + "node-domexception": ["node-domexception@1.0.0", "", {}, "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ=="], + + "node-fetch": ["node-fetch@2.7.0", "", { "dependencies": { "whatwg-url": "^5.0.0" }, "peerDependencies": { "encoding": "^0.1.0" }, "optionalPeers": ["encoding"] }, "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A=="], + + "tr46": ["tr46@0.0.3", "", {}, "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw=="], + + "undici-types": ["undici-types@5.26.5", "", {}, "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA=="], + + "web-streams-polyfill": ["web-streams-polyfill@4.0.0-beta.3", "", {}, "sha512-QW95TCTaHmsYfHDybGMwO5IJIM93I/6vTRk+daHTWFPhwh+C8Cg7j7XyKrwrj8Ib6vYXe0ocYNrmzY4xAAN6ug=="], + + "webidl-conversions": ["webidl-conversions@3.0.1", "", {}, "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ=="], + + "whatwg-url": ["whatwg-url@5.0.0", "", { "dependencies": { "tr46": "~0.0.3", "webidl-conversions": "^3.0.0" } }, "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw=="], + } +} diff --git a/my-claw-telegram.xml b/my-claw-telegram.xml index efdc8fb..a98678d 100644 --- a/my-claw-telegram.xml +++ b/my-claw-telegram.xml @@ -26,7 +26,7 @@ /mnt/user/appdata/claw-telegram-gateway - git.wylab.me/wylab/claw-ts-worker:latest + git.wylab.me/wylab/claw-telegram:latest /appdata/state /appdata/profiles.json /var/run/docker.sock diff --git a/package.json b/package.json new file mode 100644 index 0000000..1951bf2 --- /dev/null +++ b/package.json @@ -0,0 +1,8 @@ +{ + "name": "claw-code-parity", + "private": true, + "type": "module", + "dependencies": { + "@anthropic-ai/sdk": "^0.39.0" + } +} diff --git a/rust/crates/claw-telegram/assets/unraid/claw-telegram-gateway.xml b/rust/crates/claw-telegram/assets/unraid/claw-telegram-gateway.xml index fe206f8..6ae4408 100644 --- a/rust/crates/claw-telegram/assets/unraid/claw-telegram-gateway.xml +++ b/rust/crates/claw-telegram/assets/unraid/claw-telegram-gateway.xml @@ -26,7 +26,7 @@ /mnt/user/appdata/claw-telegram-gateway - git.wylab.me/wylab/claw-ts-worker:latest + git.wylab.me/wylab/claw-telegram:latest /appdata/state /appdata/profiles.json /var/run/docker.sock diff --git a/rust/crates/claw-telegram/src/docker_worker_manager.rs b/rust/crates/claw-telegram/src/docker_worker_manager.rs index 6bebb7e..36778d7 100644 --- a/rust/crates/claw-telegram/src/docker_worker_manager.rs +++ b/rust/crates/claw-telegram/src/docker_worker_manager.rs @@ -3,7 +3,6 @@ use std::fmt::{Display, Formatter}; use std::path::PathBuf; use std::time::{Duration, Instant}; -use bollard::image::CreateImageOptions; use bollard::container::{ Config as ContainerConfig, CreateContainerOptions, InspectContainerOptions, ListContainersOptions, RemoveContainerOptions, StartContainerOptions, StopContainerOptions, @@ -357,29 +356,6 @@ impl DockerWorkerManager { .iter() .map(|(key, value)| format!("{key}={value}")) .collect::>(); - // Pull the worker image if not already available locally. - use futures_util::StreamExt; - let mut pull_stream = self.docker.create_image( - Some(CreateImageOptions { - from_image: manifest.gateway.worker_image.clone(), - ..Default::default() - }), - None, - None, - ); - while let Some(result) = pull_stream.next().await { - match result { - Ok(info) => { - if let Some(status) = info.status { - eprintln!("[worker-manager] pull {}: {status}", manifest.gateway.worker_image); - } - } - Err(err) => { - eprintln!("[worker-manager] pull {} failed: {err}", manifest.gateway.worker_image); - break; - } - } - } self.docker .create_container( Some(CreateContainerOptions { diff --git a/rust/crates/claw-telegram/src/unraid_template_manager.rs b/rust/crates/claw-telegram/src/unraid_template_manager.rs index dae70e4..e70b035 100644 --- a/rust/crates/claw-telegram/src/unraid_template_manager.rs +++ b/rust/crates/claw-telegram/src/unraid_template_manager.rs @@ -373,7 +373,7 @@ mod tests { ("CLAW_WORKER_AUTH_TOKEN", "worker-secret"), ( "CLAW_GATEWAY_WORKER_IMAGE", - "git.wylab.me/wylab/claw-ts-worker:latest", + "git.wylab.me/wylab/claw-telegram:latest", ), ( "CLAW_GATEWAY_TEMPLATE_DIR", diff --git a/ts-worker/engine/TelegramWorkerEngine.ts b/ts-worker/engine/TelegramWorkerEngine.ts new file mode 100644 index 0000000..ea4e1e6 --- /dev/null +++ b/ts-worker/engine/TelegramWorkerEngine.ts @@ -0,0 +1,163 @@ +/** + * TelegramWorkerEngine — drives Claude conversations for the worker. + * + * Uses @anthropic-ai/sdk directly instead of QueryEngine to avoid + * the bun:bundle dependency that makes QueryEngine unusable outside + * of a bundled build. + * + * Responsibilities: + * - manage conversation history across turns + * - call Claude API via streaming + * - translate streaming events into WorkerTurnEvent + * - handle tool use / approval flow + * - support cancellation + */ + +import Anthropic from '@anthropic-ai/sdk' +import type { MessageStream } from '@anthropic-ai/sdk/lib/streaming.mjs' +import type { ApprovalBroker } from '../permissions/ApprovalBroker.js' +import { WorkerEventTranslator } from '../events/WorkerEventTranslator.js' +import type { WorkerTurnEvent } from '../protocol.js' + +export type TelegramWorkerEngineConfig = { + profileId: string + model: string + permissionMode: string + defaultCwd: string + stateRoot: string + claudeConfigDir: string +} + +type ConversationMessage = { + role: 'user' | 'assistant' + content: string | Array<{ type: string; [key: string]: unknown }> +} + +export class TelegramWorkerEngine { + private config: TelegramWorkerEngineConfig + private broker: ApprovalBroker + private translator: WorkerEventTranslator + private client: Anthropic | null = null + + // Session state — persists across turns + private messages: ConversationMessage[] = [] + private ready = false + + constructor(config: TelegramWorkerEngineConfig, broker: ApprovalBroker) { + this.config = config + this.broker = broker + this.translator = new WorkerEventTranslator() + } + + async init(): Promise { + try { + // The SDK reads ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN from env + this.client = new Anthropic() + this.ready = true + console.log( + `[TelegramWorkerEngine] initialized for profile=${this.config.profileId} model=${this.config.model}`, + ) + return true + } catch (err) { + console.error('[TelegramWorkerEngine] failed to init SDK:', err) + return false + } + } + + get isReady(): boolean { + return this.ready + } + + get messageCount(): number { + return this.messages.length + } + + /** + * Execute a turn. Yields WorkerTurnEvent as they're produced. + */ + async *executeTurn( + prompt: string, + abortController: AbortController, + ): AsyncGenerator { + this.translator.reset() + + if (!this.ready || !this.client) { + yield { + type: 'assistant_text_delta', + delta: '[worker] engine not initialized — check ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN', + } + yield this.translator.buildCompletedEvent() + return + } + + // Add user message to history + this.messages.push({ role: 'user', content: prompt }) + + try { + const stream = this.client.messages.stream({ + model: this.config.model, + max_tokens: 16384, + system: `You are Claude, an AI assistant. The user is communicating via Telegram. Be helpful and concise. Current working directory: ${this.config.defaultCwd}`, + messages: this.messages.map((m) => ({ + role: m.role, + content: m.content, + })), + }) + + let fullText = '' + + stream.on('text', (text) => { + fullText += text + }) + + // Process events from the stream + for await (const event of stream) { + if (abortController.signal.aborted) { + stream.abort() + yield this.translator.buildFailedEvent('Turn cancelled by user') + return + } + + if (event.type === 'content_block_delta') { + const delta = event.delta as { type?: string; text?: string } + if (delta?.type === 'text_delta' && delta.text) { + yield { + type: 'assistant_text_delta', + delta: delta.text, + } as WorkerTurnEvent + } + } + } + + // Get final message for usage + const finalMessage = await stream.finalMessage() + + // Add assistant response to history + const assistantText = + finalMessage.content + .filter((b): b is { type: 'text'; text: string } => b.type === 'text') + .map((b) => b.text) + .join('') || fullText + + this.messages.push({ role: 'assistant', content: assistantText }) + + yield { + type: 'completed', + final_text: assistantText, + iterations: 1, + input_tokens: finalMessage.usage?.input_tokens ?? 0, + output_tokens: finalMessage.usage?.output_tokens ?? 0, + generated_files: [], + } as WorkerTurnEvent + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + yield this.translator.buildFailedEvent(message) + } + } + + /** Reset session — clear conversation history. */ + resetSession(): void { + this.messages = [] + this.broker.reset() + } +} diff --git a/ts-worker/events/WorkerEventTranslator.ts b/ts-worker/events/WorkerEventTranslator.ts new file mode 100644 index 0000000..278bbab --- /dev/null +++ b/ts-worker/events/WorkerEventTranslator.ts @@ -0,0 +1,257 @@ +/** + * WorkerEventTranslator — maps SDKMessage and internal events into + * the WorkerTurnEvent protocol that the Rust gateway expects. + * + * Handles: + * - assistant_text_delta (streaming text) + * - tool_use (tool invocation) + * - tool_result (tool output) + * - approval_requested (permission gate) + * - auto_compaction (context window management) + * - task_created / task_updated / task_stopped + * - agent_spawned + * - team_created / team_deleted + * - mailbox_message + * - completed / failed (terminal events) + */ + +import { randomUUID } from 'crypto' +import type { + WorkerTurnEvent, + GeneratedFileDescriptor, + ApprovalRequestPayload, +} from '../protocol.js' + +/** + * Minimal shape we pattern-match against. SDKMessage is a large union — + * we extract what we need and skip unrecognized types. + */ +type SDKMessageLike = { + type: string + [key: string]: unknown +} + +export class WorkerEventTranslator { + private accumulatedText = '' + private iterations = 0 + private inputTokens = 0 + private outputTokens = 0 + private generatedFiles: GeneratedFileDescriptor[] = [] + + /** Reset state for a new turn. */ + reset(): void { + this.accumulatedText = '' + this.iterations = 0 + this.inputTokens = 0 + this.outputTokens = 0 + this.generatedFiles = [] + } + + /** + * Translate one SDKMessage into zero or more WorkerTurnEvents. + */ + translate(msg: SDKMessageLike): WorkerTurnEvent[] { + const events: WorkerTurnEvent[] = [] + + switch (msg.type) { + // ── Text streaming ──────────────────────────────────────── + case 'assistant': { + const content = msg.content as + | Array<{ type: string; text?: string }> + | undefined + if (Array.isArray(content)) { + for (const block of content) { + if (block.type === 'text' && typeof block.text === 'string') { + this.accumulatedText += block.text + events.push({ + type: 'assistant_text_delta', + delta: block.text, + }) + } + } + } + if (typeof msg.text === 'string') { + this.accumulatedText += msg.text as string + events.push({ + type: 'assistant_text_delta', + delta: msg.text as string, + }) + } + break + } + + case 'content_block_delta': { + const delta = msg.delta as + | { type?: string; text?: string } + | undefined + if (delta?.type === 'text_delta' && typeof delta.text === 'string') { + this.accumulatedText += delta.text + events.push({ + type: 'assistant_text_delta', + delta: delta.text, + }) + } + break + } + + // ── Tool use ────────────────────────────────────────────── + case 'tool_use': { + events.push({ + type: 'tool_use', + id: (msg.id as string) || randomUUID(), + name: msg.name as string, + input: + typeof msg.input === 'string' + ? (msg.input as string) + : JSON.stringify(msg.input), + }) + break + } + + // ── Tool result ─────────────────────────────────────────── + case 'tool_result': { + events.push({ + type: 'tool_result', + tool_use_id: msg.tool_use_id as string, + tool_name: (msg.tool_name as string) || 'unknown', + output: + typeof msg.output === 'string' + ? (msg.output as string) + : JSON.stringify(msg.output ?? ''), + is_error: !!msg.is_error, + }) + break + } + + // ── Usage / result ──────────────────────────────────────── + case 'result': { + const r = msg as Record + if (typeof r.input_tokens === 'number') + this.inputTokens += r.input_tokens + if (typeof r.output_tokens === 'number') + this.outputTokens += r.output_tokens + this.iterations++ + + // Check for generated files + const files = r.generated_files as GeneratedFileDescriptor[] | undefined + if (Array.isArray(files)) { + this.generatedFiles.push(...files) + } + break + } + + // ── Auto-compaction ─────────────────────────────────────── + case 'auto_compact': + case 'auto_compaction': { + const removedCount = + (msg.removed_message_count as number) ?? + (msg.removedCount as number) ?? + 0 + events.push({ + type: 'auto_compaction', + removed_message_count: removedCount, + }) + break + } + + // ── Task lifecycle ──────────────────────────────────────── + case 'task_created': { + events.push({ + type: 'task_created', + task_list_id: (msg.task_list_id as string) || 'default', + task: (msg.task as Record) || {}, + }) + break + } + + case 'task_updated': { + events.push({ + type: 'task_updated', + task_list_id: (msg.task_list_id as string) || 'default', + task: (msg.task as Record) || {}, + }) + break + } + + case 'task_stopped': { + events.push({ + type: 'task_stopped', + task: (msg.task as Record) || {}, + }) + break + } + + // ── Agent lifecycle ─────────────────────────────────────── + case 'agent_spawned': { + events.push({ + type: 'agent_spawned', + agent: (msg.agent as Record) || {}, + }) + break + } + + // ── Team lifecycle ──────────────────────────────────────── + case 'team_created': { + events.push({ + type: 'team_created', + team: { + team: (msg.team as Record) || {}, + task_list_id: (msg.task_list_id as string) || 'default', + team_file_path: (msg.team_file_path as string) || '', + }, + }) + break + } + + case 'team_deleted': { + events.push({ + type: 'team_deleted', + team_name: msg.team_name as string, + }) + break + } + + // ── Mailbox ─────────────────────────────────────────────── + case 'mailbox_message': { + const m = (msg.message as Record) ?? msg + events.push({ + type: 'mailbox_message', + message: { + team_name: (m.team_name as string) || '', + sender: (m.sender as string) || '', + count: (m.count as number) || 1, + recipients: m.recipients as string[] | undefined, + summary: m.summary as string | undefined, + }, + }) + break + } + + // ── Unknown — skip silently ─────────────────────────────── + default: + break + } + + return events + } + + /** Build the terminal 'completed' event. */ + buildCompletedEvent(): WorkerTurnEvent { + return { + type: 'completed', + final_text: this.accumulatedText, + iterations: this.iterations, + input_tokens: this.inputTokens, + output_tokens: this.outputTokens, + generated_files: this.generatedFiles, + } + } + + /** Build a terminal 'failed' event. */ + buildFailedEvent(message: string): WorkerTurnEvent { + return { + type: 'failed', + message, + } + } +} diff --git a/ts-worker/main.ts b/ts-worker/main.ts new file mode 100644 index 0000000..fa1ecfe --- /dev/null +++ b/ts-worker/main.ts @@ -0,0 +1,59 @@ +#!/usr/bin/env bun +/** + * Worker entrypoint — reads CLAW_WORKER_* env vars and starts the + * Bun HTTP server that speaks the worker protocol. + * + * Usage: + * CLAW_WORKER_BIND_ADDR=0.0.0.0 \ + * CLAW_WORKER_PORT=8780 \ + * CLAW_WORKER_AUTH_TOKEN=secret \ + * CLAW_WORKER_PROFILE_ID=default \ + * CLAW_WORKER_STATE_ROOT=/state \ + * CLAW_WORKER_DEFAULT_CWD=/workspace \ + * CLAW_WORKER_MODEL=claude-sonnet-4-6 \ + * CLAW_WORKER_PERMISSION_MODE=default \ + * CLAUDE_CONFIG_DIR=/state/claude-home \ + * bun src/worker/main.ts + */ + +import { startWorkerServer } from './server.js' + +// The gateway passes CLAW_WORKER_BIND_ADDR as "host:port" (e.g. "0.0.0.0:8080"). +// Parse both address and port from it, with CLAW_WORKER_PORT as override. +const rawBindAddr = process.env.CLAW_WORKER_BIND_ADDR ?? '0.0.0.0:8080' +const colonIdx = rawBindAddr.lastIndexOf(':') +const bindAddr = colonIdx > 0 ? rawBindAddr.slice(0, colonIdx) : rawBindAddr +const port = parseInt( + process.env.CLAW_WORKER_PORT ?? (colonIdx > 0 ? rawBindAddr.slice(colonIdx + 1) : '8080'), + 10, +) +const authToken = process.env.CLAW_WORKER_AUTH_TOKEN +const profileId = process.env.CLAW_WORKER_PROFILE_ID ?? 'default' +const stateRoot = process.env.CLAW_WORKER_STATE_ROOT ?? '/tmp/claw-worker-state' +const defaultCwd = process.env.CLAW_WORKER_DEFAULT_CWD ?? process.cwd() +const model = process.env.CLAW_WORKER_MODEL ?? 'claude-sonnet-4-6' +const permissionMode = process.env.CLAW_WORKER_PERMISSION_MODE ?? 'default' +const claudeConfigDir = process.env.CLAUDE_CONFIG_DIR ?? `${stateRoot}/claude-home` + +console.log('[worker] starting with config:') +console.log(` bind_addr: ${bindAddr}`) +console.log(` port: ${port}`) +console.log(` profile_id: ${profileId}`) +console.log(` state_root: ${stateRoot}`) +console.log(` default_cwd: ${defaultCwd}`) +console.log(` model: ${model}`) +console.log(` permission_mode: ${permissionMode}`) +console.log(` claude_config: ${claudeConfigDir}`) +console.log(` auth: ${authToken ? 'configured' : 'none'}`) + +await startWorkerServer({ + bindAddr, + port, + authToken, + profileId, + stateRoot, + defaultCwd, + model, + permissionMode, + claudeConfigDir, +}) diff --git a/ts-worker/permissions/ApprovalBroker.ts b/ts-worker/permissions/ApprovalBroker.ts new file mode 100644 index 0000000..a27a4ca --- /dev/null +++ b/ts-worker/permissions/ApprovalBroker.ts @@ -0,0 +1,211 @@ +/** + * ApprovalBroker — manages tool permission decisions for the worker. + * + * Integrates with QueryEngine's `canUseTool` callback to suspend + * tool execution until the gateway (and ultimately the Telegram user) + * sends an approval decision. + * + * Supports: + * - approve_once: allow this specific tool call + * - approve_tool_for_session: auto-approve this tool name for the session + * - approve_all_for_session: auto-approve everything for the session + * - deny: reject with reason + * - cancel_turn: abort the entire turn + * + * The broker also maintains a background approval queue for approvals + * that arrive from subagent/background tasks (not the foreground turn). + */ + +import { randomUUID } from 'crypto' +import type { ApprovalDecision } from '../protocol.js' + +export type ApprovalRequest = { + approval_id: string + tool_name: string + input: string + current_mode: string + required_mode: string + reason?: string + /** Resolve function — called when the gateway sends a decision */ + resolve: (decision: ApprovalDecision) => void + /** Timestamp for expiry/gc */ + created_at: number +} + +export type PermissionBehavior = + | { behavior: 'allow' } + | { behavior: 'deny'; reason: string } + | { behavior: 'ask'; message?: string } + +export class ApprovalBroker { + /** Tools auto-approved for the entire session */ + private sessionApprovedTools = new Set() + /** If true, all tools are auto-approved for the session */ + private approveAllForSession = false + /** Current permission mode (default, plan, etc.) */ + private permissionMode: string + + /** Foreground pending approval (only one at a time — engine blocks) */ + private pendingForeground: ApprovalRequest | null = null + + /** Background approvals queue (subagent tasks, etc.) */ + private backgroundApprovals = new Map() + /** Background approvals that have been notified to the user */ + private notifiedBackgroundApprovals = new Set() + + /** Callback to emit SSE events to the current turn */ + private onApprovalRequested?: (request: ApprovalRequest) => void + + constructor(permissionMode: string) { + this.permissionMode = permissionMode + } + + setApprovalRequestHandler( + handler: (request: ApprovalRequest) => void, + ): void { + this.onApprovalRequested = handler + } + + /** + * canUseTool callback for QueryEngine. + * + * Returns immediately if the tool is auto-approved. Otherwise + * creates a pending approval request and suspends until the + * gateway resolves it. + */ + async canUseTool( + toolName: string, + input: unknown, + ): Promise { + // Auto-approve if session-wide approval is active + if (this.approveAllForSession) { + return { behavior: 'allow' } + } + + // Auto-approve if this specific tool was approved for the session + if (this.sessionApprovedTools.has(toolName)) { + return { behavior: 'allow' } + } + + // Need to ask — create a pending approval and block + const approvalId = randomUUID() + const inputStr = + typeof input === 'string' ? input : JSON.stringify(input) + + return new Promise((resolve) => { + const request: ApprovalRequest = { + approval_id: approvalId, + tool_name: toolName, + input: inputStr, + current_mode: this.permissionMode, + required_mode: 'confirm', + created_at: Date.now(), + resolve: (decision: ApprovalDecision) => { + this.pendingForeground = null + resolve(this.applyDecision(decision, toolName)) + }, + } + + this.pendingForeground = request + this.onApprovalRequested?.(request) + }) + } + + /** + * Resolve a foreground approval (from /v1/turns/:turn_id/approval). + */ + resolveForeground( + approvalId: string, + decision: ApprovalDecision, + ): boolean { + if (!this.pendingForeground) return false + if (this.pendingForeground.approval_id !== approvalId) return false + this.pendingForeground.resolve(decision) + return true + } + + /** + * Submit a background approval request (from subagent/background task). + */ + addBackgroundApproval( + toolName: string, + input: unknown, + ): { approvalId: string; promise: Promise } { + const approvalId = randomUUID() + const inputStr = + typeof input === 'string' ? input : JSON.stringify(input) + + const promise = new Promise((resolve) => { + const request: ApprovalRequest = { + approval_id: approvalId, + tool_name: toolName, + input: inputStr, + current_mode: this.permissionMode, + required_mode: 'confirm', + created_at: Date.now(), + resolve: (decision: ApprovalDecision) => { + this.backgroundApprovals.delete(approvalId) + this.notifiedBackgroundApprovals.delete(approvalId) + resolve(this.applyDecision(decision, toolName)) + }, + } + this.backgroundApprovals.set(approvalId, request) + }) + + return { approvalId, promise } + } + + /** + * Resolve a background approval (from /v1/background-approvals/:id). + */ + resolveBackground( + approvalId: string, + decision: ApprovalDecision, + ): boolean { + const request = this.backgroundApprovals.get(approvalId) + if (!request) return false + request.resolve(decision) + return true + } + + markBackgroundNotified(approvalId: string): void { + this.notifiedBackgroundApprovals.add(approvalId) + } + + getBackgroundApprovals(): ApprovalRequest[] { + return Array.from(this.backgroundApprovals.values()) + } + + getPendingForeground(): ApprovalRequest | null { + return this.pendingForeground + } + + /** Reset session-scoped state (on /v1/session/reset). */ + reset(): void { + this.sessionApprovedTools.clear() + this.approveAllForSession = false + this.pendingForeground = null + // Note: background approvals are NOT cleared on session reset — + // they belong to background tasks that may still be running. + } + + private applyDecision( + decision: ApprovalDecision, + toolName: string, + ): PermissionBehavior { + switch (decision.decision) { + case 'approve_once': + return { behavior: 'allow' } + case 'approve_tool_for_session': + this.sessionApprovedTools.add(toolName) + return { behavior: 'allow' } + case 'approve_all_for_session': + this.approveAllForSession = true + return { behavior: 'allow' } + case 'deny': + return { behavior: 'deny', reason: decision.reason } + case 'cancel_turn': + return { behavior: 'deny', reason: 'Turn cancelled by user' } + } + } +} diff --git a/ts-worker/protocol.ts b/ts-worker/protocol.ts new file mode 100644 index 0000000..c18e8a9 --- /dev/null +++ b/ts-worker/protocol.ts @@ -0,0 +1,203 @@ +/** + * Worker protocol types — mirrors worker-protocol/openapi.yaml. + * + * These are the wire types the Rust gateway expects. The TS worker + * translates internal QueryEngine / SDKMessage events into these shapes + * before sending them over SSE. + */ + +// ── Turn request ────────────────────────────────────────────────────── + +export type TurnSource = { + channel: string + sender_id: string + chat_id?: string + display_name?: string +} + +export type InboundAttachment = { + file_name: string + kind: 'photo' | 'document' + media_type?: string + data_base64: string +} + +export type WorkerTurnRequest = { + prompt: string + source: TurnSource + attachments?: InboundAttachment[] +} + +export type WorkerTurnAccepted = { + turn_id: string +} + +// ── SSE events ──────────────────────────────────────────────────────── + +export type AssistantTextDelta = { + type: 'assistant_text_delta' + delta: string +} + +export type ToolUseEvent = { + type: 'tool_use' + id: string + name: string + input: string +} + +export type ToolResultEvent = { + type: 'tool_result' + tool_use_id: string + tool_name: string + output: string + is_error: boolean +} + +export type ApprovalRequestPayload = { + approval_id: string + tool_name: string + input: string + current_mode: string + required_mode: string + reason?: string +} + +export type ApprovalRequestedEvent = { + type: 'approval_requested' + request: ApprovalRequestPayload +} + +export type AutoCompactionEvent = { + type: 'auto_compaction' + removed_message_count: number +} + +export type GeneratedFileDescriptor = { + file_id: string + file_name: string + media_type?: string + size_bytes: number + is_image: boolean +} + +export type CompletedEvent = { + type: 'completed' + final_text: string + iterations: number + input_tokens: number + output_tokens: number + generated_files: GeneratedFileDescriptor[] +} + +export type FailedEvent = { + type: 'failed' + message: string +} + +export type TaskCreatedEvent = { + type: 'task_created' + task_list_id: string + task: TaskListRecord +} + +export type TaskUpdatedEvent = { + type: 'task_updated' + task_list_id: string + task: TaskListRecord +} + +export type TaskStoppedEvent = { + type: 'task_stopped' + task: RuntimeTaskRecord +} + +export type AgentSpawnedEvent = { + type: 'agent_spawned' + agent: RuntimeTaskRecord +} + +export type TeamCreatedEvent = { + type: 'team_created' + team: WorkerTeamCreatedPayload +} + +export type TeamDeletedEvent = { + type: 'team_deleted' + team_name: string +} + +export type MailboxMessageEvent = { + type: 'mailbox_message' + message: WorkerMailboxMessagePayload +} + +export type WorkerTurnEvent = + | AssistantTextDelta + | ToolUseEvent + | ToolResultEvent + | ApprovalRequestedEvent + | AutoCompactionEvent + | TaskCreatedEvent + | TaskUpdatedEvent + | TaskStoppedEvent + | AgentSpawnedEvent + | TeamCreatedEvent + | TeamDeletedEvent + | MailboxMessageEvent + | CompletedEvent + | FailedEvent + +// ── Approval decision ───────────────────────────────────────────────── + +export type ApprovalDecision = + | { decision: 'approve_once' } + | { decision: 'approve_tool_for_session' } + | { decision: 'approve_all_for_session' } + | { decision: 'cancel_turn' } + | { decision: 'deny'; reason: string } + +export type TurnApprovalRequest = { + approval_id: string + decision: ApprovalDecision +} + +// ── Domain records (opaque pass-through for now) ────────────────────── + +export type TaskListRecord = Record +export type RuntimeTaskRecord = Record +export type TeamRecord = Record +export type MailboxMessage = Record +export type MailboxSummary = Record +export type BackgroundApprovalRecord = Record +export type FeedItemRecord = Record +export type LibraryAppRecord = Record +export type LibraryAppVersionRecord = Record +export type AppWorkspaceRecord = Record + +// ── Composite response types ────────────────────────────────────────── + +export type WorkerStatusResponse = { + profile_id: string + message_count: number + model: string + permission_mode: string + default_cwd: string + busy: boolean + task_list_id: string + active_team?: string +} + +export type WorkerTeamCreatedPayload = { + team: TeamRecord + task_list_id: string + team_file_path: string +} + +export type WorkerMailboxMessagePayload = { + team_name: string + sender: string + count: number + recipients?: string[] + summary?: string +} diff --git a/ts-worker/server.ts b/ts-worker/server.ts new file mode 100644 index 0000000..24347b9 --- /dev/null +++ b/ts-worker/server.ts @@ -0,0 +1,500 @@ +/** + * Worker HTTP server — Bun.serve implementation of the worker protocol. + * + * Implements every endpoint from worker-protocol/openapi.yaml so the + * Rust gateway can talk to this TS worker exactly as it talked to the + * old Rust worker. + */ + +import type { Server } from 'bun' +import { TurnManager } from './turnManager.js' +import type { + TurnApprovalRequest, + WorkerStatusResponse, + WorkerTurnEvent, + WorkerTurnRequest, +} from './protocol.js' +import { ApprovalBroker } from './permissions/ApprovalBroker.js' +import { TelegramWorkerEngine } from './engine/TelegramWorkerEngine.js' +import { TaskStore } from './state/TaskStore.js' +import { TeamStore } from './state/TeamStore.js' +import { FeedStore } from './state/FeedStore.js' +import { AppLibraryStore } from './state/AppLibraryStore.js' +import type { AppPackageRequest, AppPublishRequest } from './state/records.js' + +export type WorkerServerConfig = { + bindAddr: string + port: number + authToken?: string + profileId: string + model: string + permissionMode: string + defaultCwd: string + stateRoot: string + claudeConfigDir: string +} + +/** + * WorkerState — runtime state shared across all request handlers. + */ +type WorkerState = { + config: WorkerServerConfig + turnManager: TurnManager + broker: ApprovalBroker + engine: TelegramWorkerEngine + taskStore: TaskStore + teamStore: TeamStore + feedStore: FeedStore + appStore: AppLibraryStore + generatedFiles: Map +} + +function checkAuth(req: Request, state: WorkerState): Response | null { + const token = state.config.authToken + if (!token) return null + const auth = req.headers.get('authorization') + if (auth !== `Bearer ${token}`) { + return new Response('Unauthorized', { status: 401 }) + } + return null +} + +function json(data: unknown, status = 200): Response { + return new Response(JSON.stringify(data), { + status, + headers: { 'content-type': 'application/json' }, + }) +} + +function noContent(): Response { + return new Response(null, { status: 204 }) +} + +function notFound(msg = 'Not found'): Response { + return new Response(JSON.stringify({ error: msg }), { + status: 404, + headers: { 'content-type': 'application/json' }, + }) +} + +async function handleRequest( + req: Request, + state: WorkerState, +): Promise { + const authErr = checkAuth(req, state) + if (authErr) return authErr + + const url = new URL(req.url) + const path = url.pathname + const method = req.method + + // ── Health ──────────────────────────────────────────────────────── + if (method === 'GET' && path === '/healthz') { + return new Response('ok') + } + + // ── Status ──────────────────────────────────────────────────────── + if (method === 'GET' && path === '/v1/status') { + const status: WorkerStatusResponse = { + profile_id: state.config.profileId, + message_count: state.engine.messageCount, + model: state.config.model, + permission_mode: state.config.permissionMode, + default_cwd: state.config.defaultCwd, + busy: state.turnManager.isBusy, + task_list_id: state.taskStore.getTaskListId(), + } + return json(status) + } + + // ── Session reset ───────────────────────────────────────────────── + if (method === 'POST' && path === '/v1/session/reset') { + state.engine.resetSession() + state.taskStore.reset() + state.teamStore.reset() + return noContent() + } + + // ── Turns ───────────────────────────────────────────────────────── + if (method === 'POST' && path === '/v1/turns') { + const body = (await req.json()) as WorkerTurnRequest + const turn = state.turnManager.create(body) + + // Wire approval broker to emit SSE events on this turn + state.broker.setApprovalRequestHandler((request) => { + state.turnManager.emit(turn.id, { + type: 'approval_requested', + request: { + approval_id: request.approval_id, + tool_name: request.tool_name, + input: request.input, + current_mode: request.current_mode, + required_mode: request.required_mode, + reason: request.reason, + }, + }) + }) + + // Drive the turn through the engine + queueMicrotask(async () => { + try { + for await (const event of state.engine.executeTurn( + body.prompt, + turn.abortController, + )) { + state.turnManager.emit(turn.id, event) + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + state.turnManager.emit(turn.id, { + type: 'failed', + message, + }) + } + }) + + return json({ turn_id: turn.id }) + } + + // ── Turn events (SSE) ───────────────────────────────────────────── + const turnEventsMatch = path.match(/^\/v1\/turns\/([^/]+)\/events$/) + if (method === 'GET' && turnEventsMatch) { + const turnId = turnEventsMatch[1]! + const turn = state.turnManager.get(turnId) + if (!turn) return notFound('Turn not found') + + return new Response( + new ReadableStream({ + start(controller) { + const encoder = new TextEncoder() + let closed = false + const send = (event: WorkerTurnEvent) => { + if (closed) return + const data = JSON.stringify(event) + controller.enqueue(encoder.encode(`data: ${data}\n\n`)) + if (event.type === 'completed' || event.type === 'failed') { + closed = true + controller.close() + } + } + + const unsub = state.turnManager.subscribe(turnId, send) + if (unsub === null && !closed) { + closed = true + controller.close() + } + }, + }), + { + headers: { + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache', + connection: 'keep-alive', + }, + }, + ) + } + + // ── Turn approval ───────────────────────────────────────────────── + const turnApprovalMatch = path.match(/^\/v1\/turns\/([^/]+)\/approval$/) + if (method === 'POST' && turnApprovalMatch) { + const body = (await req.json()) as TurnApprovalRequest + // Route to broker (foreground approval) + const ok = state.broker.resolveForeground( + body.approval_id, + body.decision, + ) + if (!ok) { + // Try turn manager fallback + const tmOk = state.turnManager.resolveApproval( + turnApprovalMatch[1]!, + body.approval_id, + body.decision, + ) + if (!tmOk) return notFound('No pending approval') + } + return noContent() + } + + // ── Turn cancel ─────────────────────────────────────────────────── + const turnCancelMatch = path.match(/^\/v1\/turns\/([^/]+)\/cancel$/) + if (method === 'POST' && turnCancelMatch) { + state.turnManager.cancel(turnCancelMatch[1]!) + return noContent() + } + + // ── Generated files ─────────────────────────────────────────────── + const fileMatch = path.match(/^\/v1\/turns\/([^/]+)\/files\/([^/]+)$/) + if (method === 'GET' && fileMatch) { + const fileId = fileMatch[2]! + const file = state.generatedFiles.get(fileId) + if (!file) return notFound('File not found') + return new Response(file.data, { + headers: { 'content-type': file.mediaType }, + }) + } + + // ── Tasks ───────────────────────────────────────────────────────── + if (method === 'GET' && path === '/v1/tasks') { + return json({ + task_list_id: state.taskStore.getTaskListId(), + tasks: state.taskStore.listTasks(), + }) + } + + const taskMatch = path.match(/^\/v1\/tasks\/([^/]+)$/) + if (method === 'GET' && taskMatch) { + const task = state.taskStore.getTask(taskMatch[1]!) + const runtimeTask = state.taskStore.getRuntimeTask(taskMatch[1]!) + if (!task && !runtimeTask) return notFound('Task not found') + return json({ task, runtime_task: runtimeTask }) + } + + const taskStopMatch = path.match(/^\/v1\/tasks\/([^/]+)\/stop$/) + if (method === 'POST' && taskStopMatch) { + state.taskStore.stopRuntimeTask(taskStopMatch[1]!) + return noContent() + } + + // ── Team ────────────────────────────────────────────────────────── + if (method === 'GET' && path === '/v1/team') { + return json({ + team: state.teamStore.getTeam(), + task_list_id: state.taskStore.getTaskListId(), + }) + } + + // ── Agents ──────────────────────────────────────────────────────── + if (method === 'GET' && path === '/v1/agents') { + return json({ agents: state.taskStore.listAgents() }) + } + + const agentMatch = path.match(/^\/v1\/agents\/([^/]+)$/) + if (method === 'GET' && agentMatch) { + const agent = state.taskStore.getAgent(agentMatch[1]!) + if (!agent) return notFound('Agent not found') + return json(agent) + } + + // ── Background approvals ────────────────────────────────────────── + if (method === 'GET' && path === '/v1/background-approvals') { + const approvals = state.broker.getBackgroundApprovals().map((a) => ({ + approval_id: a.approval_id, + task_id: '', + tool_name: a.tool_name, + input: a.input, + current_mode: a.current_mode, + required_mode: a.required_mode, + reason: a.reason, + created_at: a.created_at, + notified: false, + })) + return json({ approvals }) + } + + const bgApprovalMatch = path.match(/^\/v1\/background-approvals\/([^/]+)$/) + if (method === 'POST' && bgApprovalMatch) { + const body = (await req.json()) as TurnApprovalRequest + const ok = state.broker.resolveBackground( + bgApprovalMatch[1]!, + body.decision, + ) + if (!ok) return notFound('No pending background approval') + return noContent() + } + + const bgApprovalNotifiedMatch = path.match( + /^\/v1\/background-approvals\/([^/]+)\/notified$/, + ) + if (method === 'POST' && bgApprovalNotifiedMatch) { + state.broker.markBackgroundNotified(bgApprovalNotifiedMatch[1]!) + return noContent() + } + + // ── Mailbox ─────────────────────────────────────────────────────── + if (method === 'GET' && path === '/v1/mailbox') { + return json({ mailbox: state.teamStore.getMailboxSummary() }) + } + + const mailboxPendingMatch = path.match(/^\/v1\/mailbox\/pending\/([^/]+)$/) + if (method === 'GET' && mailboxPendingMatch) { + const messages = state.teamStore.getPendingMessages( + mailboxPendingMatch[1]!, + ) + return json({ messages }) + } + + // ── Feed ────────────────────────────────────────────────────────── + if (method === 'GET' && path === '/v1/feed') { + const turnId = url.searchParams.get('turn_id') ?? undefined + return json({ + items: state.feedStore.listItems(turnId), + state_root: state.config.stateRoot, + }) + } + + const feedItemMatch = path.match(/^\/v1\/feed\/([^/]+)$/) + if (method === 'GET' && feedItemMatch) { + const item = state.feedStore.getItem(feedItemMatch[1]!) + if (!item) return notFound('Feed item not found') + return json({ item, state_root: state.config.stateRoot }) + } + + const feedFileMatch = path.match(/^\/v1\/feed\/([^/]+)\/file$/) + if (method === 'GET' && feedFileMatch) { + const payload = await state.feedStore.getPayload(feedFileMatch[1]!) + if (!payload) return notFound('Feed file not found') + const item = state.feedStore.getItem(feedFileMatch[1]!) + return new Response(payload, { + headers: { + 'content-type': item?.media_type || 'application/octet-stream', + }, + }) + } + + const feedMakeAppMatch = path.match(/^\/v1\/feed\/([^/]+)\/make-app$/) + if (method === 'POST' && feedMakeAppMatch) { + const body = (await req.json()) as AppPackageRequest + const feedItem = state.feedStore.getItem(feedMakeAppMatch[1]!) + if (!feedItem) return notFound('Feed item not found') + const payload = await state.feedStore.getPayload(feedMakeAppMatch[1]!) + const result = await state.appStore.packageFromFeedItem( + body, + payload, + feedItem.title, + feedItem.kind, + ) + return json({ result }) + } + + // ── Apps ─────────────────────────────────────────────────────────── + if (method === 'GET' && path === '/v1/apps') { + return json({ + apps: state.appStore.listApps(), + state_root: state.config.stateRoot, + }) + } + + const appMatch = path.match(/^\/v1\/apps\/([^/]+)$/) + if (method === 'GET' && appMatch) { + const app = state.appStore.getApp(appMatch[1]!) + if (!app) return notFound('App not found') + return json({ app, state_root: state.config.stateRoot }) + } + + const appVersionMatch = path.match(/^\/v1\/apps\/([^/]+)\/version$/) + if (method === 'GET' && appVersionMatch) { + const version = state.appStore.getVersion(appVersionMatch[1]!) + if (!version) return notFound('App not found') + return json({ version }) + } + + const appHistoryMatch = path.match(/^\/v1\/apps\/([^/]+)\/history$/) + if (method === 'GET' && appHistoryMatch) { + const history = await state.appStore.getHistory(appHistoryMatch[1]!) + return json({ history }) + } + + const appLaunchMatch = path.match(/^\/v1\/apps\/([^/]+)\/launch$/) + if (method === 'POST' && appLaunchMatch) { + const app = state.appStore.launch(appLaunchMatch[1]!) + if (!app) return notFound('App not found') + return json({ app, state_root: state.config.stateRoot }) + } + + const appWorkspaceMatch = path.match(/^\/v1\/apps\/([^/]+)\/open-workspace$/) + if (method === 'POST' && appWorkspaceMatch) { + const workspace = await state.appStore.getWorkspace(appWorkspaceMatch[1]!) + if (!workspace) return notFound('App not found') + return json({ workspace }) + } + + const appPublishMatch = path.match(/^\/v1\/apps\/([^/]+)\/publish$/) + if (method === 'POST' && appPublishMatch) { + const body = (await req.json()) as AppPublishRequest + const result = await state.appStore.publish(appPublishMatch[1]!, body.message) + if (!result) return notFound('App not found') + return json({ result }) + } + + const appArchiveMatch = path.match(/^\/v1\/apps\/([^/]+)\/archive$/) + if (method === 'POST' && appArchiveMatch) { + state.appStore.archive(appArchiveMatch[1]!) + return noContent() + } + + const appWorktreeMatch = path.match(/^\/v1\/apps\/([^/]+)\/worktree-status$/) + if (method === 'GET' && appWorktreeMatch) { + const workspace = await state.appStore.getWorkspace(appWorktreeMatch[1]!) + return json({ workspace }) + } + + const appFileMatch = path.match(/^\/v1\/apps\/([^/]+)\/files\/(.+)$/) + if (method === 'GET' && appFileMatch) { + const data = await state.appStore.getAppFile( + appFileMatch[1]!, + appFileMatch[2]!, + ) + if (!data) return notFound('App file not found') + return new Response(data, { + headers: { 'content-type': 'application/octet-stream' }, + }) + } + + return notFound(`No route: ${method} ${path}`) +} + +export async function startWorkerServer( + config: WorkerServerConfig, +): Promise { + // Build all runtime components + const broker = new ApprovalBroker(config.permissionMode) + const engine = new TelegramWorkerEngine(config, broker) + const taskStore = new TaskStore() + const teamStore = new TeamStore() + const feedStore = new FeedStore(config.stateRoot) + const appStore = new AppLibraryStore(config.stateRoot) + + // Initialize engine (may fall back to stub mode) + const engineReady = await engine.init() + console.log( + `[worker] engine: ${engineReady ? 'ready' : 'stub mode'}`, + ) + + // Initialize persistent stores + await feedStore.init() + await appStore.init() + + const state: WorkerState = { + config, + turnManager: new TurnManager(), + broker, + engine, + taskStore, + teamStore, + feedStore, + appStore, + generatedFiles: new Map(), + } + + const server = Bun.serve({ + hostname: config.bindAddr, + port: config.port, + async fetch(req) { + try { + return await handleRequest(req, state) + } catch (err) { + console.error('[worker] unhandled error:', err) + return new Response( + JSON.stringify({ error: 'Internal server error' }), + { status: 500, headers: { 'content-type': 'application/json' } }, + ) + } + }, + }) + + console.log( + `[worker] listening on ${config.bindAddr}:${config.port} (profile=${config.profileId})`, + ) + return server +} diff --git a/ts-worker/state/AppLibraryStore.ts b/ts-worker/state/AppLibraryStore.ts new file mode 100644 index 0000000..4f04f4f --- /dev/null +++ b/ts-worker/state/AppLibraryStore.ts @@ -0,0 +1,334 @@ +/** + * AppLibraryStore — git-backed app library. + * + * Each app is a git repo under {stateRoot}/claw/apps/{app_id}/. + * The manifest and bundle are stored in the repo root. + * + * Operations: + * - packageFromFeedItem: create or update an app from a feed item + * - openWorkspace: return the worktree state for editing + * - publish: commit + snapshot as published version + * - archive: mark app as archived + * - launch: bump last_launched_at + * - history: list git commits + */ + +import { randomUUID } from 'crypto' +import { readFile, writeFile, mkdir, readdir, stat, access } from 'fs/promises' +import { join } from 'path' +import { execSync } from 'child_process' +import type { + LibraryAppRecord, + LibraryAppManifestV2, + AppBundleManifestV1, + LibraryAppVersionRecord, + AppWorkspaceRecord, + AppHistoryEntry, + AppHistoryResponse, + AppPackageRequest, + AppPackageResult, + AppPublishResult, + AppKind, +} from './records.js' + +export class AppLibraryStore { + private apps = new Map() + private stateRoot: string + private appsDir: string + + constructor(stateRoot: string) { + this.stateRoot = stateRoot + this.appsDir = join(stateRoot, 'claw', 'apps') + } + + async init(): Promise { + await mkdir(this.appsDir, { recursive: true }) + // Scan existing app directories + try { + const entries = await readdir(this.appsDir, { withFileTypes: true }) + for (const entry of entries) { + if (!entry.isDirectory()) continue + try { + const manifestPath = join(this.appsDir, entry.name, 'manifest.json') + const data = await readFile(manifestPath, 'utf8') + const manifest = JSON.parse(data) as LibraryAppManifestV2 + + const bundlePath = join(this.appsDir, entry.name, 'bundle.json') + const bundleData = await readFile(bundlePath, 'utf8') + const bundle = JSON.parse(bundleData) as AppBundleManifestV1 + + const versionPath = join(this.appsDir, entry.name, 'version.json') + const versionData = await readFile(versionPath, 'utf8') + const version = JSON.parse(versionData) as LibraryAppVersionRecord + + this.apps.set(manifest.app_id, { manifest, bundle_manifest: bundle, current_version: version }) + } catch { + // Skip malformed app directories + } + } + } catch { + // No apps dir yet + } + } + + getApp(appId: string): LibraryAppRecord | undefined { + return this.apps.get(appId) + } + + listApps(): LibraryAppRecord[] { + return Array.from(this.apps.values()) + .filter((a) => a.manifest.visibility !== 'archived') + } + + getVersion(appId: string): LibraryAppVersionRecord | undefined { + return this.apps.get(appId)?.current_version + } + + async packageFromFeedItem( + req: AppPackageRequest, + feedPayload: Uint8Array | null, + feedTitle: string, + feedKind: string, + ): Promise { + const appId = req.requested_app_id || randomUUID() + const existing = this.apps.get(appId) + const now = Date.now() + const versionId = randomUUID() + + const appDir = join(this.appsDir, appId) + await mkdir(appDir, { recursive: true }) + + // Init git repo if needed + try { + await access(join(appDir, '.git')) + } catch { + execSync('git init', { cwd: appDir }) + } + + // Determine app kind from feed item kind + const kind: AppKind = + feedKind === 'html_preview' ? 'html_page' : 'html_page' + + const entryFile = 'index.html' + if (feedPayload) { + await writeFile(join(appDir, entryFile), feedPayload) + } + + const bundleManifest: AppBundleManifestV1 = { + schema_version: '1', + app_id: appId, + name: req.title || feedTitle, + title: req.title || feedTitle, + description: req.description, + kind, + entry: entryFile, + files: [entryFile], + capabilities: [], + tags: [], + } + + const manifest: LibraryAppManifestV2 = existing?.manifest + ? { + ...existing.manifest, + title: req.title || existing.manifest.title, + description: req.description || existing.manifest.description, + updated_at: now, + current_version_id: versionId, + } + : { + schema_version: '2', + app_id: appId, + title: req.title || feedTitle, + name: (req.title || feedTitle).toLowerCase().replace(/[^a-z0-9]+/g, '-'), + description: req.description, + visibility: 'family', + created_at: now, + updated_at: now, + last_launched_at: 0, + created_from_feed_item_id: req.feed_item_id, + current_version_id: versionId, + default_branch: 'main', + local_repo_path: appDir, + kind, + tags: [], + capabilities: [], + source_feed_item_ids: [req.feed_item_id], + } + + const version: LibraryAppVersionRecord = { + version_id: versionId, + created_at: now, + entry: entryFile, + files: [entryFile], + branch: 'main', + source_feed_item_ids: [req.feed_item_id], + } + + // Persist to disk + await writeFile(join(appDir, 'manifest.json'), JSON.stringify(manifest, null, 2)) + await writeFile(join(appDir, 'bundle.json'), JSON.stringify(bundleManifest, null, 2)) + await writeFile(join(appDir, 'version.json'), JSON.stringify(version, null, 2)) + + // Git commit + try { + execSync('git add -A && git commit -m "Package from feed item"', { + cwd: appDir, + }) + const commit = execSync('git rev-parse HEAD', { cwd: appDir }) + .toString() + .trim() + version.commit = commit + manifest.published_commit = commit + await writeFile(join(appDir, 'version.json'), JSON.stringify(version, null, 2)) + } catch { + // git commit may fail if nothing changed + } + + const record: LibraryAppRecord = { + manifest, + bundle_manifest: bundleManifest, + current_version: version, + } + this.apps.set(appId, record) + + return { app: record, created: !existing } + } + + async getWorkspace(appId: string): Promise { + const app = this.apps.get(appId) + if (!app) return null + + const appDir = join(this.appsDir, appId) + let headCommit = '' + let dirty = false + + try { + headCommit = execSync('git rev-parse HEAD', { cwd: appDir }) + .toString() + .trim() + const status = execSync('git status --porcelain', { cwd: appDir }) + .toString() + .trim() + dirty = status.length > 0 + } catch { + // Not a git repo + } + + return { + app_id: appId, + cwd: appDir, + branch: app.manifest.default_branch, + head_commit: headCommit, + dirty, + bundle_manifest: app.bundle_manifest, + } + } + + async publish(appId: string, message: string): Promise { + const app = this.apps.get(appId) + if (!app) return null + + const appDir = join(this.appsDir, appId) + const now = Date.now() + + try { + execSync(`git add -A && git commit -m ${JSON.stringify(message)}`, { + cwd: appDir, + }) + } catch { + // Nothing to commit + } + + let commit = '' + try { + commit = execSync('git rev-parse HEAD', { cwd: appDir }) + .toString() + .trim() + } catch { + // Not a git repo + } + + const versionId = randomUUID() + const version: LibraryAppVersionRecord = { + ...app.current_version, + version_id: versionId, + created_at: now, + published_at: now, + commit, + changelog_summary: message, + } + + app.manifest.current_version_id = versionId + app.manifest.published_commit = commit + app.manifest.updated_at = now + app.current_version = version + + await writeFile( + join(appDir, 'manifest.json'), + JSON.stringify(app.manifest, null, 2), + ) + await writeFile( + join(appDir, 'version.json'), + JSON.stringify(version, null, 2), + ) + + const workspace = await this.getWorkspace(appId) + + return { app, workspace: workspace! } + } + + archive(appId: string): boolean { + const app = this.apps.get(appId) + if (!app) return false + app.manifest.visibility = 'archived' + app.manifest.updated_at = Date.now() + return true + } + + launch(appId: string): LibraryAppRecord | null { + const app = this.apps.get(appId) + if (!app) return null + app.manifest.last_launched_at = Date.now() + return app + } + + async getHistory(appId: string): Promise { + const appDir = join(this.appsDir, appId) + const entries: AppHistoryEntry[] = [] + + try { + const log = execSync( + 'git log --format="%H|%s|%at" --max-count=50', + { cwd: appDir }, + ) + .toString() + .trim() + for (const line of log.split('\n')) { + if (!line) continue + const [commit, message, timestamp] = line.split('|') + entries.push({ + commit: commit!, + message: message!, + authored_at: parseInt(timestamp!, 10) * 1000, + }) + } + } catch { + // Not a git repo or empty + } + + return { entries } + } + + async getAppFile(appId: string, filePath: string): Promise { + const appDir = join(this.appsDir, appId) + try { + return await readFile(join(appDir, filePath)) as unknown as Uint8Array + } catch { + return null + } + } + + reset(): void { + this.apps.clear() + } +} diff --git a/ts-worker/state/FeedStore.ts b/ts-worker/state/FeedStore.ts new file mode 100644 index 0000000..eb8210d --- /dev/null +++ b/ts-worker/state/FeedStore.ts @@ -0,0 +1,121 @@ +/** + * FeedStore — manages feed items and their payload files. + * + * Feed items are non-text outputs from Claude (images, HTML previews, + * code files, diffs, etc.) that get surfaced in the Mini App. + */ + +import { randomUUID } from 'crypto' +import { readFile, writeFile, mkdir, readdir, stat } from 'fs/promises' +import { join, basename } from 'path' +import type { FeedItemRecord, FeedItemKind, FeedItemSource } from './records.js' + +export class FeedStore { + private items = new Map() + private stateRoot: string + private payloadDir: string + + constructor(stateRoot: string) { + this.stateRoot = stateRoot + this.payloadDir = join(stateRoot, 'claw', 'feed') + } + + async init(): Promise { + await mkdir(this.payloadDir, { recursive: true }) + // Load existing feed index if present + try { + const indexPath = join(this.payloadDir, 'index.json') + const data = await readFile(indexPath, 'utf8') + const records = JSON.parse(data) as FeedItemRecord[] + for (const r of records) { + this.items.set(r.feed_item_id, r) + } + } catch { + // No existing index — start fresh + } + } + + async addItem( + title: string, + kind: FeedItemKind, + source: FeedItemSource, + payload?: Uint8Array, + options?: { + mediaType?: string + fileName?: string + linkedAppId?: string + previewText?: string + }, + ): Promise { + const id = randomUUID() + const now = Date.now() + let storedPath: string | undefined + + if (payload) { + const ext = options?.fileName + ? basename(options.fileName) + : id + storedPath = join(this.payloadDir, ext) + await writeFile(storedPath, payload) + } + + const record: FeedItemRecord = { + feed_item_id: id, + title, + kind, + source, + linked_app_id: options?.linkedAppId, + media_type: options?.mediaType, + file_name: options?.fileName, + stored_path: storedPath, + preview_text: options?.previewText, + deleted: false, + created_at: now, + updated_at: now, + } + + this.items.set(id, record) + await this.persistIndex() + return record + } + + getItem(feedItemId: string): FeedItemRecord | undefined { + return this.items.get(feedItemId) + } + + listItems(turnId?: string): FeedItemRecord[] { + let items = Array.from(this.items.values()).filter((i) => !i.deleted) + if (turnId) { + items = items.filter((i) => i.source.turn_id === turnId) + } + return items.sort((a, b) => b.created_at - a.created_at) + } + + async getPayload(feedItemId: string): Promise { + const item = this.items.get(feedItemId) + if (!item?.stored_path) return null + try { + return await readFile(item.stored_path) as unknown as Uint8Array + } catch { + return null + } + } + + deleteItem(feedItemId: string): boolean { + const item = this.items.get(feedItemId) + if (!item) return false + item.deleted = true + item.updated_at = Date.now() + return true + } + + reset(): void { + this.items.clear() + } + + private async persistIndex(): Promise { + const indexPath = join(this.payloadDir, 'index.json') + const records = Array.from(this.items.values()) + await writeFile(indexPath, JSON.stringify(records, null, 2)) + } +} diff --git a/ts-worker/state/TaskStore.ts b/ts-worker/state/TaskStore.ts new file mode 100644 index 0000000..adfdf9b --- /dev/null +++ b/ts-worker/state/TaskStore.ts @@ -0,0 +1,82 @@ +/** + * TaskStore — manages task lists and runtime tasks. + * + * Tracks the todo-list style tasks (TaskListRecord) and the + * background runtime tasks (RuntimeTaskRecord) like agents and + * shell processes. + */ + +import type { + TaskListRecord, + TaskListStatus, + RuntimeTaskRecord, + RuntimeTaskStatus, +} from './records.js' + +export class TaskStore { + private taskListId = 'default' + private tasks = new Map() + private runtimeTasks = new Map() + + getTaskListId(): string { + return this.taskListId + } + + // ── Task list (todos) ───────────────────────────────────────────── + + upsertTask(task: TaskListRecord): void { + this.tasks.set(task.id, task) + } + + getTask(id: string): TaskListRecord | undefined { + return this.tasks.get(id) + } + + listTasks(): TaskListRecord[] { + return Array.from(this.tasks.values()) + } + + removeTask(id: string): boolean { + return this.tasks.delete(id) + } + + // ── Runtime tasks (agents, shells) ──────────────────────────────── + + upsertRuntimeTask(task: RuntimeTaskRecord): void { + this.runtimeTasks.set(task.task_id, task) + } + + getRuntimeTask(taskId: string): RuntimeTaskRecord | undefined { + return this.runtimeTasks.get(taskId) + } + + listRuntimeTasks(): RuntimeTaskRecord[] { + return Array.from(this.runtimeTasks.values()) + } + + listAgents(): RuntimeTaskRecord[] { + return this.listRuntimeTasks().filter((t) => t.kind === 'agent') + } + + getAgent(agentId: string): RuntimeTaskRecord | undefined { + for (const task of this.runtimeTasks.values()) { + if (task.kind === 'agent' && (task.agent_id === agentId || task.task_id === agentId)) { + return task + } + } + return undefined + } + + stopRuntimeTask(taskId: string): boolean { + const task = this.runtimeTasks.get(taskId) + if (!task || task.status !== 'running') return false + task.status = 'stopped' + task.completed_at = Date.now() + return true + } + + reset(): void { + this.tasks.clear() + this.runtimeTasks.clear() + } +} diff --git a/ts-worker/state/TeamStore.ts b/ts-worker/state/TeamStore.ts new file mode 100644 index 0000000..798f2a6 --- /dev/null +++ b/ts-worker/state/TeamStore.ts @@ -0,0 +1,59 @@ +/** + * TeamStore — manages team state and mailbox. + */ + +import type { + TeamRecord, + MailboxMessage, + MailboxSummary, +} from './records.js' + +export class TeamStore { + private team: TeamRecord | null = null + private messages: MailboxMessage[] = [] + + getTeam(): TeamRecord | null { + return this.team + } + + setTeam(team: TeamRecord): void { + this.team = team + } + + deleteTeam(teamName: string): boolean { + if (this.team?.team_name === teamName) { + this.team = { ...this.team, deleted: true } + return true + } + return false + } + + // ── Mailbox ─────────────────────────────────────────────────────── + + addMessage(msg: MailboxMessage): void { + this.messages.push(msg) + } + + getMailboxSummary(): MailboxSummary { + return { + team_name: this.team?.team_name, + recent_messages: this.messages.slice(-50), + } + } + + getPendingMessages(recipient: string): MailboxMessage[] { + return this.messages.filter( + (m) => m.recipient === recipient && !m.envelope.read, + ) + } + + markRead(messageId: string): void { + const msg = this.messages.find((m) => m.envelope.id === messageId) + if (msg) msg.envelope.read = true + } + + reset(): void { + this.team = null + this.messages = [] + } +} diff --git a/ts-worker/state/records.ts b/ts-worker/state/records.ts new file mode 100644 index 0000000..41ce78f --- /dev/null +++ b/ts-worker/state/records.ts @@ -0,0 +1,288 @@ +/** + * Protocol-owned record types — TS mirrors of the Rust records in + * channel-gateway-core/src/records.rs. + * + * These are the wire shapes the gateway expects. They replace the + * opaque Record placeholders in protocol.ts with + * real typed structures. + */ + +// ── Task list ─────────────────────────────────────────────────────── + +export type TaskListStatus = 'pending' | 'in_progress' | 'completed' + +export type TaskListRecord = { + id: string + subject: string + description: string + activeForm?: string + owner?: string + status: TaskListStatus + blocks: string[] + blockedBy: string[] + metadata?: Record + internal: boolean + createdAt: number + updatedAt: number +} + +// ── Runtime task ──────────────────────────────────────────────────── + +export type RuntimeTaskKind = 'agent' | 'shell' +export type RuntimeTaskStatus = 'running' | 'completed' | 'failed' | 'stopped' + +export type RuntimeTaskRecord = { + task_id: string + kind: RuntimeTaskKind + status: RuntimeTaskStatus + description: string + prompt: string + output_file?: string + exit_code_file?: string + final_result?: string + error?: string + exit_code?: number + notified: boolean + pid?: number + agent_id?: string + agent_name?: string + team_name?: string + cwd?: string + worktree_path?: string + worktree_branch?: string + created_at: number + started_at: number + completed_at?: number +} + +// ── Team ──────────────────────────────────────────────────────────── + +export type TeamMemberRecord = { + agent_id: string + name: string + agent_type?: string + model?: string + status?: string + joined_at: number +} + +export type TeamRecord = { + team_name: string + description?: string + agent_type?: string + lead_agent_id: string + created_at: number + updated_at: number + deleted: boolean + members: TeamMemberRecord[] +} + +// ── Mailbox ───────────────────────────────────────────────────────── + +export type MessageEnvelope = { + id: string + from: string + to: string + summary?: string + message: unknown + timestamp: number + read: boolean + notified: boolean +} + +export type MailboxMessage = { + recipient: string + envelope: MessageEnvelope +} + +export type MailboxSummary = { + team_name?: string + recent_messages: MailboxMessage[] +} + +// ── Background approval ───────────────────────────────────────────── + +export type BackgroundApprovalDecision = + | { decision: 'approve_once' } + | { decision: 'approve_tool_for_session' } + | { decision: 'approve_all_for_session' } + | { decision: 'deny'; reason: string } + | { decision: 'cancel_turn' } + +export type BackgroundApprovalRecord = { + approval_id: string + task_id: string + tool_name: string + input: string + current_mode: string + required_mode: string + reason?: string + created_at: number + notified: boolean + decision?: BackgroundApprovalDecision +} + +// ── Feed ──────────────────────────────────────────────────────────── + +export type FeedItemKind = + | 'markdown' + | 'code' + | 'json' + | 'diff' + | 'image' + | 'html_preview' + | 'table' + | 'binary' + | 'deleted_file' + | 'app_published' + +export type FeedItemSourceKind = 'generated' | 'workspace' | 'app_event' +export type FeedItemChangeKind = 'created' | 'modified' | 'deleted' + +export type FeedItemSource = { + source_kind: FeedItemSourceKind + turn_id?: string + source_file_id?: string + source_files: string[] + change_kind?: FeedItemChangeKind +} + +export type FeedItemRecord = { + feed_item_id: string + title: string + kind: FeedItemKind + source: FeedItemSource + linked_app_id?: string + media_type?: string + file_name?: string + stored_path?: string + preview_text?: string + deleted: boolean + created_at: number + updated_at: number +} + +// ── App library ───────────────────────────────────────────────────── + +export type AppKind = + | 'html_page' + | 'react_app' + | 'dashboard' + | 'document_experience' + | 'tool_app' + +export type AppVisibility = 'family' | 'admin' | 'archived' + +export type AppCapability = + | 'get_context' + | 'list_files' + | 'read_text' + | 'read_json' + | 'open_feed_item' + | 'open_app' + | 'publish_patch' + | 'request_action' + | 'emit_event' + +export type AppRepoLink = { + url: string + branch?: string + commit?: string +} + +export type LibraryAppManifestV2 = { + schema_version: string + app_id: string + title: string + name: string + description?: string + visibility: AppVisibility + created_at: number + updated_at: number + last_launched_at: number + created_from_feed_item_id?: string + current_version_id: string + published_commit?: string + default_branch: string + local_repo_path: string + remote_repo?: AppRepoLink + kind: AppKind + icon?: string + tags: string[] + capabilities: AppCapability[] + source_turn_id?: string + source_feed_item_ids: string[] +} + +export type AppBundleManifestV1 = { + schema_version: string + app_id: string + name: string + title: string + description?: string + kind: AppKind + entry: string + files: string[] + icon?: string + capabilities: AppCapability[] + tags: string[] +} + +export type LibraryAppVersionRecord = { + version_id: string + created_at: number + entry: string + files: string[] + branch: string + commit?: string + published_at?: number + source_turn_id?: string + source_feed_item_ids: string[] + changelog_summary?: string +} + +export type LibraryAppRecord = { + manifest: LibraryAppManifestV2 + bundle_manifest: AppBundleManifestV1 + current_version: LibraryAppVersionRecord +} + +export type AppWorkspaceRecord = { + app_id: string + cwd: string + branch: string + head_commit: string + dirty: boolean + bundle_manifest: AppBundleManifestV1 +} + +export type AppPackageRequest = { + feed_item_id: string + requested_app_id?: string + title?: string + description?: string +} + +export type AppPackageResult = { + app: LibraryAppRecord + created: boolean +} + +export type AppPublishRequest = { + message: string +} + +export type AppPublishResult = { + app: LibraryAppRecord + workspace: AppWorkspaceRecord +} + +export type AppHistoryEntry = { + commit: string + message: string + authored_at: number +} + +export type AppHistoryResponse = { + entries: AppHistoryEntry[] +} diff --git a/ts-worker/turnManager.ts b/ts-worker/turnManager.ts new file mode 100644 index 0000000..ff7f375 --- /dev/null +++ b/ts-worker/turnManager.ts @@ -0,0 +1,146 @@ +/** + * TurnManager — owns the lifecycle of a single user turn. + * + * Each turn gets a unique ID. The gateway: + * 1. POSTs /v1/turns → gets back { turn_id } + * 2. GETs /v1/turns/:turn_id/events → SSE stream of WorkerTurnEvent + * 3. POSTs /v1/turns/:turn_id/approval → resolves a pending tool approval + * 4. POSTs /v1/turns/:turn_id/cancel → aborts the turn + * + * Internally we hold: + * - a queue of WorkerTurnEvent waiting to be flushed to SSE clients + * - an optional pending approval (blocks the engine until resolved) + * - an AbortController so cancel works + */ + +import { randomUUID } from 'crypto' +import type { + ApprovalDecision, + WorkerTurnEvent, + WorkerTurnRequest, +} from './protocol.js' + +export type PendingApproval = { + approval_id: string + resolve: (decision: ApprovalDecision) => void +} + +export type Turn = { + id: string + request: WorkerTurnRequest + events: WorkerTurnEvent[] + /** SSE clients listening for events on this turn */ + listeners: Set<(event: WorkerTurnEvent) => void> + pendingApproval: PendingApproval | null + abortController: AbortController + done: boolean +} + +export class TurnManager { + private turns = new Map() + + create(request: WorkerTurnRequest): Turn { + const id = randomUUID() + const turn: Turn = { + id, + request, + events: [], + listeners: new Set(), + pendingApproval: null, + abortController: new AbortController(), + done: false, + } + this.turns.set(id, turn) + return turn + } + + get(turnId: string): Turn | undefined { + return this.turns.get(turnId) + } + + /** Push an event to the turn's log and broadcast to all SSE listeners. */ + emit(turnId: string, event: WorkerTurnEvent): void { + const turn = this.turns.get(turnId) + if (!turn) return + turn.events.push(event) + for (const listener of turn.listeners) { + listener(event) + } + if (event.type === 'completed' || event.type === 'failed') { + turn.done = true + } + } + + /** Register an SSE listener. Returns unsubscribe function. */ + subscribe( + turnId: string, + listener: (event: WorkerTurnEvent) => void, + ): (() => void) | null { + const turn = this.turns.get(turnId) + if (!turn) return null + + // Replay buffered events first + for (const event of turn.events) { + listener(event) + } + + if (turn.done) return null + + turn.listeners.add(listener) + return () => { + turn.listeners.delete(listener) + } + } + + /** Set a pending approval on the turn. Returns a promise that resolves + * when the gateway sends the decision. */ + requestApproval(turnId: string, approvalId: string): Promise { + const turn = this.turns.get(turnId) + if (!turn) return Promise.reject(new Error(`unknown turn ${turnId}`)) + + return new Promise((resolve) => { + turn.pendingApproval = { approval_id: approvalId, resolve } + }) + } + + resolveApproval(turnId: string, approvalId: string, decision: ApprovalDecision): boolean { + const turn = this.turns.get(turnId) + if (!turn?.pendingApproval) return false + if (turn.pendingApproval.approval_id !== approvalId) return false + turn.pendingApproval.resolve(decision) + turn.pendingApproval = null + return true + } + + cancel(turnId: string): boolean { + const turn = this.turns.get(turnId) + if (!turn) return false + turn.abortController.abort() + return true + } + + /** Prune completed turns older than `maxAgeMs`. */ + prune(maxAgeMs = 5 * 60 * 1000): void { + const now = Date.now() + for (const [id, turn] of this.turns) { + if (turn.done) { + const lastEvent = turn.events[turn.events.length - 1] + // Simple heuristic — if turn is done and old, drop it + if (lastEvent && turn.events.length > 0) { + this.turns.delete(id) + } + } + } + } + + get activeTurnId(): string | null { + for (const [id, turn] of this.turns) { + if (!turn.done) return id + } + return null + } + + get isBusy(): boolean { + return this.activeTurnId !== null + } +}