Files
claw-code-parity/rust/crates/claw-telegram/src/gateway.rs
T
2026-04-08 01:44:02 +02:00

2204 lines
80 KiB
Rust

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use channel_gateway_core::{
AttachmentKind, AttachmentRef, BackgroundApprovalRecord, GatewayManifest, GatewaySettings,
MailboxMessage, ManifestError, ProfileId, ProfileRecord, RuntimeTaskRecord,
RuntimeTaskStatus, TaskListRecord, TurnSource, WorkerAgentListResponse,
WorkerApprovalDecision,
WorkerBackgroundApprovalListResponse, WorkerDefaults, WorkerMailboxMessageEvent,
WorkerMailboxSummaryResponse, WorkerTaskListResponse, WorkerTaskSnapshotResponse,
WorkerTeamCreatedEvent, WorkerTeamSnapshotResponse, WorkerTurnEvent,
};
use tokio::sync::Mutex;
use crate::config::GatewayConfig;
use crate::docker_worker_manager::{DockerWorkerManager, ReconcileResult, WorkerManagerError};
use crate::miniapp;
use crate::registry::{AgentRegistry, RegistryError};
use crate::telegram_api::{
CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, TelegramApi,
TelegramApiError, Update, User, WebAppInfo,
};
use crate::unraid_template_manager::{TemplateError, UnraidTemplateManager};
use crate::worker_client::{WorkerClient, WorkerClientError};
/// Reply sent to the chat when the profile already has an entry in the
/// gateway's active-turn map (see the `is_profile_busy` checks).
const BUSY_MESSAGE: &str =
"An agent turn is already running for this profile. Use /cancel to stop it, or wait for it to finish.";
/// Number of seconds the background agent notifier sleeps between passes.
const BACKGROUND_AGENT_NOTIFY_INTERVAL_SECS: u64 = 5;
/// Telegram front end of the gateway: long-polls Telegram for updates,
/// forwards user turns to per-profile workers and relays worker events back
/// to the originating chat.
pub struct TelegramGateway {
// Gateway configuration (state root, manifest path, worker auth token,
// polling timeout, Mini App settings, ...).
config: GatewayConfig,
// Client used for every Telegram Bot API call.
api: TelegramApi,
// Reconciles the manifest at startup and ensures a profile's worker before
// the gateway talks to it.
worker_manager: DockerWorkerManager,
// Turns currently in flight, keyed by profile id. The presence of a key is
// what marks a profile as busy.
active_turns: Arc<Mutex<BTreeMap<String, ActiveTurnState>>>,
}
/// Bookkeeping for one in-flight turn of a profile.
#[derive(Clone)]
struct ActiveTurnState {
// Identifier returned by the worker when the turn was posted.
turn_id: String,
// Base URL of the worker that runs the turn; reused for cancel/approval calls.
worker_base_url: String,
// Approval prompt currently awaiting a button press, if any.
pending_approval: Option<PendingApprovalState>,
}
/// An approval request that has been shown to the user and not yet answered.
#[derive(Clone)]
struct PendingApprovalState {
// Worker-side identifier of the approval request.
approval_id: String,
// Telegram message id of the prompt carrying the approval keyboard, so the
// keyboard can be removed once the approval is handled.
message_id: i64,
}
impl TelegramGateway {
pub fn new(config: GatewayConfig) -> Result<Self, GatewayError> {
std::fs::create_dir_all(&config.state_root)?;
persist_gateway_template(&config)?;
let api = TelegramApi::new(config.bot_token())?;
let worker_manager = DockerWorkerManager::new(config.clone())?;
Ok(Self {
config,
api,
worker_manager,
active_turns: Arc::new(Mutex::new(BTreeMap::new())),
})
}
pub async fn run(&self) -> Result<(), GatewayError> {
let manifest = load_or_init_manifest(&self.config)?;
self.worker_manager.reconcile_manifest(&manifest).await?;
let me = self.api.get_me().await?;
eprintln!(
"telegram gateway connected: id={} username={}",
me.id,
me.username.as_deref().unwrap_or("(unknown)")
);
tokio::spawn(self.spawn_background_agent_notifier());
if let Some(bind_addr) = self.config.miniapp_bind_addr.clone() {
let config = self.config.clone();
tokio::spawn(async move {
if let Err(error) = miniapp::serve(config, bind_addr.clone()).await {
eprintln!("miniapp server exited on {bind_addr}: {error}");
}
});
}
let mut offset = self.load_offset()?;
loop {
match self
.api
.get_updates(offset, self.config.poll_timeout_secs)
.await
{
Ok(updates) => {
for update in updates {
offset = Some(update.update_id + 1);
self.store_offset(offset)?;
if let Err(error) = self.handle_update(update).await {
eprintln!("gateway update handling error: {error}");
}
}
}
Err(error) => {
eprintln!("telegram polling error: {error}");
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}
/// Dispatches one Telegram update.
///
/// A message takes precedence over a callback query; updates carrying
/// neither are ignored.
async fn handle_update(&self, update: Update) -> Result<(), GatewayError> {
    if let Some(message) = update.message {
        return self.handle_message(message).await;
    }
    if let Some(callback_query) = update.callback_query {
        return self.handle_callback_query(callback_query).await;
    }
    Ok(())
}
/// Handles an incoming chat message.
///
/// Order of checks:
/// 1. non-private chats are rejected (best-effort reply);
/// 2. messages without a sender are ignored;
/// 3. senders not mapped to a profile are rejected (best-effort reply);
/// 4. slash commands are delegated to `handle_command`;
/// 5. a busy profile receives `BUSY_MESSAGE`;
/// 6. attachments are downloaded (a `GatewayError::Other` from the download
///    is reported to the user instead of being propagated);
/// 7. an empty prompt without attachments is rejected.
///
/// Otherwise the turn is posted to the profile's worker, registered as
/// active, and a delivery task is spawned to stream the results back.
async fn handle_message(&self, message: Message) -> Result<(), GatewayError> {
    let chat_id = message.chat.id;

    if message.chat.kind != "private" {
        let _ = self
            .api
            .send_message(
                chat_id,
                "This bot only supports direct messages in v1.",
                None,
            )
            .await;
        return Ok(());
    }

    let Some(sender) = message.from.as_ref() else {
        return Ok(());
    };

    let manifest = load_manifest(&self.config)?;
    let profile = match manifest.resolve_profile_for_telegram_user(sender.id).cloned() {
        Some(profile) => profile,
        None => {
            let _ = self
                .api
                .send_message(
                    chat_id,
                    "You are not mapped to a profile in the gateway manifest.",
                    None,
                )
                .await;
            return Ok(());
        }
    };

    if let Some(command) = parse_command(&message) {
        return self
            .handle_command(&manifest, &profile, &message, command)
            .await;
    }

    let profile_key = profile.profile_id.as_str();
    if self.is_profile_busy(profile_key).await {
        self.api.send_message(chat_id, BUSY_MESSAGE, None).await?;
        return Ok(());
    }

    let downloaded = self
        .download_attachments(&profile, &message, self.config.max_upload_bytes())
        .await;
    let attachments = match downloaded {
        Ok(attachments) => attachments,
        // User-facing download problems (e.g. size limit) go back to the chat.
        Err(GatewayError::Other(message_text)) => {
            self.api.send_message(chat_id, &message_text, None).await?;
            return Ok(());
        }
        Err(error) => return Err(error),
    };

    let prompt = message.text_or_caption().unwrap_or("").trim().to_string();
    if prompt.is_empty() && attachments.is_empty() {
        self.api
            .send_message(
                chat_id,
                "Send a message, photo, or document to start a turn.",
                None,
            )
            .await?;
        return Ok(());
    }

    let worker = self
        .worker_manager
        .ensure_profile_worker(&manifest, &profile)
        .await?;
    let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
    let turn_id = client
        .post_turn(prompt, turn_source(sender, &message), &attachments)
        .await?;

    let turn_state = ActiveTurnState {
        turn_id: turn_id.clone(),
        worker_base_url: worker.base_url.clone(),
        pending_approval: None,
    };
    self.insert_active_turn(profile_key, turn_state).await;

    tokio::spawn(self.spawn_delivery_task(
        profile_key.to_string(),
        chat_id,
        turn_id,
        worker.base_url,
    ));
    Ok(())
}
async fn handle_command(
&self,
manifest: &GatewayManifest,
profile: &ProfileRecord,
message: &Message,
command: Command,
) -> Result<(), GatewayError> {
match command {
Command::Start | Command::Help => {
self.api
.send_message(message.chat.id, &render_help(), None)
.await?;
}
Command::Status => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
let status = client.status().await?;
let text = format!(
"Status\nProfile: {}\nContainer: {}\nMessages: {}\nModel: {}\nPermission mode: {}\nWorking directory: {}\nTask list: {}\nActive team: {}\nBusy: {}",
status.profile_id,
profile.worker.container_name,
status.message_count,
status.model,
status.permission_mode,
status.default_cwd,
status.task_list_id,
status.active_team.as_deref().unwrap_or("(none)"),
if status.busy { "yes" } else { "no" }
);
self.api.send_message(message.chat.id, &text, None).await?;
}
Command::Tasks => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
let tasks = client.list_tasks().await?;
self.api
.send_message(message.chat.id, &render_task_list(&tasks), None)
.await?;
}
Command::Task(task_id) => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
let task = client.get_task(&task_id).await?;
self.api
.send_message(message.chat.id, &render_task_snapshot(&task_id, &task), None)
.await?;
}
Command::Team => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
let team = client.team().await?;
self.api
.send_message(message.chat.id, &render_team_snapshot(&team), None)
.await?;
}
Command::Agents => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
let agents = client.agents().await?;
self.api
.send_message(message.chat.id, &render_agents_snapshot(&agents), None)
.await?;
}
Command::Agent(agent_id) => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
let agent = client.agent(&agent_id).await?;
self.api
.send_message(message.chat.id, &render_agent_snapshot(&agent), None)
.await?;
}
Command::StopTask(task_id) => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
client.stop_task(&task_id).await?;
self.api
.send_message(
message.chat.id,
&format!("Stop requested for runtime task `{task_id}`."),
None,
)
.await?;
}
Command::Messages => {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
let mailbox = client.mailbox().await?;
self.api
.send_message(message.chat.id, &render_mailbox_summary(&mailbox), None)
.await?;
}
Command::MiniApp => {
let Some(base_url) = self.config.miniapp_public_base_url.as_deref() else {
self.api
.send_message(
message.chat.id,
"Mini App is not configured on this gateway. Set `CLAW_GATEWAY_MINIAPP_PUBLIC_BASE_URL` and `CLAW_GATEWAY_MINIAPP_BIND_ADDR`.",
None,
)
.await?;
return Ok(());
};
let url = format!("{}/miniapp", base_url.trim_end_matches('/'));
let keyboard = InlineKeyboardMarkup {
inline_keyboard: vec![vec![InlineKeyboardButton {
text: "Open Mini App".to_string(),
callback_data: None,
url: None,
web_app: Some(WebAppInfo { url }),
}]],
};
self.api
.send_message(
message.chat.id,
"Open the Mini App for a richer view of your tasks, team, agents, mailbox, and approvals.",
Some(&keyboard),
)
.await?;
}
Command::New => {
if self.is_profile_busy(profile.profile_id.as_str()).await {
self.api
.send_message(message.chat.id, BUSY_MESSAGE, None)
.await?;
} else {
let worker = self
.worker_manager
.ensure_profile_worker(manifest, profile)
.await?;
let client =
WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
client.reset_session().await?;
self.api
.send_message(
message.chat.id,
"Started a fresh session for this profile.",
None,
)
.await?;
}
}
Command::Cancel => {
let response = self.cancel_turn(profile.profile_id.as_str()).await?;
self.api
.send_message(message.chat.id, &response, None)
.await?;
}
}
Ok(())
}
async fn handle_callback_query(&self, query: CallbackQuery) -> Result<(), GatewayError> {
let Some(data) = query.data.as_deref() else {
self.api
.answer_callback_query(&query.id, Some("Missing callback payload."))
.await?;
return Ok(());
};
let Some(action) = ApprovalAction::parse(data) else {
self.api
.answer_callback_query(&query.id, Some("Unsupported callback payload."))
.await?;
return Ok(());
};
let manifest = load_manifest(&self.config)?;
let Some(profile) = manifest.resolve_profile_for_telegram_user(query.from.id) else {
self.api
.answer_callback_query(&query.id, Some("Not authorized."))
.await?;
return Ok(());
};
let profile_id = profile.profile_id.as_str().to_string();
match &action.target {
ApprovalTarget::Foreground { turn_id } => {
let pending = {
let mut active = self.active_turns.lock().await;
let Some(turn) = active.get_mut(&profile_id) else {
self.api
.answer_callback_query(&query.id, Some("No active turn."))
.await?;
return Ok(());
};
if turn.turn_id != *turn_id {
self.api
.answer_callback_query(
&query.id,
Some("This approval is no longer active."),
)
.await?;
return Ok(());
}
let Some(pending) = turn.pending_approval.clone() else {
self.api
.answer_callback_query(&query.id, Some("Approval already handled."))
.await?;
return Ok(());
};
if pending.approval_id != action.approval_id {
self.api
.answer_callback_query(
&query.id,
Some("This approval is no longer active."),
)
.await?;
return Ok(());
}
let worker_base_url = turn.worker_base_url.clone();
turn.pending_approval = None;
(pending, worker_base_url)
};
let client = WorkerClient::new(&pending.1, &self.config.worker_auth_token)?;
client
.post_approval(turn_id, &action.approval_id, action.decision())
.await?;
if let Some(message) = query.message {
let _ = self
.api
.edit_message_reply_markup(message.chat.id, pending.0.message_id, None)
.await;
}
}
ApprovalTarget::Background => {
let worker = self
.worker_manager
.ensure_profile_worker(&manifest, profile)
.await?;
let client = WorkerClient::new(&worker.base_url, &self.config.worker_auth_token)?;
client
.post_background_approval(&action.approval_id, action.decision())
.await?;
if let Some(message) = query.message {
let _ = self
.api
.edit_message_reply_markup(message.chat.id, message.message_id, None)
.await;
}
}
}
self.api
.answer_callback_query(&query.id, Some(action.label()))
.await?;
Ok(())
}
/// Builds the endless background notifier future.
///
/// Despite its name this method does not spawn anything: the caller passes
/// the returned future to `tokio::spawn`. Each iteration runs one
/// notification pass, logs a failure, and then sleeps for
/// `BACKGROUND_AGENT_NOTIFY_INTERVAL_SECS` seconds.
fn spawn_background_agent_notifier(
    &self,
) -> impl std::future::Future<Output = ()> + Send + 'static {
    // Owned copies so the future does not borrow `self`.
    let api = self.api.clone();
    let config = self.config.clone();
    async move {
        let pause = Duration::from_secs(BACKGROUND_AGENT_NOTIFY_INTERVAL_SECS);
        loop {
            match notify_background_agents_once(&api, &config).await {
                Ok(()) => {}
                Err(error) => eprintln!("background agent notifier error: {error}"),
            }
            tokio::time::sleep(pause).await;
        }
    }
}
/// Builds the future that relays one turn's worker events to a Telegram chat.
///
/// The method itself does not spawn; the caller hands the returned future to
/// `tokio::spawn`. The future:
/// 1. opens the worker's event stream for `turn_id`;
/// 2. posts a "Thinking..." status message and starts a typing loop;
/// 3. translates each `WorkerTurnEvent` into message sends/edits, including
///    approval prompts with inline keyboards;
/// 4. on `Completed`, optionally announces feed items (Mini App configured)
///    or uploads generated files;
/// 5. always stops the typing loop and removes the profile's entry from
///    `active_turns` before finishing (also on the early-return error paths).
fn spawn_delivery_task(
&self,
profile_id: String,
chat_id: i64,
turn_id: String,
worker_base_url: String,
) -> impl std::future::Future<Output = ()> + Send + 'static {
// Everything the future needs is cloned up front so it is `'static`.
let api = self.api.clone();
let active_turns = self.active_turns.clone();
let state_root = self.config.state_root.clone();
let auth_token = self.config.worker_auth_token.clone();
let max_upload_bytes = self.config.max_upload_bytes();
let miniapp_base_url = self.config.miniapp_public_base_url.clone();
// Read but not used anywhere in the future below.
let _miniapp_ttl = self.config.miniapp_session_ttl_secs;
async move {
let client = match WorkerClient::new(&worker_base_url, &auth_token) {
Ok(client) => client,
Err(error) => {
eprintln!("failed to build worker client for delivery: {error}");
clear_active_turn(&active_turns, &profile_id).await;
return;
}
};
let mut receiver = match client.stream_turn_events(&turn_id).await {
Ok(receiver) => receiver,
Err(error) => {
let _ = api
.send_message(
chat_id,
&format!("Failed to open worker event stream: {error}"),
None,
)
.await;
clear_active_turn(&active_turns, &profile_id).await;
return;
}
};
// The status message doubles as the first streamed-text message (see
// `text_messages` below); it stays `None` if the initial send fails.
let mut status_message = match api.send_message(chat_id, "Thinking...", None).await {
Ok(message) => Some(TrackedMessage::new(
chat_id,
message.message_id,
"Thinking...",
)),
Err(error) => {
eprintln!("failed to send initial Telegram message: {error}");
None
}
};
// Flag used to tell the typing loop to stop once the turn is over.
let typing_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
let typing_handle = spawn_typing_loop(api.clone(), chat_id, typing_done.clone());
// Accumulated assistant text for the whole turn.
let mut text_buffer = String::new();
// Seeded with a *clone* of the status message, so streamed text first
// overwrites "Thinking..." in place.
let mut text_messages = status_message
.clone()
.map(|message| vec![message])
.unwrap_or_default();
while let Some(next_event) = receiver.recv().await {
let event = match next_event {
Ok(event) => event,
Err(error) => {
// Stream-level failure: report it and stop relaying.
let text = format!("Worker stream failed: {error}");
if let Some(message_handle) = &mut status_message {
let _ = api
.edit_message_text(
message_handle.chat_id,
message_handle.message_id,
&text,
None,
)
.await;
} else {
let _ = api.send_message(chat_id, &text, None).await;
}
break;
}
};
match event {
WorkerTurnEvent::AssistantTextDelta { delta } => {
text_buffer.push_str(&delta);
if let Err(error) =
sync_text_messages(&api, chat_id, &mut text_messages, &text_buffer)
.await
{
eprintln!("failed to sync Telegram text output: {error}");
}
}
WorkerTurnEvent::ToolUse { name, .. } => {
// Tool status is only shown while no assistant text has been produced,
// because the status message is also the first text message.
if text_buffer.is_empty() {
if let Some(message) = &mut status_message {
let text = format!("Running tool: {name}");
if let Err(error) = api
.edit_message_text(
message.chat_id,
message.message_id,
&text,
None,
)
.await
{
eprintln!("failed to edit tool status message: {error}");
} else {
message.last_text = text;
}
}
}
}
WorkerTurnEvent::ToolResult {
tool_name,
is_error,
..
} => {
// Only failing tools are surfaced, and only before any text exists.
if is_error && text_buffer.is_empty() {
if let Some(message) = &mut status_message {
let text = format!("Tool `{tool_name}` returned an error.");
if let Err(error) = api
.edit_message_text(
message.chat_id,
message.message_id,
&text,
None,
)
.await
{
eprintln!("failed to edit error status message: {error}");
} else {
message.last_text = text;
}
}
}
}
WorkerTurnEvent::ApprovalRequested { request } => {
let keyboard =
approval_keyboard(&turn_id, &request.approval_id, &request.tool_name);
match api
.send_message(
chat_id,
&format!(
"Approval required\nTool: {}\nCurrent mode: {}\nRequired mode: {}\nInput: {}\n{}",
request.tool_name,
request.current_mode,
request.required_mode,
request.input,
request
.reason
.as_deref()
.map_or(String::new(), |value| format!("Reason: {value}"))
),
Some(&keyboard),
)
.await
{
Ok(message) => {
// Record the prompt so the callback handler can validate the button press.
let mut active = active_turns.lock().await;
if let Some(state) = active.get_mut(&profile_id) {
state.pending_approval = Some(PendingApprovalState {
approval_id: request.approval_id,
message_id: message.message_id,
});
}
}
Err(error) => {
// The user never saw the prompt, so deny on their behalf rather than
// leaving the request unanswered.
eprintln!("failed to send approval request: {error}");
let deny = WorkerApprovalDecision::Deny {
reason: "failed to deliver Telegram approval prompt".to_string(),
};
let _ = client.post_approval(&turn_id, &request.approval_id, deny).await;
}
}
}
WorkerTurnEvent::AutoCompaction {
removed_message_count,
} => {
let note = format!(
"Context was compacted automatically after removing {} older messages.",
removed_message_count
);
let _ = api.send_message(chat_id, &note, None).await;
}
// The following informational events are forwarded best effort; send
// failures are ignored.
WorkerTurnEvent::TaskCreated { task_list_id, task } => {
let _ = api
.send_message(
chat_id,
&render_task_created_notice(&task_list_id, &task),
None,
)
.await;
}
WorkerTurnEvent::TaskUpdated { task_list_id, task } => {
let _ = api
.send_message(
chat_id,
&render_task_updated_notice(&task_list_id, &task),
None,
)
.await;
}
WorkerTurnEvent::TaskStopped { task } => {
let _ = api
.send_message(chat_id, &render_task_stopped_notice(&task), None)
.await;
}
WorkerTurnEvent::AgentSpawned { agent } => {
let _ = api
.send_message(chat_id, &render_agent_spawned_notice(&agent), None)
.await;
}
WorkerTurnEvent::TeamCreated { team } => {
let _ = api
.send_message(chat_id, &render_team_created_notice(&team), None)
.await;
}
WorkerTurnEvent::TeamDeleted { team_name } => {
let _ = api
.send_message(chat_id, &render_team_deleted_notice(&team_name), None)
.await;
}
WorkerTurnEvent::MailboxMessage { message } => {
let _ = api
.send_message(chat_id, &render_mailbox_message_notice(&message), None)
.await;
}
WorkerTurnEvent::Completed {
final_text,
generated_files,
..
} => {
// If nothing was streamed, fall back to the final text; if there is no
// text and no files either, just mark the status message as done.
if text_buffer.is_empty() && !final_text.trim().is_empty() {
text_buffer = final_text;
let _ =
sync_text_messages(&api, chat_id, &mut text_messages, &text_buffer)
.await;
} else if text_buffer.is_empty() && generated_files.is_empty() {
if let Some(message) = &mut status_message {
let _ = api
.edit_message_text(
message.chat_id,
message.message_id,
"Done.",
None,
)
.await;
}
}
// The feed is only queried when a Mini App base URL is configured.
let miniapp_base = miniapp_base_url.as_deref();
let feed_items = if miniapp_base.is_some() {
match client.list_feed(Some(&turn_id)).await {
Ok(response) => response.items,
Err(error) => {
eprintln!(
"failed to load feed for completed turn {}: {error}",
turn_id
);
Vec::new()
}
}
} else {
Vec::new()
};
if let Some(base_url) = miniapp_base {
if !feed_items.is_empty() {
let feed_url = format!(
"{}/miniapp?tab=feed&turn_id={}",
base_url.trim_end_matches('/'),
turn_id,
);
let keyboard = crate::telegram_api::InlineKeyboardMarkup {
inline_keyboard: vec![vec![
crate::telegram_api::InlineKeyboardButton {
text: format!("Open Feed ({})", feed_items.len()),
callback_data: None,
url: None,
web_app: Some(crate::telegram_api::WebAppInfo {
url: feed_url,
}),
},
]],
};
let _ = api
.send_message(
chat_id,
&format!(
"Added {} item{} to the family feed.",
feed_items.len(),
if feed_items.len() == 1 { "" } else { "s" }
),
Some(&keyboard),
)
.await;
}
// NOTE(review): with a Mini App base URL configured the loop exits here,
// so the generated-file upload below never runs in that mode, even when
// the feed is empty or could not be loaded. Confirm this is intended.
break;
}
if !generated_files.is_empty() {
eprintln!(
"artifact routing: {} files, miniapp_base={:?}",
generated_files.len(),
miniapp_base,
);
}
for descriptor in generated_files {
// Oversized files are reported to the user and skipped.
if descriptor.size_bytes > max_upload_bytes {
let _ = api
.send_message(
chat_id,
&format!(
"Generated file {} is larger than the configured upload limit and was not sent.",
descriptor.file_name
),
None,
)
.await;
continue;
}
// Files are staged under <state_root>/profiles/<profile>/generated/<turn>.
let download_dir = state_root
.join("profiles")
.join(&profile_id)
.join("generated")
.join(&turn_id);
let downloaded = match client
.download_generated_file(&turn_id, &descriptor, &download_dir)
.await
{
Ok(path) => path,
Err(error) => {
eprintln!(
"failed to download generated file {}: {error}",
descriptor.file_name
);
continue;
}
};
// Images go out as photos, everything else as documents.
let send_result = if descriptor.is_image || is_image_path(&downloaded) {
api.send_photo_path(
chat_id,
&downloaded,
Some("Generated by the agent"),
)
.await
} else {
api.send_document_path(
chat_id,
&downloaded,
Some("Generated by the agent"),
)
.await
};
if let Err(error) = send_result {
eprintln!(
"failed to send generated file {}: {error}",
downloaded.display()
);
}
}
break;
}
WorkerTurnEvent::Failed { message } => {
eprintln!(
"gateway turn failed: profile_id={} chat_id={} error={message}",
profile_id,
chat_id,
);
if let Some(message_handle) = &mut status_message {
let text = format!("Request failed: {message}");
let _ = api
.edit_message_text(
message_handle.chat_id,
message_handle.message_id,
&text,
None,
)
.await;
} else {
let _ = api
.send_message(chat_id, &format!("Request failed: {message}"), None)
.await;
}
break;
}
}
}
// Common teardown: stop the typing indicator, wait for its task, and mark
// the profile as idle again.
typing_done.store(true, std::sync::atomic::Ordering::SeqCst);
let _ = typing_handle.await;
clear_active_turn(&active_turns, &profile_id).await;
}
}
/// Downloads the attachments of `message` into the profile's inbound folder
/// (`<state_root>/profiles/<profile_id>/inbound`, created on demand).
///
/// At most two attachments are produced, in this order: the last entry of
/// the message's photo list (if any), then the document (if any). A document
/// without a file name is treated as `document.bin`; its extension is taken
/// from the file name and defaults to `.bin`.
///
/// # Errors
/// Fails if the directory cannot be created or any download fails.
async fn download_attachments(
    &self,
    profile: &ProfileRecord,
    message: &Message,
    max_upload_bytes: u64,
) -> Result<Vec<AttachmentRef>, GatewayError> {
    let profile_dir = self
        .config
        .state_root
        .join("profiles")
        .join(profile.profile_id.as_str());
    let inbound_root = profile_dir.join("inbound");
    tokio::fs::create_dir_all(&inbound_root).await?;

    let mut collected = Vec::new();

    if let Some(photo) = message.photo.last() {
        let photo_ref = self
            .download_telegram_file(
                &photo.file_id,
                photo.file_size,
                "photo",
                ".jpg",
                AttachmentKind::Photo,
                None,
                &inbound_root,
                max_upload_bytes,
            )
            .await?;
        collected.push(photo_ref);
    }

    if let Some(document) = &message.document {
        let file_name = match document.file_name.clone() {
            Some(name) => name,
            None => "document.bin".to_string(),
        };
        let extension = match Path::new(&file_name)
            .extension()
            .and_then(|value| value.to_str())
        {
            Some(value) => format!(".{value}"),
            None => ".bin".to_string(),
        };
        let document_ref = self
            .download_telegram_file(
                &document.file_id,
                document.file_size,
                "document",
                &extension,
                AttachmentKind::Document,
                Some(file_name),
                &inbound_root,
                max_upload_bytes,
            )
            .await?;
        collected.push(document_ref);
    }

    Ok(collected)
}
/// Downloads a single Telegram file into `inbound_root`.
///
/// The size limit is checked twice: first against the size embedded in the
/// message (`inline_size`), which avoids a `get_file` round-trip for files
/// that are obviously too large, and then against the size reported by
/// `get_file` (falling back to `inline_size`, then `0`).
///
/// The destination name is `<prefix>-<now_millis()><sanitized extension>`.
///
/// # Errors
/// `GatewayError::Other` when the file exceeds the limit or Telegram returns
/// no downloadable path; Telegram API errors are propagated.
async fn download_telegram_file(
    &self,
    file_id: &str,
    inline_size: Option<u64>,
    prefix: &str,
    extension: &str,
    kind: AttachmentKind,
    original_name: Option<String>,
    inbound_root: &Path,
    max_upload_bytes: u64,
) -> Result<AttachmentRef, GatewayError> {
    let too_large = || {
        GatewayError::Other(format!(
            "attachment exceeds the configured {} MB limit",
            self.config.max_upload_mb
        ))
    };

    if matches!(inline_size, Some(size) if size > max_upload_bytes) {
        return Err(too_large());
    }

    let file = self.api.get_file(file_id).await?;
    let file_size_bytes = file.file_size.or(inline_size).unwrap_or(0);
    if file_size_bytes > max_upload_bytes {
        return Err(too_large());
    }

    let Some(remote_path) = file.file_path else {
        return Err(GatewayError::Other(format!(
            "Telegram did not return a downloadable path for `{file_id}`"
        )));
    };

    let local_name = format!(
        "{prefix}-{}{}",
        now_millis(),
        sanitize_extension(extension)
    );
    let destination = inbound_root.join(local_name);
    self.api.download_file(&remote_path, &destination).await?;

    Ok(AttachmentRef {
        path: destination,
        kind,
        original_name,
        file_size_bytes,
    })
}
/// Requests cancellation of the profile's active turn and returns the text
/// to show the user.
///
/// The turn stays registered in `active_turns`; only its pending approval is
/// cleared here. The lock is not held while the worker is contacted.
async fn cancel_turn(&self, profile_id: &str) -> Result<String, GatewayError> {
    let snapshot = {
        let turns = self.active_turns.lock().await;
        turns.get(profile_id).cloned()
    };
    let turn = match snapshot {
        Some(turn) => turn,
        None => return Ok("No active turn to cancel.".to_string()),
    };

    let client = WorkerClient::new(&turn.worker_base_url, &self.config.worker_auth_token)?;
    client.cancel_turn(&turn.turn_id).await?;

    let mut turns = self.active_turns.lock().await;
    if let Some(state) = turns.get_mut(profile_id) {
        state.pending_approval = None;
    }
    drop(turns);

    let notice =
        "Cancellation requested. The turn will stop at the next model or approval boundary.";
    Ok(notice.to_string())
}
/// Registers `state` as the active turn of `profile_id`, replacing any
/// previous entry for that profile.
async fn insert_active_turn(&self, profile_id: &str, state: ActiveTurnState) {
    let mut turns = self.active_turns.lock().await;
    turns.insert(profile_id.to_string(), state);
}
/// Returns `true` while a turn is registered for `profile_id`.
async fn is_profile_busy(&self, profile_id: &str) -> bool {
    let turns = self.active_turns.lock().await;
    turns.contains_key(profile_id)
}
/// Location of the file that stores the Telegram update offset:
/// `<state_root>/offset`.
fn offset_path(&self) -> PathBuf {
    let state_root = &self.config.state_root;
    state_root.join("offset")
}
/// Reads the persisted Telegram update offset.
///
/// Returns `Ok(None)` when the offset file does not exist or contains only
/// whitespace.
///
/// # Errors
/// I/O failures are propagated; content that is not a valid `i64` yields
/// `GatewayError::Other`.
fn load_offset(&self) -> Result<Option<i64>, GatewayError> {
    let path = self.offset_path();
    if !path.exists() {
        return Ok(None);
    }
    let contents = std::fs::read_to_string(path)?;
    match contents.trim() {
        "" => Ok(None),
        raw => raw.parse::<i64>().map(Some).map_err(|_| {
            GatewayError::Other("stored Telegram update offset is invalid".to_string())
        }),
    }
}
/// Persists the Telegram update offset, creating the parent directory when
/// necessary. `None` is stored as an empty file.
fn store_offset(&self, offset: Option<i64>) -> Result<(), GatewayError> {
    let path = self.offset_path();
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let contents = match offset {
        Some(value) => value.to_string(),
        None => String::new(),
    };
    std::fs::write(path, contents)?;
    Ok(())
}
}
/// One-shot reconciliation entry point: persists the gateway template, loads
/// the existing manifest and reconciles workers against it.
///
/// # Errors
/// Fails if the template cannot be written, the manifest is missing or
/// invalid, or the worker manager reports an error.
pub async fn reconcile_workers(config: &GatewayConfig) -> Result<ReconcileResult, GatewayError> {
    persist_gateway_template(config)?;
    let manifest = load_manifest(config)?;
    let manager = DockerWorkerManager::new(config.clone())?;
    Ok(manager.reconcile_manifest(&manifest).await?)
}
/// Writes the gateway template through `UnraidTemplateManager`.
///
/// The values of `ANTHROPIC_AUTH_TOKEN` and `ANTHROPIC_API_KEY` are passed
/// along when they are set and not blank; otherwise `None` is passed.
fn persist_gateway_template(config: &GatewayConfig) -> Result<(), GatewayError> {
    // Treat unset, non-unicode and whitespace-only variables alike.
    let non_blank_env = |name: &str| -> Option<String> {
        match std::env::var(name) {
            Ok(value) if !value.trim().is_empty() => Some(value),
            _ => None,
        }
    };

    let manager = UnraidTemplateManager::new(
        config.template_dir.clone(),
        config.template_archive_dir.clone(),
        config.template_file_prefix.clone(),
    );
    let anthropic_auth_token = non_blank_env("ANTHROPIC_AUTH_TOKEN");
    let anthropic_api_key = non_blank_env("ANTHROPIC_API_KEY");

    manager.write_gateway_template(
        config,
        anthropic_auth_token.as_deref(),
        anthropic_api_key.as_deref(),
    )?;
    Ok(())
}
/// Loads the gateway manifest from `config.manifest_path`.
///
/// # Errors
/// Returns `GatewayError::Other` when the file does not exist; load failures
/// are converted from the manifest error.
pub fn load_manifest(config: &GatewayConfig) -> Result<GatewayManifest, GatewayError> {
    let path = &config.manifest_path;
    if path.exists() {
        Ok(GatewayManifest::load(path)?)
    } else {
        Err(GatewayError::Other(format!(
            "gateway manifest does not exist at {}",
            path.display()
        )))
    }
}
/// Loads the manifest if it exists; otherwise builds the default manifest,
/// saves it atomically at `config.manifest_path` and returns it.
pub fn load_or_init_manifest(config: &GatewayConfig) -> Result<GatewayManifest, GatewayError> {
    let path = &config.manifest_path;
    if !path.exists() {
        let fresh = default_manifest(config)?;
        fresh.save_atomic(path)?;
        return Ok(fresh);
    }
    Ok(GatewayManifest::load(path)?)
}
/// Atomically saves `manifest` at `config.manifest_path`.
pub fn save_manifest(
    config: &GatewayConfig,
    manifest: &GatewayManifest,
) -> Result<(), GatewayError> {
    let target = &config.manifest_path;
    manifest.save_atomic(target)?;
    Ok(())
}
/// Converts a standalone agent registry into a gateway manifest.
///
/// Starts from the default manifest and adds one profile per registry
/// record: the profile id is the record's agent instance id, the display
/// name is derived from the Telegram first/last name or username, and the
/// Telegram user id is attached. The result is saved at
/// `config.manifest_path` and returned.
///
/// # Errors
/// `GatewayError::Other` when the registry file does not exist; registry,
/// manifest and save failures are propagated.
pub fn migrate_standalone_registry(
    config: &GatewayConfig,
    source_registry_path: &Path,
) -> Result<GatewayManifest, GatewayError> {
    if !source_registry_path.exists() {
        let reason = format!(
            "standalone registry does not exist at {}",
            source_registry_path.display()
        );
        return Err(GatewayError::Other(reason));
    }

    let registry = AgentRegistry::load_or_create(source_registry_path)?;
    let mut manifest = default_manifest(config)?;

    for record in registry.records() {
        let telegram = &record.telegram;
        let profile_id = ProfileId::new(record.agent_instance_id.as_str());
        let display_name = display_name_from_record(
            telegram.first_name.as_deref(),
            telegram.last_name.as_deref(),
            telegram.username.as_deref(),
        );
        manifest.add_profile(profile_id, display_name, Some(telegram.user_id))?;
    }

    manifest.save_atomic(&config.manifest_path)?;
    Ok(manifest)
}
/// Builds a version-1 manifest with no profiles from `config`.
///
/// When `config.inherited_env` is empty, the inherited environment list is
/// inferred: `ANTHROPIC_AUTH_TOKEN` and `ANTHROPIC_API_KEY` are included (in
/// that order) if they are set to a non-blank value in the current process.
///
/// # Errors
/// `GatewayError::Other` when no worker image is configured.
pub fn default_manifest(config: &GatewayConfig) -> Result<GatewayManifest, GatewayError> {
    let Some(worker_image) = config.worker_image.clone() else {
        return Err(GatewayError::Other(
            "CLAW_GATEWAY_WORKER_IMAGE must be set before creating a new manifest".to_string(),
        ));
    };

    let env_is_non_blank = |name: &str| {
        std::env::var(name)
            .ok()
            .filter(|value| !value.trim().is_empty())
            .is_some()
    };
    let inherited_env: Vec<String> = if config.inherited_env.is_empty() {
        ["ANTHROPIC_AUTH_TOKEN", "ANTHROPIC_API_KEY"]
            .iter()
            .copied()
            .filter(|name| env_is_non_blank(*name))
            .map(|name| name.to_string())
            .collect()
    } else {
        config.inherited_env.clone()
    };

    let gateway = GatewaySettings {
        worker_image,
        worker_network: config.worker_network.clone(),
        template_dir: config.template_dir.clone(),
        template_file_prefix: config.template_file_prefix.clone(),
        template_archive_dir: config.template_archive_dir.clone(),
        inherited_env,
    };
    let worker_defaults = WorkerDefaults {
        bind_port: config.worker_bind_port,
        default_cwd: config.worker_default_cwd.clone(),
        permission_mode: config.worker_permission_mode.as_str().to_string(),
        model: config.worker_model.clone(),
        host_state_root: config.worker_host_state_root.clone(),
        host_workspace_root: config.worker_host_workspace_root.clone(),
    };

    Ok(GatewayManifest {
        version: 1,
        gateway,
        worker_defaults,
        profiles: Vec::new(),
    })
}
/// Removes the active-turn entry of `profile_id`, if present, which marks
/// the profile as idle again.
async fn clear_active_turn(
    active_turns: &Arc<Mutex<BTreeMap<String, ActiveTurnState>>>,
    profile_id: &str,
) {
    let mut turns = active_turns.lock().await;
    turns.remove(profile_id);
}
/// Runs one background notification pass over every profile in the manifest.
///
/// Profiles without a Telegram channel are skipped. For the others the
/// worker is addressed at `http://<container_name>:<bind_port>` and, in this
/// order, pending background approvals, pending mailbox messages and
/// finished-but-unannounced agents are delivered to all of the profile's
/// Telegram chats. An agent is marked as notified once at least one chat
/// received the notice.
///
/// Per-profile failures are logged and do not abort the pass.
///
/// # Errors
/// Only a manifest load failure is returned.
async fn notify_background_agents_once(
    api: &TelegramApi,
    config: &GatewayConfig,
) -> Result<(), GatewayError> {
    let manifest = load_manifest(config)?;
    let bind_port = manifest.worker_defaults.bind_port;

    for profile in &manifest.profiles {
        let chat_ids: Vec<_> = profile
            .channels
            .iter()
            .filter_map(|channel| channel.telegram_user_id())
            .collect();
        if chat_ids.is_empty() {
            continue;
        }

        let base_url = format!("http://{}:{}", profile.worker.container_name, bind_port);
        let client = match WorkerClient::new(&base_url, &config.worker_auth_token) {
            Ok(client) => client,
            Err(error) => {
                eprintln!(
                    "background notifier skipped profile {}: {error}",
                    profile.profile_id
                );
                continue;
            }
        };

        let profile_key = profile.profile_id.as_str();
        notify_background_approvals_for_profile(api, &client, profile_key, &chat_ids).await;
        notify_pending_mailbox_messages_for_profile(api, &client, profile_key, &chat_ids).await;

        let listing = match client.agents().await {
            Ok(listing) => listing,
            Err(error) => {
                eprintln!(
                    "background notifier could not list agents for profile {}: {error}",
                    profile.profile_id
                );
                continue;
            }
        };

        for agent in listing.agents {
            let needs_notice = agent.status.is_terminal() && !agent.notified;
            if !needs_notice {
                continue;
            }
            let text = render_background_agent_terminal_notice(&agent);
            // Every chat is attempted; one success is enough to mark it done.
            let mut delivered = false;
            for chat_id in &chat_ids {
                if api.send_message(*chat_id, &text, None).await.is_ok() {
                    delivered = true;
                }
            }
            if delivered {
                let _ = client.mark_agent_notified(&agent.task_id).await;
            }
        }
    }
    Ok(())
}
/// Forwards the pending mailbox messages of the `"lead"` mailbox to every
/// chat in `chat_ids`.
///
/// Messages that reached at least one chat are collected and marked as
/// notified in a single worker call at the end. Listing failures are logged;
/// the result of the final marking call is ignored.
async fn notify_pending_mailbox_messages_for_profile(
    api: &TelegramApi,
    client: &WorkerClient,
    profile_id: &str,
    chat_ids: &[i64],
) {
    let pending = match client.pending_mailbox_messages("lead").await {
        Ok(response) => response.messages,
        Err(error) => {
            eprintln!(
                "background notifier could not list mailbox messages for profile {profile_id}: {error}"
            );
            return;
        }
    };
    if pending.is_empty() {
        return;
    }

    let mut delivered_ids = Vec::new();
    for message in pending {
        let text = render_pending_mailbox_notice(&message);
        // Try every chat; do not stop at the first success.
        let mut reached_any_chat = false;
        for chat_id in chat_ids {
            let sent = api.send_message(*chat_id, &text, None).await;
            if sent.is_ok() {
                reached_any_chat = true;
            }
        }
        if reached_any_chat {
            delivered_ids.push(message.envelope.id.clone());
        }
    }

    if delivered_ids.is_empty() {
        return;
    }
    let _ = client
        .mark_mailbox_messages_notified("lead", &delivered_ids)
        .await;
}
/// Sends a prompt with an approval keyboard for every background approval
/// that has not been announced yet.
///
/// Each prompt is attempted on every chat in `chat_ids`; an approval is
/// marked as notified once at least one chat received it. Listing failures
/// are logged and the result of the marking call is ignored.
async fn notify_background_approvals_for_profile(
    api: &TelegramApi,
    client: &WorkerClient,
    profile_id: &str,
    chat_ids: &[i64],
) {
    let approvals = match client.background_approvals().await {
        Ok(WorkerBackgroundApprovalListResponse { approvals }) => approvals,
        Err(error) => {
            eprintln!(
                "background notifier could not list approvals for profile {profile_id}: {error}"
            );
            return;
        }
    };

    for approval in approvals {
        if approval.notified {
            continue;
        }
        let text = render_background_approval_notice(&approval);
        let keyboard = background_approval_keyboard(&approval.approval_id, &approval.tool_name);

        let mut reached_any_chat = false;
        for chat_id in chat_ids {
            let sent = api.send_message(*chat_id, &text, Some(&keyboard)).await;
            if sent.is_ok() {
                reached_any_chat = true;
            }
        }
        if reached_any_chat {
            let _ = client
                .mark_background_approval_notified(&approval.approval_id)
                .await;
        }
    }
}
/// Describes where a turn originated: the `"telegram"` channel, the sending user and
/// the chat the message arrived in.
fn turn_source(user: &User, message: &Message) -> TurnSource {
    let display_name = display_name_from_record(
        user.first_name.as_deref(),
        user.last_name.as_deref(),
        user.username.as_deref(),
    );
    let chat_id = message.chat.id.to_string();
    TurnSource {
        channel: String::from("telegram"),
        sender_id: user.id.to_string(),
        chat_id: Some(chat_id),
        display_name,
    }
}
/// Picks a human-readable name for a user record.
///
/// Returns the trimmed `"<first> <last>"` combination when it is non-empty, otherwise
/// falls back to `username`. Returns `None` when nothing usable is present.
fn display_name_from_record(
    first_name: Option<&str>,
    last_name: Option<&str>,
    username: Option<&str>,
) -> Option<String> {
    let joined = format!(
        "{} {}",
        first_name.unwrap_or_default(),
        last_name.unwrap_or_default()
    );
    match joined.trim() {
        "" => username.map(String::from),
        full_name => Some(full_name.to_owned()),
    }
}
/// A Telegram message this gateway has already sent, together with the text it
/// currently shows, so later updates can decide whether an edit is needed.
#[derive(Clone)]
struct TrackedMessage {
    /// Chat the message lives in.
    chat_id: i64,
    /// Identifier of the sent message.
    message_id: i64,
    /// Text most recently written into the message.
    last_text: String,
}

impl TrackedMessage {
    /// Records a sent message and the text it was sent with.
    fn new(chat_id: i64, message_id: i64, last_text: impl Into<String>) -> Self {
        let last_text = last_text.into();
        TrackedMessage {
            chat_id,
            message_id,
            last_text,
        }
    }
}
/// Makes the Telegram messages in `tracked` show `text`, split into segments of at
/// most 3500 characters.
///
/// Segment `i` is written into `tracked[i]`: an existing message is edited only when
/// its text differs, and a new message is sent (and appended to `tracked`) when no
/// message exists for that position yet. Blank `text` is a no-op. Tracked messages
/// beyond the number of segments are left untouched.
///
/// # Errors
/// Returns the first Telegram API error; segments already synced stay recorded.
async fn sync_text_messages(
    api: &TelegramApi,
    chat_id: i64,
    tracked: &mut Vec<TrackedMessage>,
    text: &str,
) -> Result<(), TelegramApiError> {
    for (position, segment) in split_text(text, 3500).into_iter().enumerate() {
        match tracked.get_mut(position) {
            Some(existing) => {
                if existing.last_text != segment {
                    api.edit_message_text(existing.chat_id, existing.message_id, &segment, None)
                        .await?;
                    existing.last_text = segment;
                }
            }
            None => {
                let sent = api.send_message(chat_id, &segment, None).await?;
                tracked.push(TrackedMessage::new(chat_id, sent.message_id, segment));
            }
        }
    }
    Ok(())
}
/// Splits `text` into chunks of at most `max_chars` characters, preferring to break
/// at line boundaries.
///
/// Lines are packed greedily and joined with `'\n'`. A single line longer than
/// `max_chars` is hard-wrapped. Blank input yields no chunks, and `max_chars == 0`
/// disables splitting (the whole text is returned as one chunk).
fn split_text(text: &str, max_chars: usize) -> Vec<String> {
    if text.trim().is_empty() {
        return Vec::new();
    }
    if max_chars == 0 {
        return vec![text.to_string()];
    }
    let mut segments = Vec::new();
    let mut pending = String::new();
    let mut pending_chars = 0;
    for line in text.lines() {
        let width = line.chars().count();
        // Size of `pending` if this line were appended (separator included).
        let joined = if pending.is_empty() {
            width
        } else {
            pending_chars + 1 + width
        };
        if joined <= max_chars {
            if !pending.is_empty() {
                pending.push('\n');
            }
            pending.push_str(line);
            pending_chars = joined;
            continue;
        }
        // The line does not fit: flush what has been gathered, then place the line.
        if !pending.is_empty() {
            segments.push(std::mem::take(&mut pending));
            pending_chars = 0;
        }
        if width > max_chars {
            push_wrapped_line(line, max_chars, &mut segments, &mut pending, &mut pending_chars);
        } else {
            pending.push_str(line);
            pending_chars = width;
        }
    }
    if !pending.is_empty() {
        segments.push(pending);
    }
    segments
}

/// Hard-wraps one over-long `line` into pieces of `max_chars` characters.
///
/// Every full piece is appended to `chunks`. The trailing partial piece (possibly
/// empty) replaces `current`, and its character count is stored in `current_len`.
fn push_wrapped_line(
    line: &str,
    max_chars: usize,
    chunks: &mut Vec<String>,
    current: &mut String,
    current_len: &mut usize,
) {
    let glyphs: Vec<char> = line.chars().collect();
    // A width of zero behaves like one: every character becomes its own piece.
    let mut pieces = glyphs.chunks_exact(max_chars.max(1));
    chunks.extend(pieces.by_ref().map(|piece| piece.iter().collect::<String>()));
    let tail = pieces.remainder();
    *current = tail.iter().collect();
    *current_len = tail.len();
}
/// Builds the help text listing every bot command, one per line.
fn render_help() -> String {
    [
        "Commands",
        "/start or /help - show help",
        "/status - show the routed worker status",
        "/tasks - list task-list items",
        "/task <id> - inspect a task-list or runtime task",
        "/team - show the active team context",
        "/agents - list spawned agents",
        "/agent <id> - inspect a spawned agent",
        "/messages - show recent team mailbox messages",
        "/miniapp - open the Telegram Mini App",
        "/stop_task <id> - stop a runtime task",
        "/new - start a fresh session",
        "/cancel - cancel the active turn",
        "",
        "Send a normal message, photo, or document to talk to your profile worker.",
    ]
    .join("\n")
}
/// A bot command recognised by `parse_command`.
///
/// Variants carrying a `String` hold the first whitespace-separated argument that
/// followed the command word.
enum Command {
/// `/start`
Start,
/// `/help`
Help,
/// `/status`
Status,
/// `/tasks`
Tasks,
/// `/task <id>`
Task(String),
/// `/team`
Team,
/// `/agents`
Agents,
/// `/agent <id>`
Agent(String),
/// `/stop_task <id>`
StopTask(String),
/// `/messages`
Messages,
/// `/miniapp`
MiniApp,
/// `/new`
New,
/// `/cancel`
Cancel,
}
/// Interprets the text (or caption) of `message` as a bot command.
///
/// The first whitespace-separated token is the command word; anything after an `@`
/// in that token is ignored. Returns `None` when the message has no text, when the
/// word is not a known command, or when a command that needs an argument
/// (`/task`, `/agent`, `/stop_task`) is given without one.
fn parse_command(message: &Message) -> Option<Command> {
    let mut tokens = message.text_or_caption()?.trim().split_whitespace();
    let head = tokens.next()?;
    let name = head.split('@').next().unwrap_or(head);
    let mut argument = || tokens.next().map(String::from);
    let command = match name {
        "/start" => Command::Start,
        "/help" => Command::Help,
        "/status" => Command::Status,
        "/tasks" => Command::Tasks,
        "/task" => Command::Task(argument()?),
        "/team" => Command::Team,
        "/agents" => Command::Agents,
        "/agent" => Command::Agent(argument()?),
        "/stop_task" => Command::StopTask(argument()?),
        "/messages" => Command::Messages,
        "/miniapp" => Command::MiniApp,
        "/new" => Command::New,
        "/cancel" => Command::Cancel,
        _ => return None,
    };
    Some(command)
}
/// Renders the task list as text: a heading followed by one line per task in the
/// form `<id> [<status>] <subject>`, with a ` blocked by …` suffix when the task has
/// blockers.
fn render_task_list(response: &WorkerTaskListResponse) -> String {
    if response.tasks.is_empty() {
        return format!("Tasks\nTask list: {}\nNo tasks.", response.task_list_id);
    }
    let heading = format!("Tasks\nTask list: {}", response.task_list_id);
    let rows = response.tasks.iter().map(|task| {
        let blocked = match task.blocked_by.is_empty() {
            true => String::new(),
            false => format!(" blocked by {}", task.blocked_by.join(", ")),
        };
        format!(
            "{} [{}] {}{}",
            task.id, task.status, task.subject, blocked
        )
    });
    std::iter::once(heading)
        .chain(rows)
        .collect::<Vec<_>>()
        .join("\n")
}
/// Renders the details of one task.
///
/// A task-list entry takes precedence; otherwise a runtime task is rendered via
/// `render_runtime_task`; if neither is present a "not found" line naming `task_id`
/// is returned.
fn render_task_snapshot(task_id: &str, response: &WorkerTaskSnapshotResponse) -> String {
    match (&response.task, &response.runtime_task) {
        (Some(task), _) => {
            let owner = task.owner.as_deref().unwrap_or("(unassigned)");
            let blocks = if task.blocks.is_empty() {
                String::from("(none)")
            } else {
                task.blocks.join(", ")
            };
            let blocked_by = if task.blocked_by.is_empty() {
                String::from("(none)")
            } else {
                task.blocked_by.join(", ")
            };
            format!(
                "Task {}\nSubject: {}\nStatus: {}\nOwner: {}\nDescription: {}\nBlocks: {}\nBlocked by: {}",
                task.id, task.subject, task.status, owner, task.description, blocks, blocked_by
            )
        }
        (None, Some(runtime_task)) => render_runtime_task(task_id, runtime_task),
        (None, None) => format!("Task `{task_id}` was not found."),
    }
}
/// Renders the active team (name, lead, task list, members, description), or a short
/// "No active team." notice when the response carries no team.
///
/// Each member is shown as its name followed by the optional `[status]`,
/// `type=…` and `model=…` annotations.
fn render_team_snapshot(response: &WorkerTeamSnapshotResponse) -> String {
    let team = match &response.team {
        Some(team) => team,
        None => {
            return format!(
                "Team\nNo active team.\nTask list: {}",
                response.task_list_id
            );
        }
    };
    let mut members = Vec::new();
    for member in &team.members {
        let status = member.status.as_deref().map(|status| format!("[{status}]"));
        let agent_type = member
            .agent_type
            .as_deref()
            .map(|agent_type| format!("type={agent_type}"));
        let model = member.model.as_deref().map(|model| format!("model={model}"));
        let described = std::iter::once(member.name.clone())
            .chain(status)
            .chain(agent_type)
            .chain(model)
            .collect::<Vec<_>>()
            .join(" ");
        members.push(described);
    }
    let member_list = if members.is_empty() {
        String::from("(none)")
    } else {
        members.join(", ")
    };
    let description = team.description.as_deref().unwrap_or("(none)");
    format!(
        "Team\nName: {}\nLead: {}\nTask list: {}\nMembers: {}\nDescription: {}",
        team.team_name, team.lead_agent_id, response.task_list_id, member_list, description
    )
}
/// Lists spawned agents, one per line as `<task id> [<status>] <label>`.
///
/// The label is the agent name, else the agent id, else the literal `agent`.
fn render_agents_snapshot(response: &WorkerAgentListResponse) -> String {
    if response.agents.is_empty() {
        return String::from("Agents\nNo spawned agents.");
    }
    let rows = response.agents.iter().map(|agent| {
        let label = agent
            .agent_name
            .as_deref()
            .or(agent.agent_id.as_deref())
            .unwrap_or("agent");
        format!("{} [{}] {}", agent.task_id, agent.status, label)
    });
    std::iter::once(String::from("Agents"))
        .chain(rows)
        .collect::<Vec<_>>()
        .join("\n")
}
/// Renders one spawned agent as a runtime task, labelled with its agent id when it
/// has one and with its task id otherwise.
fn render_agent_snapshot(agent: &RuntimeTaskRecord) -> String {
    let label = match agent.agent_id.as_deref() {
        Some(agent_id) => agent_id,
        None => agent.task_id.as_str(),
    };
    render_runtime_task(label, agent)
}
fn render_background_agent_terminal_notice(agent: &RuntimeTaskRecord) -> String {
let title = match agent.status {
RuntimeTaskStatus::Completed => "Background agent completed",
RuntimeTaskStatus::Failed => "Background agent failed",
RuntimeTaskStatus::Stopped => "Background agent stopped",
RuntimeTaskStatus::Running => "Background agent update",
};
let mut lines = vec![
title.to_string(),
format!(
"{} [{}] {}",
agent
.agent_name
.as_deref()
.or(agent.agent_id.as_deref())
.unwrap_or(agent.task_id.as_str()),
agent.status,
agent.description
),
format!("Team: {}", agent.team_name.as_deref().unwrap_or("(none)")),
format!("CWD: {}", agent.cwd.as_deref().unwrap_or("(none)")),
];
if let Some(worktree_path) = agent.worktree_path.as_deref() {
lines.push(format!("Worktree: {worktree_path}"));
lines.push(format!(
"Worktree branch: {}",
agent.worktree_branch.as_deref().unwrap_or("(none)")
));
}
if let Some(result) = agent.final_result.as_deref().filter(|value| !value.trim().is_empty()) {
lines.push(String::from("Result:"));
lines.push(truncate_notice_text(result, 1200));
}
if let Some(error) = agent.error.as_deref().filter(|value| !value.trim().is_empty()) {
lines.push(String::from("Error:"));
lines.push(truncate_notice_text(error, 1200));
}
if let Some(path) = agent.output_file.as_deref() {
lines.push(format!("Output file: {path}"));
}
lines.join("\n")
}
/// Builds the "Approval required" prompt for a background task: task id, tool,
/// current and required modes, plus the optional reason and the tool input
/// (shortened to 1200 characters). Blank reason or input is omitted.
fn render_background_approval_notice(approval: &BackgroundApprovalRecord) -> String {
    let mut lines = Vec::new();
    lines.push(String::from("Approval required"));
    lines.push(format!("Background task: {}", approval.task_id));
    lines.push(format!("Tool: {}", approval.tool_name));
    lines.push(format!("Current mode: {}", approval.current_mode));
    lines.push(format!("Required mode: {}", approval.required_mode));
    match approval.reason.as_deref() {
        Some(reason) if !reason.trim().is_empty() => lines.push(format!("Reason: {reason}")),
        _ => {}
    }
    let has_input = !approval.input.trim().is_empty();
    if has_input {
        lines.extend([
            String::from("Input:"),
            truncate_notice_text(&approval.input, 1200),
        ]);
    }
    lines.join("\n")
}
/// Shortens `value` to at most `max_chars` characters (Unicode scalar values).
///
/// Text that already fits is returned unchanged. When text is cut, a newline followed
/// by an ellipsis (`…`) is appended so the reader can see that it was truncated;
/// previously only a bare newline was appended, which left the cut invisible.
fn truncate_notice_text(value: &str, max_chars: usize) -> String {
    // The character at index `max_chars` exists only when the text is too long; its
    // byte offset is exactly where the kept prefix ends, so one pass is enough and
    // the slice always falls on a character boundary.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => {
            let mut truncated = String::with_capacity(cut + "\n…".len());
            truncated.push_str(&value[..cut]);
            truncated.push_str("\n…");
            truncated
        }
        None => value.to_string(),
    }
}
/// Renders a runtime task under the heading `Runtime task <label>`: kind, status,
/// description, team, working directory, the worktree lines (only when the task has
/// a worktree path) and the output file.
fn render_runtime_task(label: &str, task: &RuntimeTaskRecord) -> String {
    let worktree = match task.worktree_path.as_deref() {
        Some(path) => format!(
            "\nWorktree: {}\nWorktree branch: {}",
            path,
            task.worktree_branch.as_deref().unwrap_or("(none)")
        ),
        None => String::new(),
    };
    let team = task.team_name.as_deref().unwrap_or("(none)");
    let cwd = task.cwd.as_deref().unwrap_or("(none)");
    let output_file = task.output_file.as_deref().unwrap_or("(none)");
    format!(
        "Runtime task {}\nKind: {:?}\nStatus: {}\nDescription: {}\nTeam: {}\nCWD: {}{}\nOutput file: {}",
        label, task.kind, task.status, task.description, team, cwd, worktree, output_file
    )
}
/// Builds the "Task created" notice: the task list id followed by
/// `<id> [<status>] <subject>`.
fn render_task_created_notice(task_list_id: &str, task: &TaskListRecord) -> String {
    let id = &task.id;
    let status = &task.status;
    let subject = &task.subject;
    format!(
        "Task created\nTask list: {}\n{} [{}] {}",
        task_list_id, id, status, subject
    )
}
/// Builds the "Task updated" notice: the task list id followed by
/// `<id> [<status>] <subject>`.
fn render_task_updated_notice(task_list_id: &str, task: &TaskListRecord) -> String {
    let id = &task.id;
    let status = &task.status;
    let subject = &task.subject;
    format!(
        "Task updated\nTask list: {}\n{} [{}] {}",
        task_list_id, id, status, subject
    )
}
/// Builds the "Task stopped" notice as `<task id> [<status>] <description>`.
fn render_task_stopped_notice(task: &RuntimeTaskRecord) -> String {
    let task_id = &task.task_id;
    let status = &task.status;
    let description = &task.description;
    format!(
        "Task stopped\n{} [{}] {}",
        task_id, status, description
    )
}
/// Builds the "Agent spawned" notice: label, status and description, then team and
/// working directory, plus the worktree lines when the agent has a worktree path.
///
/// The label is the agent name, else the agent id, else the task id.
fn render_agent_spawned_notice(agent: &RuntimeTaskRecord) -> String {
    let label = agent
        .agent_name
        .as_deref()
        .or(agent.agent_id.as_deref())
        .unwrap_or(agent.task_id.as_str());
    let team = agent.team_name.as_deref().unwrap_or("(none)");
    let cwd = agent.cwd.as_deref().unwrap_or("(none)");
    let worktree = match agent.worktree_path.as_deref() {
        Some(path) => format!(
            "\nWorktree: {}\nWorktree branch: {}",
            path,
            agent.worktree_branch.as_deref().unwrap_or("(none)")
        ),
        None => String::new(),
    };
    format!(
        "Agent spawned\n{} [{}] {}\nTeam: {}\nCWD: {}{}",
        label, agent.status, agent.description, team, cwd, worktree
    )
}
/// Builds the "Team created" notice with the team name, lead agent, task list id
/// and the path of the team configuration file.
fn render_team_created_notice(team: &WorkerTeamCreatedEvent) -> String {
    let created = &team.team;
    let name = &created.team_name;
    let lead = &created.lead_agent_id;
    format!(
        "Team created\nName: {}\nLead: {}\nTask list: {}\nConfig: {}",
        name, lead, team.task_list_id, team.team_file_path
    )
}
/// Builds the "Team deleted" notice naming the removed team.
fn render_team_deleted_notice(team_name: &str) -> String {
    ["Team deleted\nName: ", team_name].concat()
}
/// Builds the "Message delivered" notice: team, sender, comma-separated recipients
/// (`(none)` when empty) and the summary (`(none)` when absent).
fn render_mailbox_message_notice(message: &WorkerMailboxMessageEvent) -> String {
    let recipients = match message.recipients.is_empty() {
        true => String::from("(none)"),
        false => message.recipients.join(", "),
    };
    let summary = message.summary.as_deref().unwrap_or("(none)");
    format!(
        "Message delivered\nTeam: {}\nFrom: {}\nTo: {}\nSummary: {}",
        message.team_name, message.sender, recipients, summary
    )
}
/// Builds the "Team message" notice for a pending mailbox message: sender,
/// recipient, summary (`(none)` when absent) and a shortened preview of the payload.
fn render_pending_mailbox_notice(message: &MailboxMessage) -> String {
    let envelope = &message.envelope;
    let summary = envelope.summary.as_deref().unwrap_or("(none)");
    let preview = mailbox_message_preview(&envelope.message);
    format!(
        "Team message\nFrom: {}\nTo: {}\nSummary: {}\nMessage: {}",
        envelope.from, message.recipient, summary, preview
    )
}
/// Renders the recent team mailbox messages, one per line as
/// `<from> -> <recipient>: <summary or payload preview>`, under a heading naming the
/// team. An empty mailbox yields a "No recent team messages." notice.
fn render_mailbox_summary(response: &WorkerMailboxSummaryResponse) -> String {
    let mailbox = &response.mailbox;
    if mailbox.recent_messages.is_empty() {
        return format!(
            "Messages\nTeam: {}\nNo recent team messages.",
            mailbox.team_name.as_deref().unwrap_or("(none)")
        );
    }
    let heading = format!(
        "Messages\nTeam: {}",
        mailbox.team_name.as_deref().unwrap_or("(none)")
    );
    let rows = mailbox.recent_messages.iter().map(|item| {
        // Prefer the explicit summary; fall back to a preview of the payload.
        let preview = match &item.envelope.summary {
            Some(summary) => summary.clone(),
            None => mailbox_message_preview(&item.envelope.message),
        };
        format!(
            "{} -> {}: {}",
            item.envelope.from, item.recipient, preview
        )
    });
    std::iter::once(heading)
        .chain(rows)
        .collect::<Vec<_>>()
        .join("\n")
}
/// Produces a preview of a mailbox payload, shortened to 600 characters.
///
/// JSON strings are shown as their raw text (without quotes); every other JSON value
/// is shown in its serialized form.
fn mailbox_message_preview(message: &serde_json::Value) -> String {
    if let serde_json::Value::String(text) = message {
        return truncate_notice_text(text, 600);
    }
    let serialized = message.to_string();
    truncate_notice_text(&serialized, 600)
}
/// A decoded approval callback: which approval it refers to, whether it belongs to a
/// foreground turn or a background task, and what the user chose.
///
/// Built from callback data by `ApprovalAction::parse`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ApprovalAction {
/// Identifier of the approval request being answered.
approval_id: String,
/// Whether the approval belongs to a foreground turn or a background task.
target: ApprovalTarget,
/// The choice encoded in the callback.
action: ApprovalKind,
}
/// What an approval callback applies to.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ApprovalTarget {
/// Callback with the `cta` prefix; carries the turn id taken from the callback data.
Foreground { turn_id: String },
/// Callback with the `bga` prefix; carries no turn id.
Background,
}
/// The choice a user can make on an approval prompt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApprovalKind {
/// Parsed from `allow` or `allow_once`.
AllowOnce,
/// Parsed from `allow_tool_session`.
AllowToolForSession,
/// Parsed from `allow_all_session`.
AllowAllForSession,
/// Parsed from `deny`.
Deny,
/// Parsed from `cancel`.
Cancel,
}
impl ApprovalAction {
fn parse(value: &str) -> Option<Self> {
let parts = value.split(':').collect::<Vec<_>>();
match parts.as_slice() {
["cta", turn_id, approval_id, action] => Some(Self {
approval_id: (*approval_id).to_string(),
target: ApprovalTarget::Foreground {
turn_id: (*turn_id).to_string(),
},
action: parse_approval_kind(action)?,
}),
["bga", approval_id, action] => Some(Self {
approval_id: (*approval_id).to_string(),
target: ApprovalTarget::Background,
action: parse_approval_kind(action)?,
}),
_ => None,
}
}
fn decision(&self) -> WorkerApprovalDecision {
match self.action {
ApprovalKind::AllowOnce => WorkerApprovalDecision::ApproveOnce,
ApprovalKind::AllowToolForSession => WorkerApprovalDecision::ApproveToolForSession,
ApprovalKind::AllowAllForSession => WorkerApprovalDecision::ApproveAllForSession,
ApprovalKind::Deny => WorkerApprovalDecision::Deny {
reason: "tool call denied from Telegram approval prompt".to_string(),
},
ApprovalKind::Cancel => WorkerApprovalDecision::CancelTurn,
}
}
fn label(&self) -> &'static str {
match self.action {
ApprovalKind::AllowOnce => "Approved once.",
ApprovalKind::AllowToolForSession => "Tool allowed for this session.",
ApprovalKind::AllowAllForSession => "All tools allowed for this session.",
ApprovalKind::Deny => "Denied.",
ApprovalKind::Cancel => "Cancelled.",
}
}
}
/// Maps the action word of a callback to an [`ApprovalKind`].
///
/// `allow` and `allow_once` are both accepted for a one-off approval. Unknown words
/// yield `None`.
fn parse_approval_kind(value: &str) -> Option<ApprovalKind> {
    const KINDS: [(&str, ApprovalKind); 6] = [
        ("allow", ApprovalKind::AllowOnce),
        ("allow_once", ApprovalKind::AllowOnce),
        ("allow_tool_session", ApprovalKind::AllowToolForSession),
        ("allow_all_session", ApprovalKind::AllowAllForSession),
        ("deny", ApprovalKind::Deny),
        ("cancel", ApprovalKind::Cancel),
    ];
    KINDS
        .iter()
        .find(|(word, _)| *word == value)
        .map(|(_, kind)| *kind)
}
/// Builds the approval keyboard for a foreground turn; callback data uses the `cta`
/// prefix and includes the turn id.
fn approval_keyboard(turn_id: &str, approval_id: &str, tool_name: &str) -> InlineKeyboardMarkup {
    let foreground_turn = Some(turn_id);
    approval_keyboard_with_prefix("cta", foreground_turn, approval_id, tool_name)
}
/// Builds the approval keyboard for a background task; callback data uses the `bga`
/// prefix and carries no turn id.
fn background_approval_keyboard(approval_id: &str, tool_name: &str) -> InlineKeyboardMarkup {
    let no_turn: Option<&str> = None;
    approval_keyboard_with_prefix("bga", no_turn, approval_id, tool_name)
}
/// Builds the three-row approval keyboard.
///
/// Rows: "Approve once" / "Allow <tool> for session", "Allow all for session" /
/// "Deny", and "Cancel turn". Each button's callback data is
/// `<prefix>:<turn_id>:<approval_id>:<action>` when `turn_id` is given and
/// `<prefix>:<approval_id>:<action>` otherwise — the shapes `ApprovalAction::parse`
/// decodes.
///
/// NOTE(review): Telegram limits `callback_data` to 64 bytes; long turn/approval ids
/// could exceed that — confirm the id lengths produced by the worker.
fn approval_keyboard_with_prefix(
    prefix: &str,
    turn_id: Option<&str>,
    approval_id: &str,
    tool_name: &str,
) -> InlineKeyboardMarkup {
    let encode = |action: &str| match turn_id {
        Some(turn_id) => format!("{prefix}:{turn_id}:{approval_id}:{action}"),
        None => format!("{prefix}:{approval_id}:{action}"),
    };
    // All buttons are plain callback buttons: no URL and no web app.
    let button = |text: String, action: &str| InlineKeyboardButton {
        text,
        callback_data: Some(encode(action)),
        url: None,
        web_app: None,
    };
    let first_row = vec![
        button("Approve once".to_string(), "allow_once"),
        button(session_tool_button_label(tool_name), "allow_tool_session"),
    ];
    let second_row = vec![
        button("Allow all for session".to_string(), "allow_all_session"),
        button("Deny".to_string(), "deny"),
    ];
    let third_row = vec![button("Cancel turn".to_string(), "cancel")];
    InlineKeyboardMarkup {
        inline_keyboard: vec![first_row, second_row, third_row],
    }
}
/// Builds the label of the "allow this tool for the session" button.
///
/// The tool name is trimmed; an empty name becomes `tool`. Names longer than 26
/// characters are cut to 25 characters followed by an ellipsis (`…`), so the shown
/// name never exceeds 26 characters and the cut is visible to the user.
fn session_tool_button_label(tool_name: &str) -> String {
    const MAX_LABEL_LEN: usize = 26;
    let trimmed = tool_name.trim();
    let short_name = if trimmed.is_empty() {
        "tool".to_string()
    } else if trimmed.chars().count() > MAX_LABEL_LEN {
        // Reserve one character of the budget for the ellipsis marker.
        let mut shortened: String = trimmed.chars().take(MAX_LABEL_LEN - 1).collect();
        shortened.push('…');
        shortened
    } else {
        trimmed.to_string()
    };
    format!("Allow {short_name} for session")
}
/// Normalizes a file extension so that it is trimmed and starts with a dot.
///
/// An extension that already begins with `.` is returned as-is (after trimming);
/// otherwise a dot is prepended, so an empty input yields `"."`.
fn sanitize_extension(extension: &str) -> String {
    let cleaned = extension.trim();
    match cleaned.strip_prefix('.') {
        Some(_) => cleaned.to_owned(),
        None => format!(".{cleaned}"),
    }
}
/// Milliseconds elapsed since the Unix epoch, or `0` when the system clock reports a
/// time before the epoch.
fn now_millis() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
/// Reports whether `path` has one of the image extensions `png`, `jpg`, `jpeg`,
/// `gif` or `webp`, compared ASCII case-insensitively.
///
/// Paths without an extension, or with a non-UTF-8 extension, are not images.
fn is_image_path(path: &Path) -> bool {
    const IMAGE_EXTENSIONS: [&str; 5] = ["png", "jpg", "jpeg", "gif", "webp"];
    let Some(extension) = path.extension().and_then(|raw| raw.to_str()) else {
        return false;
    };
    IMAGE_EXTENSIONS
        .iter()
        .any(|known| extension.eq_ignore_ascii_case(known))
}
/// Spawns a task that keeps sending the `typing` chat action to `chat_id` every four
/// seconds until `done` is set.
///
/// The flag is checked before each send, so after `done` becomes `true` the task
/// finishes once its current sleep ends. Send failures are ignored.
fn spawn_typing_loop(
    api: TelegramApi,
    chat_id: i64,
    done: Arc<std::sync::atomic::AtomicBool>,
) -> tokio::task::JoinHandle<()> {
    let refresh_every = Duration::from_secs(4);
    tokio::spawn(async move {
        loop {
            if done.load(std::sync::atomic::Ordering::SeqCst) {
                break;
            }
            let _ = api.send_chat_action(chat_id, "typing").await;
            tokio::time::sleep(refresh_every).await;
        }
    })
}
/// Error type of the gateway.
///
/// Every variant except `Other` wraps the error of one underlying component; the
/// `From` impls in this file let those errors convert automatically with `?`.
#[derive(Debug)]
pub enum GatewayError {
/// An I/O failure.
Io(std::io::Error),
/// A Telegram API failure.
Telegram(TelegramApiError),
/// A manifest failure.
Manifest(ManifestError),
/// An agent registry failure.
Registry(RegistryError),
/// A worker manager failure.
WorkerManager(WorkerManagerError),
/// A worker client failure.
WorkerClient(WorkerClientError),
/// A template failure.
Template(TemplateError),
/// Any other failure, described by a free-form message.
Other(String),
}
/// Displays the wrapped error (or the free-form message) without adding any prefix.
impl Display for GatewayError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Pick whichever payload the variant carries and print it exactly once.
        let inner: &dyn Display = match self {
            Self::Other(message) => message,
            Self::Template(error) => error,
            Self::WorkerClient(error) => error,
            Self::WorkerManager(error) => error,
            Self::Registry(error) => error,
            Self::Manifest(error) => error,
            Self::Telegram(error) => error,
            Self::Io(error) => error,
        };
        write!(f, "{inner}")
    }
}
// Empty impl: no trait method is overridden, so the wrapped error is not exposed as
// the source and is reachable only through the enum variant.
impl std::error::Error for GatewayError {}
impl From<std::io::Error> for GatewayError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<TelegramApiError> for GatewayError {
fn from(value: TelegramApiError) -> Self {
Self::Telegram(value)
}
}
impl From<ManifestError> for GatewayError {
fn from(value: ManifestError) -> Self {
Self::Manifest(value)
}
}
impl From<RegistryError> for GatewayError {
fn from(value: RegistryError) -> Self {
Self::Registry(value)
}
}
impl From<WorkerManagerError> for GatewayError {
fn from(value: WorkerManagerError) -> Self {
Self::WorkerManager(value)
}
}
impl From<WorkerClientError> for GatewayError {
fn from(value: WorkerClientError) -> Self {
Self::WorkerClient(value)
}
}
impl From<TemplateError> for GatewayError {
fn from(value: TemplateError) -> Self {
Self::Template(value)
}
}
// Unit tests for the pure helpers of this module: text splitting, approval callback
// parsing, button labels, command parsing and manifest initialization.
#[cfg(test)]
mod tests {
use std::time::{SystemTime, UNIX_EPOCH};
use super::{
load_or_init_manifest, parse_command, session_tool_button_label, split_text,
ApprovalAction, ApprovalTarget,
};
use crate::config::GatewayConfig;
use crate::telegram_api::{Chat, Message, User};
// A single 7200-character line must be hard-wrapped into 3500 + 3500 + 200.
#[test]
fn split_text_breaks_long_lines() {
let chunks = split_text(&"a".repeat(7200), 3500);
assert_eq!(chunks.len(), 3);
}
// Foreground callbacks use the `cta:<turn>:<approval>:<action>` shape; the bare
// `allow` action word is accepted as a one-off approval.
#[test]
fn approval_action_parses_callback_data() {
let parsed = ApprovalAction::parse("cta:turn:approval:allow").expect("action should parse");
assert_eq!(
parsed.target,
ApprovalTarget::Foreground {
turn_id: "turn".to_string()
}
);
assert_eq!(parsed.approval_id, "approval");
assert_eq!(parsed.label(), "Approved once.");
}
// Both session-scoped action words must parse and map to their own labels.
#[test]
fn approval_action_supports_session_scope_buttons() {
let tool = ApprovalAction::parse("cta:turn:approval:allow_tool_session")
.expect("tool approval should parse");
assert_eq!(tool.label(), "Tool allowed for this session.");
let all = ApprovalAction::parse("cta:turn:approval:allow_all_session")
.expect("global approval should parse");
assert_eq!(all.label(), "All tools allowed for this session.");
}
// Background callbacks use the shorter `bga:<approval>:<action>` shape.
#[test]
fn approval_action_parses_background_callback_data() {
let parsed =
ApprovalAction::parse("bga:approval:allow_tool_session").expect("action should parse");
assert_eq!(parsed.target, ApprovalTarget::Background);
assert_eq!(parsed.approval_id, "approval");
assert_eq!(parsed.label(), "Tool allowed for this session.");
}
// Over-long tool names must be cut and marked with an ellipsis.
#[test]
fn session_tool_button_label_truncates_long_names() {
let label = session_tool_button_label("very-long-tool-name-that-keeps-going");
assert!(label.starts_with("Allow very-long-tool-name-tha"));
assert!(label.ends_with("… for session"));
}
// A plain `/status` text message must be recognised as a command.
#[test]
fn parse_command_recognizes_native_commands() {
let message = Message {
message_id: 1,
chat: Chat {
id: 1,
kind: "private".to_string(),
},
from: Some(User {
id: 1,
username: None,
first_name: None,
last_name: None,
}),
text: Some("/status".to_string()),
caption: None,
photo: Vec::new(),
document: None,
};
assert!(parse_command(&message).is_some());
}
// With no manifest on disk, loading must create one that reflects the configured
// worker image.
#[test]
fn load_or_init_manifest_creates_missing_manifest() {
// A nanosecond timestamp in the directory name keeps runs from colliding.
let state_root = std::env::temp_dir().join(format!(
"claw-telegram-gateway-state-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("clock should be after epoch")
.as_nanos()
));
let manifest_path = state_root.join("profiles.json");
let state_root_string = state_root.display().to_string();
let manifest_path_string = manifest_path.display().to_string();
let config = GatewayConfig::from_iter([
("CLAW_GATEWAY_TELEGRAM_BOT_TOKEN", "secret"),
("CLAW_WORKER_AUTH_TOKEN", "worker-secret"),
("CLAW_GATEWAY_STATE_ROOT", state_root_string.as_str()),
("CLAW_GATEWAY_MANIFEST", manifest_path_string.as_str()),
(
"CLAW_GATEWAY_WORKER_IMAGE",
"git.wylab.me/wylab/claw-telegram:latest",
),
])
.expect("gateway config should parse");
let manifest = load_or_init_manifest(&config).expect("manifest should initialize");
assert!(manifest_path.exists());
assert_eq!(manifest.gateway.worker_image, "git.wylab.me/wylab/claw-telegram:latest");
// Best-effort cleanup of the temporary state directory.
let _ = std::fs::remove_dir_all(&state_root);
}
}