Files
claw-code-parity/ts-worker/server.ts
T
Wylabb f7c23fb325
Build Claw Telegram / build (push) Successful in 5m26s
Build Claw Telegram / cleanup (push) Successful in 0s
Replace Rust worker with TS worker in single image
Add ts-worker/ with the Bun/TypeScript worker that replaces
claw-profile-worker. The Dockerfile now builds a single image
containing both the Rust gateway (claw-telegram) and the TS worker.

The image defaults to worker mode (bun run ts-worker/main.ts).
The gateway Unraid XML overrides with --entrypoint claw-telegram.
Worker containers use the same image with the default CMD.

- Add ts-worker/ (12 files): HTTP/SSE server, Anthropic SDK engine,
  approval broker, event translator, state stores
- Add package.json with @anthropic-ai/sdk dependency
- Rewrite Dockerfile: three-stage build (Rust + Bun + runtime)
- Revert CLAW_GATEWAY_WORKER_IMAGE to claw-telegram:latest
- Remove image pull from docker_worker_manager (same image, already local)
- Add ts-worker paths to CI trigger

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 23:44:57 +02:00

501 lines
17 KiB
TypeScript

/**
* Worker HTTP server — Bun.serve implementation of the worker protocol.
*
* Implements every endpoint from worker-protocol/openapi.yaml so the
* Rust gateway can talk to this TS worker exactly as it talked to the
* old Rust worker.
*/
import { timingSafeEqual } from 'node:crypto'
import type { Server } from 'bun'
import { TurnManager } from './turnManager.js'
import type {
  TurnApprovalRequest,
  WorkerStatusResponse,
  WorkerTurnEvent,
  WorkerTurnRequest,
} from './protocol.js'
import { ApprovalBroker } from './permissions/ApprovalBroker.js'
import { TelegramWorkerEngine } from './engine/TelegramWorkerEngine.js'
import { TaskStore } from './state/TaskStore.js'
import { TeamStore } from './state/TeamStore.js'
import { FeedStore } from './state/FeedStore.js'
import { AppLibraryStore } from './state/AppLibraryStore.js'
import type { AppPackageRequest, AppPublishRequest } from './state/records.js'
/**
* Configuration accepted by `startWorkerServer`.
*
* The whole object is also handed to the `TelegramWorkerEngine` constructor,
* so fields that are not read in this file may still be consumed there.
*/
export type WorkerServerConfig = {
/** Passed to `Bun.serve` as `hostname`. */
bindAddr: string
/** Passed to `Bun.serve` as `port`. */
port: number
/** Bearer token expected in the `authorization` header; when unset or empty the auth check is skipped. */
authToken?: string
/** Reported as `profile_id` by `GET /v1/status` and included in the startup log line. */
profileId: string
/** Reported as `model` by `GET /v1/status`. */
model: string
/** Passed to the `ApprovalBroker` constructor and reported as `permission_mode` by `GET /v1/status`. */
permissionMode: string
/** Reported as `default_cwd` by `GET /v1/status`. */
defaultCwd: string
/** Passed to the `FeedStore` and `AppLibraryStore` constructors; echoed as `state_root` in feed and app responses. */
stateRoot: string
/** Not read directly in this file — presumably consumed by the engine (TODO confirm). */
claudeConfigDir: string
}
/**
* WorkerState — runtime state shared across all request handlers.
*
* A single instance is built by `startWorkerServer` and passed to
* `handleRequest` for every incoming request.
*/
type WorkerState = {
/** Server configuration; read for the auth token and for values echoed in responses. */
config: WorkerServerConfig
/** Creates turns, delivers turn events to subscribers, and handles cancel / approval fallback. */
turnManager: TurnManager
/** Resolves foreground and background approvals; its request handler is re-pointed at each newly created turn. */
broker: ApprovalBroker
/** Executes turns (`executeTurn`), exposes `messageCount`, and supports `resetSession`. */
engine: TelegramWorkerEngine
/** Backs the task and agent endpoints and supplies the task list id. */
taskStore: TaskStore
/** Backs the team and mailbox endpoints. */
teamStore: TeamStore
/** Backs the feed endpoints; initialised with `init()` before serving. */
feedStore: FeedStore
/** Backs the app library endpoints; initialised with `init()` before serving. */
appStore: AppLibraryStore
/** File payloads served by `GET /v1/turns/:turnId/files/:fileId`, keyed by file id. Nothing in this file inserts entries. */
generatedFiles: Map<string, { data: Uint8Array; mediaType: string }>
}
/**
 * Enforce the optional bearer-token check for an incoming request.
 *
 * When `state.config.authToken` is unset or empty, authentication is
 * disabled and every request is allowed through. Otherwise the
 * `authorization` header must be exactly `Bearer <token>`.
 *
 * The comparison is done in constant time so that response latency does not
 * reveal how many leading characters of a guessed token were correct.
 *
 * @param req - Incoming request whose `authorization` header is inspected.
 * @param state - Worker state; only `config.authToken` is read.
 * @returns `null` when the request may proceed, otherwise a 401 response.
 */
function checkAuth(req: Request, state: WorkerState): Response | null {
  const token = state.config.authToken
  if (!token) return null

  const encoder = new TextEncoder()
  const expected = encoder.encode(`Bearer ${token}`)
  // A missing header is treated as an empty credential, which can never match.
  const presented = encoder.encode(req.headers.get('authorization') ?? '')

  // timingSafeEqual throws on inputs of different byte length, so the length
  // is checked first; that only discloses the token length, not its content.
  const authorized =
    presented.length === expected.length && timingSafeEqual(presented, expected)

  return authorized ? null : new Response('Unauthorized', { status: 401 })
}
/**
 * Build a JSON response.
 *
 * @param data - Value serialised with `JSON.stringify` to form the body.
 * @param status - HTTP status code; defaults to 200.
 * @returns A response carrying the serialised body and an
 *   `application/json` content type.
 */
function json(data: unknown, status = 200): Response {
  const payload = JSON.stringify(data)
  const headers = { 'content-type': 'application/json' }
  return new Response(payload, { status, headers })
}
/**
 * Build an empty `204 No Content` response, used by endpoints that
 * acknowledge an action without returning a payload.
 */
function noContent(): Response {
  const init = { status: 204 }
  return new Response(null, init)
}
/**
 * Build a JSON `404` response of the form `{ "error": <msg> }`.
 *
 * @param msg - Human-readable reason; defaults to `'Not found'`.
 * @returns A 404 response with an `application/json` content type.
 */
function notFound(msg = 'Not found'): Response {
  const body = JSON.stringify({ error: msg })
  const headers = { 'content-type': 'application/json' }
  return new Response(body, { status: 404, headers })
}
/**
 * Route a single HTTP request to the matching worker-protocol endpoint.
 *
 * The auth check runs first for every route (including `/healthz`). Routes
 * are then matched on method + pathname in the order they appear below;
 * anything unmatched yields a JSON 404 naming the method and path.
 *
 * Exceptions raised while handling a route (for example `req.json()` on a
 * malformed body) are not caught here and propagate to the caller. The one
 * exception is turn execution, which runs in a detached microtask and
 * reports failures as a `failed` turn event instead.
 *
 * @param req - Incoming request.
 * @param state - Shared runtime state used by all handlers.
 * @returns The response for the matched route, the 401 from the auth check,
 *   or a JSON 404.
 */
async function handleRequest(
  req: Request,
  state: WorkerState,
): Promise<Response> {
  const authErr = checkAuth(req, state)
  if (authErr) return authErr

  const url = new URL(req.url)
  const path = url.pathname
  const method = req.method

  // ── Health ────────────────────────────────────────────────────────
  if (method === 'GET' && path === '/healthz') {
    return new Response('ok')
  }

  // ── Status ────────────────────────────────────────────────────────
  if (method === 'GET' && path === '/v1/status') {
    const status: WorkerStatusResponse = {
      profile_id: state.config.profileId,
      message_count: state.engine.messageCount,
      model: state.config.model,
      permission_mode: state.config.permissionMode,
      default_cwd: state.config.defaultCwd,
      busy: state.turnManager.isBusy,
      task_list_id: state.taskStore.getTaskListId(),
    }
    return json(status)
  }

  // ── Session reset ─────────────────────────────────────────────────
  if (method === 'POST' && path === '/v1/session/reset') {
    state.engine.resetSession()
    state.taskStore.reset()
    state.teamStore.reset()
    return noContent()
  }

  // ── Turns ─────────────────────────────────────────────────────────
  if (method === 'POST' && path === '/v1/turns') {
    const body = (await req.json()) as WorkerTurnRequest
    const turn = state.turnManager.create(body)

    // Wire approval broker to emit SSE events on this turn. The broker holds
    // a single handler, so each new turn replaces the previous wiring.
    state.broker.setApprovalRequestHandler((request) => {
      state.turnManager.emit(turn.id, {
        type: 'approval_requested',
        request: {
          approval_id: request.approval_id,
          tool_name: request.tool_name,
          input: request.input,
          current_mode: request.current_mode,
          required_mode: request.required_mode,
          reason: request.reason,
        },
      })
    })

    // Drive the turn through the engine in the background; the HTTP response
    // below returns the turn id without waiting for the turn to finish.
    queueMicrotask(async () => {
      try {
        for await (const event of state.engine.executeTurn(
          body.prompt,
          turn.abortController,
        )) {
          state.turnManager.emit(turn.id, event)
        }
      } catch (err) {
        const message = err instanceof Error ? err.message : String(err)
        state.turnManager.emit(turn.id, {
          type: 'failed',
          message,
        })
      }
    })

    return json({ turn_id: turn.id })
  }

  // ── Turn events (SSE) ─────────────────────────────────────────────
  const turnEventsMatch = path.match(/^\/v1\/turns\/([^/]+)\/events$/)
  if (method === 'GET' && turnEventsMatch) {
    const turnId = turnEventsMatch[1]!
    const turn = state.turnManager.get(turnId)
    if (!turn) return notFound('Turn not found')

    // Shared between start() and cancel() so that a client disconnect can
    // stop delivery and release the subscription.
    let closed = false
    // NOTE(review): assumes subscribe() returns an unsubscribe function (or
    // null when there is nothing to subscribe to) — confirm against TurnManager.
    let unsubscribe: (() => void) | null = null

    return new Response(
      new ReadableStream<Uint8Array>({
        start(controller) {
          const encoder = new TextEncoder()
          const send = (event: WorkerTurnEvent) => {
            if (closed) return
            const data = JSON.stringify(event)
            controller.enqueue(encoder.encode(`data: ${data}\n\n`))
            // 'completed' and 'failed' are terminal: end the stream after them.
            if (event.type === 'completed' || event.type === 'failed') {
              closed = true
              controller.close()
            }
          }
          const unsub = state.turnManager.subscribe(turnId, send)
          if (unsub === null) {
            // No live subscription; close unless a terminal event delivered
            // during subscribe() already did.
            if (!closed) {
              closed = true
              controller.close()
            }
            return
          }
          unsubscribe = unsub
        },
        cancel() {
          // The consumer went away (e.g. client disconnected). Without this,
          // the subscription would stay registered and the next event would
          // try to enqueue on a cancelled stream, throwing into the emitter.
          closed = true
          const release = unsubscribe
          unsubscribe = null
          release?.()
        },
      }),
      {
        headers: {
          'content-type': 'text/event-stream',
          'cache-control': 'no-cache',
          connection: 'keep-alive',
        },
      },
    )
  }

  // ── Turn approval ─────────────────────────────────────────────────
  const turnApprovalMatch = path.match(/^\/v1\/turns\/([^/]+)\/approval$/)
  if (method === 'POST' && turnApprovalMatch) {
    const body = (await req.json()) as TurnApprovalRequest
    // Route to broker (foreground approval)
    const ok = state.broker.resolveForeground(
      body.approval_id,
      body.decision,
    )
    if (!ok) {
      // Try turn manager fallback
      const tmOk = state.turnManager.resolveApproval(
        turnApprovalMatch[1]!,
        body.approval_id,
        body.decision,
      )
      if (!tmOk) return notFound('No pending approval')
    }
    return noContent()
  }

  // ── Turn cancel ───────────────────────────────────────────────────
  const turnCancelMatch = path.match(/^\/v1\/turns\/([^/]+)\/cancel$/)
  if (method === 'POST' && turnCancelMatch) {
    state.turnManager.cancel(turnCancelMatch[1]!)
    return noContent()
  }

  // ── Generated files ───────────────────────────────────────────────
  const fileMatch = path.match(/^\/v1\/turns\/([^/]+)\/files\/([^/]+)$/)
  if (method === 'GET' && fileMatch) {
    // Only the file id is used for lookup; the turn id segment is matched
    // but not consulted.
    const fileId = fileMatch[2]!
    const file = state.generatedFiles.get(fileId)
    if (!file) return notFound('File not found')
    return new Response(file.data, {
      headers: { 'content-type': file.mediaType },
    })
  }

  // ── Tasks ─────────────────────────────────────────────────────────
  if (method === 'GET' && path === '/v1/tasks') {
    return json({
      task_list_id: state.taskStore.getTaskListId(),
      tasks: state.taskStore.listTasks(),
    })
  }

  const taskMatch = path.match(/^\/v1\/tasks\/([^/]+)$/)
  if (method === 'GET' && taskMatch) {
    const task = state.taskStore.getTask(taskMatch[1]!)
    const runtimeTask = state.taskStore.getRuntimeTask(taskMatch[1]!)
    if (!task && !runtimeTask) return notFound('Task not found')
    return json({ task, runtime_task: runtimeTask })
  }

  const taskStopMatch = path.match(/^\/v1\/tasks\/([^/]+)\/stop$/)
  if (method === 'POST' && taskStopMatch) {
    state.taskStore.stopRuntimeTask(taskStopMatch[1]!)
    return noContent()
  }

  // ── Team ──────────────────────────────────────────────────────────
  if (method === 'GET' && path === '/v1/team') {
    return json({
      team: state.teamStore.getTeam(),
      task_list_id: state.taskStore.getTaskListId(),
    })
  }

  // ── Agents ────────────────────────────────────────────────────────
  if (method === 'GET' && path === '/v1/agents') {
    return json({ agents: state.taskStore.listAgents() })
  }

  const agentMatch = path.match(/^\/v1\/agents\/([^/]+)$/)
  if (method === 'GET' && agentMatch) {
    const agent = state.taskStore.getAgent(agentMatch[1]!)
    if (!agent) return notFound('Agent not found')
    return json(agent)
  }

  // ── Background approvals ──────────────────────────────────────────
  if (method === 'GET' && path === '/v1/background-approvals') {
    // `task_id` and `notified` are filled with fixed placeholder values here.
    const approvals = state.broker.getBackgroundApprovals().map((a) => ({
      approval_id: a.approval_id,
      task_id: '',
      tool_name: a.tool_name,
      input: a.input,
      current_mode: a.current_mode,
      required_mode: a.required_mode,
      reason: a.reason,
      created_at: a.created_at,
      notified: false,
    }))
    return json({ approvals })
  }

  const bgApprovalMatch = path.match(/^\/v1\/background-approvals\/([^/]+)$/)
  if (method === 'POST' && bgApprovalMatch) {
    const body = (await req.json()) as TurnApprovalRequest
    const ok = state.broker.resolveBackground(
      bgApprovalMatch[1]!,
      body.decision,
    )
    if (!ok) return notFound('No pending background approval')
    return noContent()
  }

  const bgApprovalNotifiedMatch = path.match(
    /^\/v1\/background-approvals\/([^/]+)\/notified$/,
  )
  if (method === 'POST' && bgApprovalNotifiedMatch) {
    state.broker.markBackgroundNotified(bgApprovalNotifiedMatch[1]!)
    return noContent()
  }

  // ── Mailbox ───────────────────────────────────────────────────────
  if (method === 'GET' && path === '/v1/mailbox') {
    return json({ mailbox: state.teamStore.getMailboxSummary() })
  }

  const mailboxPendingMatch = path.match(/^\/v1\/mailbox\/pending\/([^/]+)$/)
  if (method === 'GET' && mailboxPendingMatch) {
    const messages = state.teamStore.getPendingMessages(
      mailboxPendingMatch[1]!,
    )
    return json({ messages })
  }

  // ── Feed ──────────────────────────────────────────────────────────
  if (method === 'GET' && path === '/v1/feed') {
    const turnId = url.searchParams.get('turn_id') ?? undefined
    return json({
      items: state.feedStore.listItems(turnId),
      state_root: state.config.stateRoot,
    })
  }

  const feedItemMatch = path.match(/^\/v1\/feed\/([^/]+)$/)
  if (method === 'GET' && feedItemMatch) {
    const item = state.feedStore.getItem(feedItemMatch[1]!)
    if (!item) return notFound('Feed item not found')
    return json({ item, state_root: state.config.stateRoot })
  }

  const feedFileMatch = path.match(/^\/v1\/feed\/([^/]+)\/file$/)
  if (method === 'GET' && feedFileMatch) {
    const payload = await state.feedStore.getPayload(feedFileMatch[1]!)
    if (!payload) return notFound('Feed file not found')
    const item = state.feedStore.getItem(feedFileMatch[1]!)
    return new Response(payload, {
      headers: {
        'content-type': item?.media_type || 'application/octet-stream',
      },
    })
  }

  const feedMakeAppMatch = path.match(/^\/v1\/feed\/([^/]+)\/make-app$/)
  if (method === 'POST' && feedMakeAppMatch) {
    const body = (await req.json()) as AppPackageRequest
    const feedItem = state.feedStore.getItem(feedMakeAppMatch[1]!)
    if (!feedItem) return notFound('Feed item not found')
    const payload = await state.feedStore.getPayload(feedMakeAppMatch[1]!)
    const result = await state.appStore.packageFromFeedItem(
      body,
      payload,
      feedItem.title,
      feedItem.kind,
    )
    return json({ result })
  }

  // ── Apps ───────────────────────────────────────────────────────────
  if (method === 'GET' && path === '/v1/apps') {
    return json({
      apps: state.appStore.listApps(),
      state_root: state.config.stateRoot,
    })
  }

  const appMatch = path.match(/^\/v1\/apps\/([^/]+)$/)
  if (method === 'GET' && appMatch) {
    const app = state.appStore.getApp(appMatch[1]!)
    if (!app) return notFound('App not found')
    return json({ app, state_root: state.config.stateRoot })
  }

  const appVersionMatch = path.match(/^\/v1\/apps\/([^/]+)\/version$/)
  if (method === 'GET' && appVersionMatch) {
    const version = state.appStore.getVersion(appVersionMatch[1]!)
    if (!version) return notFound('App not found')
    return json({ version })
  }

  const appHistoryMatch = path.match(/^\/v1\/apps\/([^/]+)\/history$/)
  if (method === 'GET' && appHistoryMatch) {
    const history = await state.appStore.getHistory(appHistoryMatch[1]!)
    return json({ history })
  }

  const appLaunchMatch = path.match(/^\/v1\/apps\/([^/]+)\/launch$/)
  if (method === 'POST' && appLaunchMatch) {
    const app = state.appStore.launch(appLaunchMatch[1]!)
    if (!app) return notFound('App not found')
    return json({ app, state_root: state.config.stateRoot })
  }

  const appWorkspaceMatch = path.match(/^\/v1\/apps\/([^/]+)\/open-workspace$/)
  if (method === 'POST' && appWorkspaceMatch) {
    const workspace = await state.appStore.getWorkspace(appWorkspaceMatch[1]!)
    if (!workspace) return notFound('App not found')
    return json({ workspace })
  }

  const appPublishMatch = path.match(/^\/v1\/apps\/([^/]+)\/publish$/)
  if (method === 'POST' && appPublishMatch) {
    const body = (await req.json()) as AppPublishRequest
    const result = await state.appStore.publish(appPublishMatch[1]!, body.message)
    if (!result) return notFound('App not found')
    return json({ result })
  }

  const appArchiveMatch = path.match(/^\/v1\/apps\/([^/]+)\/archive$/)
  if (method === 'POST' && appArchiveMatch) {
    state.appStore.archive(appArchiveMatch[1]!)
    return noContent()
  }

  const appWorktreeMatch = path.match(/^\/v1\/apps\/([^/]+)\/worktree-status$/)
  if (method === 'GET' && appWorktreeMatch) {
    // Unlike open-workspace, a missing workspace is returned as-is rather
    // than mapped to a 404.
    const workspace = await state.appStore.getWorkspace(appWorktreeMatch[1]!)
    return json({ workspace })
  }

  const appFileMatch = path.match(/^\/v1\/apps\/([^/]+)\/files\/(.+)$/)
  if (method === 'GET' && appFileMatch) {
    // The second capture may contain slashes, i.e. a nested file path.
    const data = await state.appStore.getAppFile(
      appFileMatch[1]!,
      appFileMatch[2]!,
    )
    if (!data) return notFound('App file not found')
    return new Response(data, {
      headers: { 'content-type': 'application/octet-stream' },
    })
  }

  return notFound(`No route: ${method} ${path}`)
}
/**
 * Assemble the worker's runtime components and start the HTTP server.
 *
 * Construction order: approval broker, engine, in-memory task/team stores,
 * then the feed and app-library stores rooted at `config.stateRoot`. The
 * engine is initialised first (its result is only logged as "ready" or
 * "stub mode"), followed by the two persistent stores, before the server
 * starts accepting requests.
 *
 * Any error escaping `handleRequest` is logged and answered with a generic
 * JSON 500 so that details are not exposed to the client.
 *
 * @param config - Bind address, port, and runtime settings for the worker.
 * @returns The running Bun server instance.
 */
export async function startWorkerServer(
  config: WorkerServerConfig,
): Promise<Server> {
  // Runtime components
  const approvalBroker = new ApprovalBroker(config.permissionMode)
  const workerEngine = new TelegramWorkerEngine(config, approvalBroker)
  const tasks = new TaskStore()
  const team = new TeamStore()
  const feed = new FeedStore(config.stateRoot)
  const apps = new AppLibraryStore(config.stateRoot)

  // Engine start-up; a falsy result means it is running in stub mode.
  const engineLabel = (await workerEngine.init()) ? 'ready' : 'stub mode'
  console.log(`[worker] engine: ${engineLabel}`)

  // Persistent stores
  await feed.init()
  await apps.init()

  const state: WorkerState = {
    config,
    turnManager: new TurnManager(),
    broker: approvalBroker,
    engine: workerEngine,
    taskStore: tasks,
    teamStore: team,
    feedStore: feed,
    appStore: apps,
    generatedFiles: new Map(),
  }

  // Last-resort handler: log the failure and reply with an opaque 500.
  const toInternalError = (err: unknown): Response => {
    console.error('[worker] unhandled error:', err)
    const body = JSON.stringify({ error: 'Internal server error' })
    return new Response(body, {
      status: 500,
      headers: { 'content-type': 'application/json' },
    })
  }

  const server = Bun.serve({
    hostname: config.bindAddr,
    port: config.port,
    fetch: (req) => handleRequest(req, state).catch(toInternalError),
  })

  console.log(
    `[worker] listening on ${config.bindAddr}:${config.port} (profile=${config.profileId})`,
  )
  return server
}