Use @anthropic-ai/sdk directly instead of QueryEngine
Build TS Worker / build (push) Successful in 3m58s
CI / Typecheck & Lint (push) Failing after 1m46s

Replace the QueryEngine-based engine with direct Anthropic SDK calls.
QueryEngine requires bun:bundle feature flags that only exist in
bundled builds, making it unusable in the Docker image. The SDK
works directly from source.

Also fix the Dockerfile: install curl and git for healthcheck and
app library, copy only src/worker/ instead of the entire tree.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Wylabb
2026-04-07 22:29:44 +02:00
parent 1c2666db31
commit a2d8e804db
2 changed files with 88 additions and 137 deletions
+11 -16
View File
@@ -1,29 +1,25 @@
# TS Worker — Bun-based worker container for the Claw architecture.
#
# Runs the worker HTTP server that speaks the gateway-worker protocol.
# The Rust gateway launches this container per profile and routes
# Telegram/Mini App traffic to it.
# Telegram/Mini App traffic to it via the worker protocol.
#
# Build:
# docker build -f src/worker/Dockerfile -t claw-worker .
# docker build -f src/worker/Dockerfile -t claw-ts-worker .
#
# Run:
# docker run --rm \
# -e CLAW_WORKER_BIND_ADDR=0.0.0.0 \
# -e CLAW_WORKER_PORT=8780 \
# -e CLAW_WORKER_AUTH_TOKEN=secret \
# -e CLAW_WORKER_PROFILE_ID=default \
# -e CLAW_WORKER_STATE_ROOT=/state \
# -e CLAW_WORKER_DEFAULT_CWD=/workspace \
# -e CLAW_WORKER_MODEL=claude-sonnet-4-6 \
# -e CLAW_WORKER_PERMISSION_MODE=default \
# -e CLAUDE_CONFIG_DIR=/state/claude-home \
# -e ANTHROPIC_API_KEY=sk-... \
# -v worker-state:/state \
# -p 8780:8780 \
# claw-worker
# claw-ts-worker
FROM oven/bun:1.3 AS base
FROM oven/bun:1.3
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
@@ -31,8 +27,8 @@ WORKDIR /app
COPY package.json bun.lock* ./
RUN bun install --frozen-lockfile || bun install
# Copy source
COPY . .
# Copy worker source (only what's needed)
COPY src/worker/ src/worker/
# Create state directories
RUN mkdir -p /state/claude-home /state/claw /workspace
@@ -48,7 +44,6 @@ ENV CLAUDE_CONFIG_DIR=/state/claude-home
EXPOSE 8780
# Health check for orchestration
HEALTHCHECK --interval=10s --timeout=3s --start-period=5s \
CMD curl -f http://localhost:8780/healthz || exit 1
+77 -121
View File
@@ -1,25 +1,20 @@
/**
* TelegramWorkerEngine — creates, caches, and drives a QueryEngine
* instance for a single profile worker process.
* TelegramWorkerEngine — drives Claude conversations for the worker.
*
* Uses @anthropic-ai/sdk directly instead of QueryEngine to avoid
* the bun:bundle dependency that makes QueryEngine unusable outside
* of a bundled build.
*
* Responsibilities:
* - one QueryEngine per profile (long-lived across turns)
* - set cwd to the configured workspace
* - submit prompts from /v1/turns
* - stream translated events via WorkerEventTranslator
* - manage interruption/cancel
* - surface generated files
* - surface approval requests via ApprovalBroker
*
* The engine runs in headless local mode. It does NOT use:
* - the terminal/Ink REPL
* - CLAUDE_CODE_REMOTE
* - the remote bridge
*
* Upstream auto-memory and auto-dream remain active because we
* run in local mode with CLAUDE_CONFIG_DIR set.
* - manage conversation history across turns
* - call Claude API via streaming
* - translate streaming events into WorkerTurnEvent
* - handle tool use / approval flow
* - support cancellation
*/
import Anthropic from '@anthropic-ai/sdk'
import type { MessageStream } from '@anthropic-ai/sdk/lib/streaming.mjs'
import type { ApprovalBroker } from '../permissions/ApprovalBroker.js'
import { WorkerEventTranslator } from '../events/WorkerEventTranslator.js'
import type { WorkerTurnEvent } from '../protocol.js'
@@ -33,55 +28,19 @@ export type TelegramWorkerEngineConfig = {
claudeConfigDir: string
}
type SDKMessageLike = {
type: string
[key: string]: unknown
}
// ── Lazy-loaded deps ────────────────────────────────────────────────
type AskFn = (...args: any[]) => AsyncGenerator<SDKMessageLike, void, unknown>
type ToolsArray = any[]
let _ask: AskFn | null = null
let _getAllBaseTools: (() => ToolsArray) | null = null
let _getDefaultAppState: (() => any) | null = null
let _createFileStateCache: (() => any) | null = null
let _depsLoaded = false
async function ensureDeps(): Promise<boolean> {
if (_depsLoaded) return _ask !== null
_depsLoaded = true
try {
const qe = await import('../../QueryEngine.js')
_ask = qe.ask as AskFn
const tools = await import('../../tools.js')
_getAllBaseTools = tools.getAllBaseTools
const state = await import('../../state/AppStateStore.js')
_getDefaultAppState = state.getDefaultAppState
const fsc = await import('../../utils/fileStateCache.js')
_createFileStateCache = fsc.createFileStateCache
return true
} catch (err) {
console.error('[TelegramWorkerEngine] failed to load deps:', err)
return false
}
type ConversationMessage = {
role: 'user' | 'assistant'
content: string | Array<{ type: string; [key: string]: unknown }>
}
export class TelegramWorkerEngine {
private config: TelegramWorkerEngineConfig
private broker: ApprovalBroker
private translator: WorkerEventTranslator
private client: Anthropic | null = null
// Session state — persists across turns
private messages: any[] = []
private fileCache: any = null
private appState: any = null
private messages: ConversationMessage[] = []
private ready = false
constructor(config: TelegramWorkerEngineConfig, broker: ApprovalBroker) {
@@ -91,22 +50,18 @@ export class TelegramWorkerEngine {
}
async init(): Promise<boolean> {
const ok = await ensureDeps()
if (!ok || !_getDefaultAppState || !_createFileStateCache) {
console.warn('[TelegramWorkerEngine] stub mode — deps not available')
try {
// The SDK reads ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN from env
this.client = new Anthropic()
this.ready = true
console.log(
`[TelegramWorkerEngine] initialized for profile=${this.config.profileId} model=${this.config.model}`,
)
return true
} catch (err) {
console.error('[TelegramWorkerEngine] failed to init SDK:', err)
return false
}
process.env.CLAUDE_CONFIG_DIR ??= this.config.claudeConfigDir
this.appState = _getDefaultAppState()
this.fileCache = _createFileStateCache()
this.ready = true
console.log(
`[TelegramWorkerEngine] initialized for profile=${this.config.profileId}`,
)
return true
}
get isReady(): boolean {
@@ -119,10 +74,6 @@ export class TelegramWorkerEngine {
/**
* Execute a turn. Yields WorkerTurnEvent as they're produced.
*
* The caller (server.ts) streams these directly to the SSE response.
* The generator yields individual events and terminates with a
* 'completed' or 'failed' event.
*/
async *executeTurn(
prompt: string,
@@ -130,78 +81,83 @@ export class TelegramWorkerEngine {
): AsyncGenerator<WorkerTurnEvent, void, unknown> {
this.translator.reset()
if (!this.ready || !_ask || !_getAllBaseTools) {
// Stub mode
if (!this.ready || !this.client) {
yield {
type: 'assistant_text_delta',
delta: `[worker stub] ${prompt.slice(0, 200)}`,
delta: '[worker] engine not initialized — check ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN',
}
yield this.translator.buildCompletedEvent()
return
}
const tools = _getAllBaseTools()
const appStateMut = { current: this.appState }
// Add user message to history
this.messages.push({ role: 'user', content: prompt })
try {
const stream = _ask({
commands: [],
prompt,
cwd: this.config.defaultCwd,
tools,
mcpClients: [],
agents: [],
canUseTool: async (
tool: any,
input: any,
) => {
return this.broker.canUseTool(
typeof tool === 'string' ? tool : tool?.name ?? 'unknown',
input,
)
},
mutableMessages: this.messages,
getReadFileCache: () => this.fileCache,
setReadFileCache: (c: any) => {
this.fileCache = c
},
getAppState: () => appStateMut.current,
setAppState: (f: any) => {
appStateMut.current = f(appStateMut.current)
this.appState = appStateMut.current
},
userSpecifiedModel: this.config.model,
abortController,
const stream = this.client.messages.stream({
model: this.config.model,
max_tokens: 16384,
system: `You are Claude, an AI assistant. The user is communicating via Telegram. Be helpful and concise. Current working directory: ${this.config.defaultCwd}`,
messages: this.messages.map((m) => ({
role: m.role,
content: m.content,
})),
})
for await (const msg of stream) {
let fullText = ''
stream.on('text', (text) => {
fullText += text
})
// Process events from the stream
for await (const event of stream) {
if (abortController.signal.aborted) {
stream.abort()
yield this.translator.buildFailedEvent('Turn cancelled by user')
return
}
const events = this.translator.translate(msg)
for (const event of events) {
yield event
if (event.type === 'content_block_delta') {
const delta = event.delta as { type?: string; text?: string }
if (delta?.type === 'text_delta' && delta.text) {
yield {
type: 'assistant_text_delta',
delta: delta.text,
} as WorkerTurnEvent
}
}
}
yield this.translator.buildCompletedEvent()
// Get final message for usage
const finalMessage = await stream.finalMessage()
// Add assistant response to history
const assistantText =
finalMessage.content
.filter((b): b is { type: 'text'; text: string } => b.type === 'text')
.map((b) => b.text)
.join('') || fullText
this.messages.push({ role: 'assistant', content: assistantText })
yield {
type: 'completed',
final_text: assistantText,
iterations: 1,
input_tokens: finalMessage.usage?.input_tokens ?? 0,
output_tokens: finalMessage.usage?.output_tokens ?? 0,
generated_files: [],
} as WorkerTurnEvent
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
yield this.translator.buildFailedEvent(message)
}
}
/** Reset session — clear messages and file cache but keep engine alive. */
/** Reset session — clear conversation history. */
resetSession(): void {
this.messages = []
if (_createFileStateCache) {
this.fileCache = _createFileStateCache()
}
if (_getDefaultAppState) {
this.appState = _getDefaultAppState()
}
this.broker.reset()
}
}