Add Unraid Telegram gateway workers
Build Claw Telegram / build (push) Failing after 1m46s
Build Claw Telegram / cleanup (push) Has been skipped

This commit is contained in:
Wylabb
2026-04-04 20:10:19 +02:00
parent 3a6d1031ca
commit 1c349197c6
20 changed files with 6814 additions and 16 deletions
+546 -8
View File
@@ -17,6 +17,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "api"
version = "0.1.0"
@@ -29,12 +38,107 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "axum"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit",
"memchr",
"mime",
"multer",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-core"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "base64"
version = "0.22.1"
@@ -65,6 +169,50 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bollard"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d41711ad46fda47cd701f6908e59d1bd6b9a2b7464c0d0aeab95c6d37096ff8a"
dependencies = [
"base64",
"bollard-stubs",
"bytes",
"futures-core",
"futures-util",
"hex",
"http",
"http-body-util",
"hyper",
"hyper-named-pipe",
"hyper-util",
"hyperlocal",
"log",
"pin-project-lite",
"serde",
"serde_derive",
"serde_json",
"serde_repr",
"serde_urlencoded",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"tower-service",
"url",
"winapi",
]
[[package]]
name = "bollard-stubs"
version = "1.45.0-rc.26.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7c5415e3a6bc6d3e99eff6268e488fd4ee25e7b28c10f08fa6760bd9de16e4"
dependencies = [
"serde",
"serde_repr",
"serde_with",
]
[[package]]
name = "bumpalo"
version = "3.20.2"
@@ -99,16 +247,61 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "channel-gateway-core"
version = "0.1.0"
dependencies = [
"api",
"plugins",
"runtime",
"serde",
"serde_json",
"tokio",
"tools",
]
[[package]]
name = "chrono"
version = "0.4.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0"
dependencies = [
"iana-time-zone",
"num-traits",
"serde",
"windows-link",
]
[[package]]
name = "claw-profile-worker"
version = "0.1.0"
dependencies = [
"async-stream",
"axum",
"base64",
"channel-gateway-core",
"futures-core",
"reqwest",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "claw-telegram"
version = "0.1.0"
dependencies = [
"api",
"base64",
"bollard",
"channel-gateway-core",
"futures-util",
"plugins",
"reqwest",
"runtime",
"serde",
"serde_json",
"sha2",
"tokio",
"tools",
]
@@ -140,6 +333,12 @@ dependencies = [
"tools",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.2.17"
@@ -200,6 +399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
dependencies = [
"powerfmt",
"serde_core",
]
[[package]]
@@ -223,6 +423,21 @@ dependencies = [
"syn",
]
[[package]]
name = "dyn-clone"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
"cfg-if",
]
[[package]]
name = "endian-type"
version = "0.1.2"
@@ -259,7 +474,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78"
dependencies = [
"cfg-if",
"rustix 1.1.4",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -315,6 +530,17 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.32"
@@ -335,6 +561,7 @@ checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@@ -394,12 +621,24 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "home"
version = "0.5.12"
@@ -448,6 +687,12 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "1.9.0"
@@ -461,6 +706,7 @@ dependencies = [
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"smallvec",
@@ -468,6 +714,21 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-named-pipe"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278"
dependencies = [
"hex",
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
"winapi",
]
[[package]]
name = "hyper-rustls"
version = "0.27.7"
@@ -508,6 +769,45 @@ dependencies = [
"tracing",
]
[[package]]
name = "hyperlocal"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7"
dependencies = [
"hex",
"http-body-util",
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "iana-time-zone"
version = "0.1.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "2.1.1"
@@ -610,6 +910,17 @@ dependencies = [
"icu_properties",
]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
"serde",
]
[[package]]
name = "indexmap"
version = "2.13.0"
@@ -617,7 +928,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.16.1",
"serde",
"serde_core",
]
[[package]]
@@ -705,6 +1018,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "memchr"
version = "2.8.0"
@@ -758,6 +1077,23 @@ dependencies = [
"tokio",
]
[[package]]
name = "multer"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http",
"httparse",
"memchr",
"mime",
"spin",
"version_check",
]
[[package]]
name = "nibble_vec"
version = "0.1.0"
@@ -785,6 +1121,15 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.21.4"
@@ -861,7 +1206,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07"
dependencies = [
"base64",
"indexmap",
"indexmap 2.13.0",
"quick-xml",
"serde",
"time",
@@ -950,7 +1295,7 @@ dependencies = [
"rustc-hash",
"rustls",
"socket2",
"thiserror",
"thiserror 2.0.18",
"tokio",
"tracing",
"web-time",
@@ -971,7 +1316,7 @@ dependencies = [
"rustls",
"rustls-pki-types",
"slab",
"thiserror",
"thiserror 2.0.18",
"tinyvec",
"tracing",
"web-time",
@@ -1054,6 +1399,26 @@ dependencies = [
"bitflags",
]
[[package]]
name = "ref-cast"
version = "1.0.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d"
dependencies = [
"ref-cast-impl",
]
[[package]]
name = "ref-cast-impl"
version = "1.0.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "regex"
version = "1.12.3"
@@ -1114,12 +1479,14 @@ dependencies = [
"sync_wrapper",
"tokio",
"tokio-rustls",
"tokio-util",
"tower",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
]
@@ -1169,7 +1536,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys 0.4.15",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -1283,6 +1650,30 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "schemars"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f"
dependencies = [
"dyn-clone",
"ref-cast",
"serde",
"serde_json",
]
[[package]]
name = "schemars"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc"
dependencies = [
"dyn-clone",
"ref-cast",
"serde",
"serde_json",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -1332,6 +1723,28 @@ dependencies = [
"zmij",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457"
dependencies = [
"itoa",
"serde",
"serde_core",
]
[[package]]
name = "serde_repr"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -1344,6 +1757,24 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "3.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd5414fad8e6907dbdd5bc441a50ae8d6e26151a03b1de04d89a5576de61d01f"
dependencies = [
"base64",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.13.0",
"schemars 0.9.0",
"schemars 1.2.1",
"serde_core",
"serde_json",
"time",
]
[[package]]
name = "sha2"
version = "0.10.9"
@@ -1420,6 +1851,12 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
@@ -1479,7 +1916,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"thiserror",
"thiserror 2.0.18",
"walkdir",
"yaml-rust",
]
@@ -1492,13 +1929,33 @@ dependencies = [
"serde_json",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
dependencies = [
"thiserror-impl",
"thiserror-impl 2.0.18",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
@@ -1605,6 +2062,19 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tools"
version = "0.1.0"
@@ -1631,6 +2101,7 @@ dependencies = [
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -1669,6 +2140,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
"tracing-core",
]
@@ -1843,6 +2315,19 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.93"
@@ -1903,12 +2388,65 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-result"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
@@ -0,0 +1,18 @@
[package]
name = "channel-gateway-core"
version.workspace = true
edition.workspace = true
license.workspace = true
publish.workspace = true
[dependencies]
api = { path = "../api" }
plugins = { path = "../plugins" }
runtime = { path = "../runtime" }
serde = { version = "1", features = ["derive"] }
serde_json.workspace = true
tokio = { version = "1", features = ["fs", "rt-multi-thread", "sync", "time"] }
tools = { path = "../tools" }
[lints]
workspace = true
@@ -0,0 +1,22 @@
pub mod manifest;
pub mod protocol;
pub mod runtime_host;
pub mod unraid_template;
pub use runtime::PermissionMode;
pub use manifest::{
ChannelIdentity, DmKind, GatewayManifest, GatewaySettings, ManifestError, ProfileId,
ProfileRecord, WorkerDefaults, WorkerSpec,
};
pub use protocol::{
GeneratedFileDescriptor, InboundAttachment, TurnSource, WorkerApprovalDecision,
WorkerStatusResponse, WorkerTurnAccepted, WorkerTurnEvent, WorkerTurnRequest,
};
pub use runtime_host::{
ApprovalDecision, ApprovalRequestPayload, ApprovalResponder, AttachmentKind, AttachmentRef,
HostError, RuntimeEvent, RuntimeHost, RuntimeHostConfig,
};
pub use unraid_template::{
ManagedTemplateRecord, UnraidTemplateConfigEntry, UnraidTemplateEntryType, UnraidTemplateSpec,
};
@@ -0,0 +1,474 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Formatter};
use std::fs;
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
const MANIFEST_VERSION: u32 = 1;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ProfileId(String);
impl ProfileId {
#[must_use]
pub fn new(value: impl Into<String>) -> Self {
Self(value.into())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Display for ProfileId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DmKind {
Dm,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "channel")]
pub enum ChannelIdentity {
#[serde(rename = "telegram")]
TelegramDm { kind: DmKind, user_id: i64 },
}
impl ChannelIdentity {
#[must_use]
pub fn stable_key(&self) -> String {
match self {
Self::TelegramDm { user_id, .. } => format!("telegram:dm:{user_id}"),
}
}
#[must_use]
pub fn telegram_user_id(&self) -> Option<i64> {
match self {
Self::TelegramDm { user_id, .. } => Some(*user_id),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerSpec {
pub container_name: String,
pub host_state_dir: PathBuf,
pub host_workspace_dir: PathBuf,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProfileRecord {
pub profile_id: ProfileId,
#[serde(default)]
pub display_name: Option<String>,
pub worker: WorkerSpec,
#[serde(default)]
pub channels: Vec<ChannelIdentity>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GatewaySettings {
pub worker_image: String,
pub worker_network: String,
pub template_dir: PathBuf,
pub template_file_prefix: String,
pub template_archive_dir: PathBuf,
#[serde(default)]
pub inherited_env: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerDefaults {
pub bind_port: u16,
pub default_cwd: PathBuf,
pub permission_mode: String,
pub model: String,
pub host_state_root: PathBuf,
pub host_workspace_root: PathBuf,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GatewayManifest {
pub version: u32,
pub gateway: GatewaySettings,
pub worker_defaults: WorkerDefaults,
pub profiles: Vec<ProfileRecord>,
}
impl GatewayManifest {
pub fn load(path: impl AsRef<Path>) -> Result<Self, ManifestError> {
let path = path.as_ref();
let raw = fs::read_to_string(path)?;
let manifest = serde_json::from_str::<Self>(&raw)?;
manifest.validate()?;
Ok(manifest)
}
pub fn save_atomic(&self, path: impl AsRef<Path>) -> Result<(), ManifestError> {
self.validate()?;
let path = path.as_ref();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let temp_path = path.with_extension("json.tmp");
let payload = serde_json::to_string_pretty(self)?;
fs::write(&temp_path, format!("{payload}\n"))?;
fs::rename(temp_path, path)?;
Ok(())
}
pub fn validate(&self) -> Result<(), ManifestError> {
if self.version != MANIFEST_VERSION {
return Err(ManifestError::Invalid(format!(
"unsupported manifest version {}; expected {MANIFEST_VERSION}",
self.version
)));
}
if self.gateway.worker_image.trim().is_empty() {
return Err(ManifestError::Invalid(
"gateway.worker_image must not be empty".to_string(),
));
}
if self.gateway.worker_network.trim().is_empty() {
return Err(ManifestError::Invalid(
"gateway.worker_network must not be empty".to_string(),
));
}
if self.gateway.template_file_prefix.trim().is_empty() {
return Err(ManifestError::Invalid(
"gateway.template_file_prefix must not be empty".to_string(),
));
}
if self.worker_defaults.bind_port == 0 {
return Err(ManifestError::Invalid(
"worker_defaults.bind_port must be a valid port".to_string(),
));
}
let mut profile_ids = BTreeSet::new();
let mut container_names = BTreeSet::new();
let mut state_dirs = BTreeSet::new();
let mut workspace_dirs = BTreeSet::new();
let mut channel_keys = BTreeSet::new();
for profile in &self.profiles {
if profile.profile_id.as_str().trim().is_empty() {
return Err(ManifestError::Invalid(
"profile_id must not be empty".to_string(),
));
}
if !profile_ids.insert(profile.profile_id.clone()) {
return Err(ManifestError::Invalid(format!(
"duplicate profile_id `{}`",
profile.profile_id
)));
}
if profile.worker.container_name.trim().is_empty() {
return Err(ManifestError::Invalid(format!(
"worker.container_name must not be empty for profile `{}`",
profile.profile_id
)));
}
if !container_names.insert(profile.worker.container_name.clone()) {
return Err(ManifestError::Invalid(format!(
"duplicate worker.container_name `{}`",
profile.worker.container_name
)));
}
if !state_dirs.insert(profile.worker.host_state_dir.clone()) {
return Err(ManifestError::Invalid(format!(
"duplicate worker.host_state_dir `{}`",
profile.worker.host_state_dir.display()
)));
}
if !workspace_dirs.insert(profile.worker.host_workspace_dir.clone()) {
return Err(ManifestError::Invalid(format!(
"duplicate worker.host_workspace_dir `{}`",
profile.worker.host_workspace_dir.display()
)));
}
for channel in &profile.channels {
let key = channel.stable_key();
if !channel_keys.insert(key.clone()) {
return Err(ManifestError::Invalid(format!(
"duplicate channel identity `{key}`"
)));
}
}
}
Ok(())
}
#[must_use]
pub fn resolve_profile_for_telegram_user(&self, user_id: i64) -> Option<&ProfileRecord> {
self.profiles.iter().find(|profile| {
profile
.channels
.iter()
.any(|channel| channel.telegram_user_id() == Some(user_id))
})
}
pub fn add_profile(
&mut self,
profile_id: ProfileId,
display_name: Option<String>,
telegram_user_id: Option<i64>,
) -> Result<&ProfileRecord, ManifestError> {
let worker = WorkerSpec {
container_name: format!("claw-worker-{}", profile_id.as_str()),
host_state_dir: self
.worker_defaults
.host_state_root
.join(profile_id.as_str())
.join("state"),
host_workspace_dir: self
.worker_defaults
.host_workspace_root
.join(profile_id.as_str())
.join("workspace"),
};
let mut channels = Vec::new();
if let Some(user_id) = telegram_user_id {
channels.push(ChannelIdentity::TelegramDm {
kind: DmKind::Dm,
user_id,
});
}
self.profiles.push(ProfileRecord {
profile_id,
display_name,
worker,
channels,
});
self.validate()?;
self.profiles
.last()
.ok_or_else(|| ManifestError::Invalid("profile insertion failed".to_string()))
}
pub fn remove_profile(&mut self, profile_id: &str) -> Result<ProfileRecord, ManifestError> {
let index = self
.profiles
.iter()
.position(|profile| profile.profile_id.as_str() == profile_id)
.ok_or_else(|| ManifestError::NotFound(format!("unknown profile `{profile_id}`")))?;
Ok(self.profiles.remove(index))
}
pub fn merge_profiles(
&mut self,
source_profile_id: &str,
target_profile_id: &str,
) -> Result<ProfileRecord, ManifestError> {
if source_profile_id == target_profile_id {
return Err(ManifestError::Invalid(
"source and target profile must differ".to_string(),
));
}
let source_index = self
.profiles
.iter()
.position(|profile| profile.profile_id.as_str() == source_profile_id)
.ok_or_else(|| {
ManifestError::NotFound(format!("unknown profile `{source_profile_id}`"))
})?;
let target_index = self
.profiles
.iter()
.position(|profile| profile.profile_id.as_str() == target_profile_id)
.ok_or_else(|| {
ManifestError::NotFound(format!("unknown profile `{target_profile_id}`"))
})?;
let source = self.profiles.remove(source_index);
let adjusted_target_index = if source_index < target_index {
target_index - 1
} else {
target_index
};
let target = self
.profiles
.get_mut(adjusted_target_index)
.ok_or_else(|| {
ManifestError::NotFound(format!("unknown profile `{target_profile_id}`"))
})?;
target.channels.extend(source.channels.clone());
self.validate()?;
Ok(source)
}
pub fn add_telegram_channel(
&mut self,
profile_id: &str,
user_id: i64,
) -> Result<(), ManifestError> {
let profile = self
.profiles
.iter_mut()
.find(|profile| profile.profile_id.as_str() == profile_id)
.ok_or_else(|| ManifestError::NotFound(format!("unknown profile `{profile_id}`")))?;
profile.channels.push(ChannelIdentity::TelegramDm {
kind: DmKind::Dm,
user_id,
});
self.validate()
}
pub fn remove_telegram_channel(&mut self, user_id: i64) -> Result<(), ManifestError> {
for profile in &mut self.profiles {
let original_len = profile.channels.len();
profile
.channels
.retain(|channel| channel.telegram_user_id() != Some(user_id));
if profile.channels.len() != original_len {
self.validate()?;
return Ok(());
}
}
Err(ManifestError::NotFound(format!(
"unknown Telegram user id `{user_id}`"
)))
}
#[must_use]
pub fn required_env_vars(&self) -> BTreeSet<String> {
self.gateway.inherited_env.iter().cloned().collect()
}
#[must_use]
pub fn profile_map(&self) -> BTreeMap<ProfileId, &ProfileRecord> {
self.profiles
.iter()
.map(|profile| (profile.profile_id.clone(), profile))
.collect()
}
}
#[derive(Debug)]
pub enum ManifestError {
Io(std::io::Error),
Json(serde_json::Error),
Invalid(String),
NotFound(String),
}
impl Display for ManifestError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(error) => write!(f, "{error}"),
Self::Json(error) => write!(f, "{error}"),
Self::Invalid(message) => write!(f, "{message}"),
Self::NotFound(message) => write!(f, "{message}"),
}
}
}
impl std::error::Error for ManifestError {}
impl From<std::io::Error> for ManifestError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<serde_json::Error> for ManifestError {
fn from(value: serde_json::Error) -> Self {
Self::Json(value)
}
}
#[cfg(test)]
mod tests {
use super::{
ChannelIdentity, DmKind, GatewayManifest, GatewaySettings, ManifestError, ProfileId,
ProfileRecord, WorkerDefaults, WorkerSpec,
};
fn sample_manifest() -> GatewayManifest {
GatewayManifest {
version: 1,
gateway: GatewaySettings {
worker_image: "registry/worker:latest".to_string(),
worker_network: "claw_gateway".to_string(),
template_dir: "/templates".into(),
template_file_prefix: "claw-worker-".to_string(),
template_archive_dir: "/archive".into(),
inherited_env: vec!["ANTHROPIC_API_KEY".to_string()],
},
worker_defaults: WorkerDefaults {
bind_port: 8080,
default_cwd: "/workspace".into(),
permission_mode: "workspace-write".to_string(),
model: "claude-opus-4-6".to_string(),
host_state_root: "/state-root".into(),
host_workspace_root: "/workspace-root".into(),
},
profiles: vec![ProfileRecord {
profile_id: ProfileId::new("makar"),
display_name: Some("Makar".to_string()),
worker: WorkerSpec {
container_name: "claw-worker-makar".to_string(),
host_state_dir: "/state-root/makar/state".into(),
host_workspace_dir: "/workspace-root/makar/workspace".into(),
},
channels: vec![ChannelIdentity::TelegramDm {
kind: DmKind::Dm,
user_id: 239824268,
}],
}],
}
}
#[test]
fn validate_rejects_duplicate_telegram_users() {
let mut manifest = sample_manifest();
manifest.profiles.push(ProfileRecord {
profile_id: ProfileId::new("other"),
display_name: None,
worker: WorkerSpec {
container_name: "claw-worker-other".to_string(),
host_state_dir: "/state-root/other/state".into(),
host_workspace_dir: "/workspace-root/other/workspace".into(),
},
channels: vec![ChannelIdentity::TelegramDm {
kind: DmKind::Dm,
user_id: 239824268,
}],
});
assert!(matches!(
manifest.validate(),
Err(ManifestError::Invalid(message)) if message.contains("duplicate channel identity")
));
}
#[test]
fn resolve_profile_for_telegram_user_returns_match() {
let manifest = sample_manifest();
let profile = manifest
.resolve_profile_for_telegram_user(239824268)
.expect("profile should resolve");
assert_eq!(profile.profile_id.as_str(), "makar");
}
#[test]
fn add_profile_derives_worker_paths() {
let mut manifest = sample_manifest();
let profile = manifest
.add_profile(ProfileId::new("new-user"), None, Some(99))
.expect("profile should add");
assert_eq!(profile.worker.container_name, "claw-worker-new-user");
assert!(profile.worker.host_state_dir.ends_with("new-user/state"));
}
}
@@ -0,0 +1,98 @@
use serde::{Deserialize, Serialize};
use crate::runtime_host::{ApprovalRequestPayload, AttachmentKind};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TurnSource {
pub channel: String,
pub sender_id: String,
#[serde(default)]
pub chat_id: Option<String>,
#[serde(default)]
pub display_name: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InboundAttachment {
pub file_name: String,
pub kind: AttachmentKind,
#[serde(default)]
pub media_type: Option<String>,
pub data_base64: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerTurnRequest {
pub prompt: String,
pub source: TurnSource,
#[serde(default)]
pub attachments: Vec<InboundAttachment>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerTurnAccepted {
pub turn_id: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GeneratedFileDescriptor {
pub file_id: String,
pub file_name: String,
#[serde(default)]
pub media_type: Option<String>,
pub size_bytes: u64,
pub is_image: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WorkerTurnEvent {
AssistantTextDelta {
delta: String,
},
ToolUse {
id: String,
name: String,
input: String,
},
ToolResult {
tool_use_id: String,
tool_name: String,
output: String,
is_error: bool,
},
ApprovalRequested {
request: ApprovalRequestPayload,
},
AutoCompaction {
removed_message_count: usize,
},
Completed {
final_text: String,
iterations: usize,
input_tokens: u32,
output_tokens: u32,
generated_files: Vec<GeneratedFileDescriptor>,
},
Failed {
message: String,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "decision", rename_all = "snake_case")]
pub enum WorkerApprovalDecision {
ApproveOnce,
Deny { reason: String },
CancelTurn,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerStatusResponse {
pub profile_id: String,
pub message_count: usize,
pub model: String,
pub permission_mode: String,
pub default_cwd: String,
pub busy: bool,
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,262 @@
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter, Write as _};
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnraidTemplateEntryType {
Port,
Path,
Variable,
Label,
Device,
}
impl Display for UnraidTemplateEntryType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Port => write!(f, "Port"),
Self::Path => write!(f, "Path"),
Self::Variable => write!(f, "Variable"),
Self::Label => write!(f, "Label"),
Self::Device => write!(f, "Device"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnraidTemplateConfigEntry {
pub name: String,
pub target: String,
pub default_value: String,
pub mode: String,
pub description: String,
pub entry_type: UnraidTemplateEntryType,
pub display: String,
pub required: bool,
pub mask: bool,
pub value: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnraidTemplateSpec {
pub name: String,
pub repository: String,
pub registry: String,
pub network: String,
pub shell: String,
pub privileged: bool,
pub support: String,
pub project: String,
pub overview: String,
pub category: String,
pub template_url: String,
pub icon: String,
pub extra_params: String,
pub post_args: String,
pub cpu_set: String,
pub requires: String,
pub web_ui: Option<String>,
pub config_entries: Vec<UnraidTemplateConfigEntry>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManagedTemplateRecord {
pub profile_id: String,
pub file_path: PathBuf,
}
impl UnraidTemplateSpec {
#[must_use]
pub fn render_xml(&self) -> String {
let mut xml = String::new();
let date_installed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs())
.unwrap_or(0);
writeln!(&mut xml, "<?xml version=\"1.0\"?>").expect("xml write should succeed");
writeln!(&mut xml, "<Container version=\"2\">").expect("xml write should succeed");
write_tag(&mut xml, "Name", &self.name);
write_tag(&mut xml, "Repository", &self.repository);
write_tag(&mut xml, "Registry", &self.registry);
write_tag(&mut xml, "Network", &self.network);
write_tag(&mut xml, "MyIP", "");
write_tag(&mut xml, "Shell", &self.shell);
write_tag(
&mut xml,
"Privileged",
if self.privileged { "true" } else { "false" },
);
write_tag(&mut xml, "Support", &self.support);
write_tag(&mut xml, "Project", &self.project);
write_tag(&mut xml, "Overview", &self.overview);
write_tag(&mut xml, "Category", &self.category);
write_tag(&mut xml, "WebUI", self.web_ui.as_deref().unwrap_or(""));
write_tag(&mut xml, "TemplateURL", &self.template_url);
write_tag(&mut xml, "Icon", &self.icon);
write_tag(&mut xml, "ExtraParams", &self.extra_params);
write_tag(&mut xml, "PostArgs", &self.post_args);
write_tag(&mut xml, "CPUset", &self.cpu_set);
write_tag(&mut xml, "DateInstalled", &date_installed.to_string());
write_tag(&mut xml, "DonateText", "");
write_tag(&mut xml, "DonateLink", "");
write_tag(&mut xml, "Requires", &self.requires);
for entry in &self.config_entries {
writeln!(
&mut xml,
" <Config Name=\"{}\" Target=\"{}\" Default=\"{}\" Mode=\"{}\" Description=\"{}\" Type=\"{}\" Display=\"{}\" Required=\"{}\" Mask=\"{}\">{}</Config>",
escape_attr(&entry.name),
escape_attr(&entry.target),
escape_attr(&entry.default_value),
escape_attr(&entry.mode),
escape_attr(&entry.description),
entry.entry_type,
escape_attr(&entry.display),
if entry.required { "true" } else { "false" },
if entry.mask { "true" } else { "false" },
escape_text(&entry.value),
)
.expect("xml write should succeed");
}
write_tag(&mut xml, "TailscaleStateDir", "");
xml.push_str("</Container>\n");
xml
}
#[must_use]
pub fn worker_template(
name: &str,
repository: &str,
network: &str,
host_state_dir: &str,
host_workspace_dir: &str,
env_values: &BTreeMap<String, String>,
labels: &BTreeMap<String, String>,
) -> Self {
let mut config_entries = vec![
UnraidTemplateConfigEntry {
name: "State".to_string(),
target: "/state".to_string(),
default_value: host_state_dir.to_string(),
mode: "rw".to_string(),
description: "Profile state directory".to_string(),
entry_type: UnraidTemplateEntryType::Path,
display: "always".to_string(),
required: true,
mask: false,
value: host_state_dir.to_string(),
},
UnraidTemplateConfigEntry {
name: "Workspace".to_string(),
target: "/workspace".to_string(),
default_value: host_workspace_dir.to_string(),
mode: "rw".to_string(),
description: "Profile workspace directory".to_string(),
entry_type: UnraidTemplateEntryType::Path,
display: "always".to_string(),
required: true,
mask: false,
value: host_workspace_dir.to_string(),
},
];
for (key, value) in env_values {
config_entries.push(UnraidTemplateConfigEntry {
name: key.clone(),
target: key.clone(),
default_value: value.clone(),
mode: String::new(),
description: format!("Injected environment variable `{key}`"),
entry_type: UnraidTemplateEntryType::Variable,
display: "advanced".to_string(),
required: true,
mask: key.contains("TOKEN") || key.contains("KEY") || key.contains("SECRET"),
value: value.clone(),
});
}
for (key, value) in labels {
config_entries.push(UnraidTemplateConfigEntry {
name: key.clone(),
target: key.clone(),
default_value: value.clone(),
mode: String::new(),
description: format!("Managed label `{key}`"),
entry_type: UnraidTemplateEntryType::Label,
display: "advanced-hide".to_string(),
required: false,
mask: false,
value: value.clone(),
});
}
Self {
name: name.to_string(),
repository: repository.to_string(),
registry: String::new(),
network: network.to_string(),
shell: "sh".to_string(),
privileged: false,
support: "https://git.wylab.me/wylab/claw-code-parity".to_string(),
project: "claw-code-parity".to_string(),
overview: "Managed worker for a single Claw profile.".to_string(),
category: "AI:Tools".to_string(),
template_url: String::new(),
icon: String::new(),
extra_params: String::new(),
post_args: String::new(),
cpu_set: String::new(),
requires: String::new(),
web_ui: None,
config_entries,
}
}
}
fn write_tag(buffer: &mut String, tag: &str, value: &str) {
writeln!(buffer, " <{tag}>{}</{tag}>", escape_text(value)).expect("xml write should succeed");
}
fn escape_attr(value: &str) -> String {
value
.replace('&', "&amp;")
.replace('"', "&quot;")
.replace('<', "&lt;")
.replace('>', "&gt;")
}
fn escape_text(value: &str) -> String {
value
.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use super::UnraidTemplateSpec;
#[test]
fn render_xml_includes_masked_secret_variables() {
let xml = UnraidTemplateSpec::worker_template(
"worker",
"registry/worker:latest",
"claw_gateway",
"/mnt/user/appdata/worker/state",
"/mnt/user/appdata/worker/workspace",
&BTreeMap::from([
("CLAW_WORKER_AUTH_TOKEN".to_string(), "secret".to_string()),
("CLAW_WORKER_PROFILE_ID".to_string(), "makar".to_string()),
]),
&BTreeMap::from([("ai.claw.profile_id".to_string(), "makar".to_string())]),
)
.render_xml();
assert!(xml.contains("<Container version=\"2\">"));
assert!(xml.contains("Type=\"Variable\""));
assert!(xml.contains("Mask=\"true\">secret</Config>"));
assert!(xml.contains("Target=\"/state\""));
}
}
@@ -0,0 +1,22 @@
[package]
name = "claw-profile-worker"
version.workspace = true
edition.workspace = true
license.workspace = true
publish.workspace = true
[dependencies]
async-stream = "0.3"
axum = { version = "0.7", features = ["multipart"] }
base64 = "0.22"
channel-gateway-core = { path = "../channel-gateway-core" }
futures-core = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json.workspace = true
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "signal", "sync", "time", "net"] }
[dev-dependencies]
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
[lints]
workspace = true
@@ -0,0 +1,143 @@
use std::env;
use std::fmt::{Display, Formatter};
use std::path::PathBuf;
use channel_gateway_core::PermissionMode;
const DEFAULT_BIND_ADDR: &str = "0.0.0.0:8080";
const DEFAULT_STATE_ROOT: &str = "/state";
const DEFAULT_DEFAULT_CWD: &str = "/workspace";
const DEFAULT_MODEL: &str = "claude-opus-4-6";
#[derive(Clone, PartialEq, Eq)]
pub struct WorkerConfig {
pub bind_addr: String,
pub auth_token: String,
pub profile_id: String,
pub state_root: PathBuf,
pub default_cwd: PathBuf,
pub model: String,
pub permission_mode: PermissionMode,
}
impl std::fmt::Debug for WorkerConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WorkerConfig")
.field("bind_addr", &self.bind_addr)
.field("profile_id", &self.profile_id)
.field("state_root", &self.state_root)
.field("default_cwd", &self.default_cwd)
.field("model", &self.model)
.field("permission_mode", &self.permission_mode.as_str())
.finish()
}
}
impl WorkerConfig {
pub fn from_env() -> Result<Self, ConfigError> {
Self::from_iter(env::vars())
}
pub fn from_iter(
vars: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Result<Self, ConfigError> {
let vars = vars
.into_iter()
.map(|(key, value)| (key.into(), value.into()))
.collect::<std::collections::BTreeMap<String, String>>();
Ok(Self {
bind_addr: vars
.get("CLAW_WORKER_BIND_ADDR")
.cloned()
.unwrap_or_else(|| DEFAULT_BIND_ADDR.to_string()),
auth_token: required_var(&vars, "CLAW_WORKER_AUTH_TOKEN")?,
profile_id: required_var(&vars, "CLAW_WORKER_PROFILE_ID")?,
state_root: vars
.get("CLAW_WORKER_STATE_ROOT")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from(DEFAULT_STATE_ROOT)),
default_cwd: vars
.get("CLAW_WORKER_DEFAULT_CWD")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from(DEFAULT_DEFAULT_CWD)),
model: vars
.get("CLAW_WORKER_MODEL")
.cloned()
.unwrap_or_else(|| DEFAULT_MODEL.to_string()),
permission_mode: parse_permission_mode(
vars.get("CLAW_WORKER_PERMISSION_MODE")
.map(String::as_str)
.unwrap_or("workspace-write"),
)?,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigError {
MissingVar(String),
InvalidVar { key: String, message: String },
}
impl Display for ConfigError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::MissingVar(key) => write!(f, "missing required environment variable `{key}`"),
Self::InvalidVar { key, message } => {
write!(f, "invalid value for `{key}`: {message}")
}
}
}
}
impl std::error::Error for ConfigError {}
fn required_var(
vars: &std::collections::BTreeMap<String, String>,
key: &str,
) -> Result<String, ConfigError> {
vars.get(key)
.cloned()
.filter(|value| !value.trim().is_empty())
.ok_or_else(|| ConfigError::MissingVar(key.to_string()))
}
fn parse_permission_mode(value: &str) -> Result<PermissionMode, ConfigError> {
match value.trim() {
"read-only" => Ok(PermissionMode::ReadOnly),
"workspace-write" => Ok(PermissionMode::WorkspaceWrite),
"danger-full-access" => Ok(PermissionMode::DangerFullAccess),
"prompt" => Ok(PermissionMode::Prompt),
"allow" => Ok(PermissionMode::Allow),
other => Err(ConfigError::InvalidVar {
key: "CLAW_WORKER_PERMISSION_MODE".to_string(),
message: format!(
"`{other}` is unsupported; use read-only, workspace-write, danger-full-access, prompt, or allow"
),
}),
}
}
#[cfg(test)]
mod tests {
use super::WorkerConfig;
#[test]
fn config_requires_profile_and_token() {
let error = WorkerConfig::from_iter(std::iter::empty::<(&str, &str)>())
.expect_err("config should fail");
assert!(error.to_string().contains("CLAW_WORKER_AUTH_TOKEN"));
}
#[test]
fn config_parses_defaults() {
let config = WorkerConfig::from_iter([
("CLAW_WORKER_AUTH_TOKEN", "secret"),
("CLAW_WORKER_PROFILE_ID", "makar"),
])
.expect("config should parse");
assert_eq!(config.bind_addr, "0.0.0.0:8080");
assert_eq!(config.profile_id, "makar");
assert_eq!(config.model, "claude-opus-4-6");
}
}
@@ -0,0 +1,45 @@
mod config;
mod server;
use std::env;
use std::io::{self, Write};
use config::WorkerConfig;
#[tokio::main(flavor = "multi_thread")]
async fn main() {
if let Err(error) = run().await {
eprintln!("error: {error}");
std::process::exit(1);
}
}
async fn run() -> Result<(), Box<dyn std::error::Error>> {
let args = env::args().skip(1).collect::<Vec<_>>();
match args.first().map(String::as_str) {
Some("serve") => {
let config = WorkerConfig::from_env()?;
server::serve(config).await?;
}
Some("--help") | Some("-h") | None => print_help(&mut io::stdout())?,
Some(other) => return Err(format!("unsupported command `{other}`").into()),
}
Ok(())
}
fn print_help(out: &mut impl Write) -> io::Result<()> {
writeln!(out, "claw-profile-worker")?;
writeln!(out)?;
writeln!(out, "Usage:")?;
writeln!(out, " claw-profile-worker serve")?;
writeln!(out)?;
writeln!(out, "Environment:")?;
writeln!(out, " CLAW_WORKER_AUTH_TOKEN")?;
writeln!(out, " CLAW_WORKER_PROFILE_ID")?;
writeln!(out, " CLAW_WORKER_STATE_ROOT")?;
writeln!(out, " CLAW_WORKER_BIND_ADDR")?;
writeln!(out, " CLAW_WORKER_DEFAULT_CWD")?;
writeln!(out, " CLAW_WORKER_MODEL")?;
writeln!(out, " CLAW_WORKER_PERMISSION_MODE")?;
Ok(())
}
@@ -0,0 +1,807 @@
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use async_stream::stream;
use axum::extract::{Path as AxumPath, State};
use axum::http::{header, HeaderMap, StatusCode};
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::{Json, Router};
use base64::Engine as _;
use channel_gateway_core::{
ApprovalDecision, ApprovalResponder, AttachmentRef, GeneratedFileDescriptor, HostError,
RuntimeEvent, RuntimeHost, RuntimeHostConfig, WorkerApprovalDecision, WorkerStatusResponse,
WorkerTurnAccepted, WorkerTurnEvent, WorkerTurnRequest,
};
use serde::Deserialize;
use tokio::net::TcpListener;
use tokio::sync::{broadcast, mpsc, Mutex as AsyncMutex};
use crate::config::WorkerConfig;
const MAX_COMPLETED_TURNS: usize = 16;
pub async fn serve(config: WorkerConfig) -> Result<(), ServerError> {
let runtime = Arc::new(RealWorkerRuntime::new(&config));
let listener = TcpListener::bind(&config.bind_addr).await?;
let app = app_router(config, runtime);
axum::serve(listener, app).await.map_err(ServerError::Io)
}
fn app_router(config: WorkerConfig, runtime: Arc<dyn WorkerRuntime>) -> Router {
let state = AppState::new(config, runtime);
Router::new()
.route("/healthz", get(health))
.route("/v1/status", get(status))
.route("/v1/session/reset", post(reset_session))
.route("/v1/turns", post(post_turn))
.route("/v1/turns/:turn_id/events", get(stream_events))
.route("/v1/turns/:turn_id/approval", post(post_approval))
.route("/v1/turns/:turn_id/cancel", post(cancel_turn))
.route("/v1/turns/:turn_id/files/:file_id", get(get_file))
.with_state(state)
}
struct AppState {
config: WorkerConfig,
runtime: Arc<dyn WorkerRuntime>,
active_turn: AsyncMutex<Option<Arc<TurnState>>>,
completed_turns: AsyncMutex<BTreeMap<String, Arc<TurnState>>>,
}
impl AppState {
fn new(config: WorkerConfig, runtime: Arc<dyn WorkerRuntime>) -> Arc<Self> {
Arc::new(Self {
config,
runtime,
active_turn: AsyncMutex::new(None),
completed_turns: AsyncMutex::new(BTreeMap::new()),
})
}
async fn find_turn(&self, turn_id: &str) -> Option<Arc<TurnState>> {
if let Some(active) = self.active_turn.lock().await.clone() {
if active.turn_id == turn_id {
return Some(active);
}
}
self.completed_turns.lock().await.get(turn_id).cloned()
}
async fn finalize_turn(&self, turn: Arc<TurnState>) {
let mut active = self.active_turn.lock().await;
if active
.as_ref()
.is_some_and(|current| current.turn_id == turn.turn_id)
{
*active = None;
}
drop(active);
let mut completed = self.completed_turns.lock().await;
completed.insert(turn.turn_id.clone(), turn);
while completed.len() > MAX_COMPLETED_TURNS {
let oldest = completed.keys().next().cloned();
if let Some(oldest) = oldest {
completed.remove(&oldest);
}
}
}
}
struct TurnState {
turn_id: String,
history: AsyncMutex<Vec<WorkerTurnEvent>>,
sender: broadcast::Sender<WorkerTurnEvent>,
cancel_flag: Arc<AtomicBool>,
approvals: Mutex<BTreeMap<String, ApprovalResponder>>,
generated_files: AsyncMutex<BTreeMap<String, GeneratedFileRecord>>,
}
impl TurnState {
fn new(turn_id: String) -> Arc<Self> {
let (sender, _) = broadcast::channel(128);
Arc::new(Self {
turn_id,
history: AsyncMutex::new(Vec::new()),
sender,
cancel_flag: Arc::new(AtomicBool::new(false)),
approvals: Mutex::new(BTreeMap::new()),
generated_files: AsyncMutex::new(BTreeMap::new()),
})
}
async fn push_event(&self, event: WorkerTurnEvent) {
self.history.lock().await.push(event.clone());
let _ = self.sender.send(event);
}
async fn event_history(&self) -> Vec<WorkerTurnEvent> {
self.history.lock().await.clone()
}
}
struct GeneratedFileRecord {
descriptor: GeneratedFileDescriptor,
path: PathBuf,
}
trait WorkerRuntime: Send + Sync {
fn run_turn(
&self,
session_path: &Path,
prompt: String,
attachments: Vec<AttachmentRef>,
cancel_flag: Arc<AtomicBool>,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
) -> Result<(), HostError>;
fn session_message_count(&self, session_path: &Path) -> Result<usize, HostError>;
fn reset_session(&self, session_path: &Path, archive_dir: &Path) -> Result<(), HostError>;
}
struct RealWorkerRuntime {
host: RuntimeHost,
}
impl RealWorkerRuntime {
fn new(config: &WorkerConfig) -> Self {
Self {
host: RuntimeHost::new(RuntimeHostConfig {
cwd: config.default_cwd.clone(),
model: config.model.clone(),
permission_mode: config.permission_mode,
approval_timeout: std::time::Duration::from_secs(300),
allowed_tools: None,
}),
}
}
}
impl WorkerRuntime for RealWorkerRuntime {
fn run_turn(
&self,
session_path: &Path,
prompt: String,
attachments: Vec<AttachmentRef>,
cancel_flag: Arc<AtomicBool>,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
) -> Result<(), HostError> {
self.host
.run_turn(session_path, prompt, attachments, cancel_flag, event_tx)
}
fn session_message_count(&self, session_path: &Path) -> Result<usize, HostError> {
self.host.session_message_count(session_path)
}
fn reset_session(&self, session_path: &Path, archive_dir: &Path) -> Result<(), HostError> {
self.host
.reset_session(session_path, archive_dir)
.map(|_| ())
}
}
#[derive(Debug, Deserialize)]
struct ApprovalRequest {
approval_id: String,
decision: WorkerApprovalDecision,
}
async fn health() -> impl IntoResponse {
(StatusCode::OK, "ok")
}
async fn status(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
) -> Result<Json<WorkerStatusResponse>, StatusCode> {
authorize(&headers, &state.config.auth_token)?;
let session_path = session_path(&state.config);
let message_count = state
.runtime
.session_message_count(&session_path)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let busy = state.active_turn.lock().await.is_some();
Ok(Json(WorkerStatusResponse {
profile_id: state.config.profile_id.clone(),
message_count,
model: state.config.model.clone(),
permission_mode: state.config.permission_mode.as_str().to_string(),
default_cwd: state.config.default_cwd.display().to_string(),
busy,
}))
}
async fn reset_session(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
) -> Result<StatusCode, StatusCode> {
authorize(&headers, &state.config.auth_token)?;
if state.active_turn.lock().await.is_some() {
return Err(StatusCode::CONFLICT);
}
let session_path = session_path(&state.config);
let archive_dir = state.config.state_root.join("archive");
tokio::fs::create_dir_all(&archive_dir)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
state
.runtime
.reset_session(&session_path, &archive_dir)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(StatusCode::ACCEPTED)
}
async fn post_turn(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(request): Json<WorkerTurnRequest>,
) -> Result<(StatusCode, Json<WorkerTurnAccepted>), StatusCode> {
authorize(&headers, &state.config.auth_token)?;
let mut active_turn = state.active_turn.lock().await;
if active_turn.is_some() {
return Err(StatusCode::CONFLICT);
}
let turn_id = next_turn_id();
let attachments = persist_attachments(&state.config, &turn_id, &request.attachments)
.await
.map_err(|_| StatusCode::BAD_REQUEST)?;
let turn_state = TurnState::new(turn_id.clone());
*active_turn = Some(turn_state.clone());
drop(active_turn);
let session_path = session_path(&state.config);
let (runtime_tx, mut runtime_rx) = mpsc::unbounded_channel();
let runtime = state.runtime.clone();
let cancel_flag = turn_state.cancel_flag.clone();
let prompt = request.prompt.clone();
tokio::task::spawn_blocking(move || {
if let Err(error) = runtime.run_turn(
&session_path,
prompt,
attachments,
cancel_flag,
runtime_tx.clone(),
) {
let _ = runtime_tx.send(RuntimeEvent::Failed {
message: error.to_string(),
});
}
});
let state_for_events = state.clone();
let turn_state_for_events = turn_state.clone();
tokio::spawn(async move {
while let Some(event) = runtime_rx.recv().await {
match event {
RuntimeEvent::AssistantTextDelta(delta) => {
turn_state_for_events
.push_event(WorkerTurnEvent::AssistantTextDelta { delta })
.await;
}
RuntimeEvent::ToolUse { id, name, input } => {
turn_state_for_events
.push_event(WorkerTurnEvent::ToolUse { id, name, input })
.await;
}
RuntimeEvent::ToolResult {
tool_use_id,
tool_name,
output,
is_error,
} => {
turn_state_for_events
.push_event(WorkerTurnEvent::ToolResult {
tool_use_id,
tool_name,
output,
is_error,
})
.await;
}
RuntimeEvent::ApprovalRequested { request, responder } => {
turn_state_for_events
.approvals
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(request.approval_id.clone(), responder);
turn_state_for_events
.push_event(WorkerTurnEvent::ApprovalRequested { request })
.await;
}
RuntimeEvent::AutoCompaction {
removed_message_count,
} => {
turn_state_for_events
.push_event(WorkerTurnEvent::AutoCompaction {
removed_message_count,
})
.await;
}
RuntimeEvent::Completed {
final_text,
iterations,
input_tokens,
output_tokens,
generated_files,
} => {
let descriptors =
build_generated_file_records(&turn_state_for_events, &generated_files)
.await;
turn_state_for_events
.push_event(WorkerTurnEvent::Completed {
final_text,
iterations,
input_tokens,
output_tokens,
generated_files: descriptors,
})
.await;
state_for_events
.finalize_turn(turn_state_for_events.clone())
.await;
return;
}
RuntimeEvent::Failed { message } => {
turn_state_for_events
.push_event(WorkerTurnEvent::Failed { message })
.await;
state_for_events
.finalize_turn(turn_state_for_events.clone())
.await;
return;
}
}
}
state_for_events
.finalize_turn(turn_state_for_events.clone())
.await;
});
Ok((StatusCode::ACCEPTED, Json(WorkerTurnAccepted { turn_id })))
}
async fn stream_events(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
AxumPath(turn_id): AxumPath<String>,
) -> Result<
Sse<impl futures_core::Stream<Item = Result<Event, std::convert::Infallible>>>,
StatusCode,
> {
authorize(&headers, &state.config.auth_token)?;
let turn = state
.find_turn(&turn_id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
let history = turn.event_history().await;
let mut receiver = turn.sender.subscribe();
Ok(Sse::new(stream! {
for event in history {
let data = serde_json::to_string(&event).expect("event should serialize");
yield Ok(Event::default().data(data));
}
loop {
match receiver.recv().await {
Ok(event) => {
let data = serde_json::to_string(&event).expect("event should serialize");
yield Ok(Event::default().data(data));
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
})
.keep_alive(KeepAlive::default()))
}
async fn post_approval(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
AxumPath(turn_id): AxumPath<String>,
Json(request): Json<ApprovalRequest>,
) -> Result<StatusCode, StatusCode> {
authorize(&headers, &state.config.auth_token)?;
let turn = state
.find_turn(&turn_id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
let responder = turn
.approvals
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.remove(&request.approval_id)
.ok_or(StatusCode::NOT_FOUND)?;
responder
.send(map_approval_decision(request.decision))
.map_err(|_| StatusCode::GONE)?;
Ok(StatusCode::ACCEPTED)
}
async fn cancel_turn(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
AxumPath(turn_id): AxumPath<String>,
) -> Result<StatusCode, StatusCode> {
authorize(&headers, &state.config.auth_token)?;
let turn = state
.find_turn(&turn_id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
turn.cancel_flag.store(true, Ordering::SeqCst);
let pending = {
let mut approvals = turn
.approvals
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
std::mem::take(&mut *approvals)
.into_values()
.collect::<Vec<ApprovalResponder>>()
};
for responder in pending {
let _ = responder.send(ApprovalDecision::CancelTurn);
}
Ok(StatusCode::ACCEPTED)
}
async fn get_file(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
AxumPath((turn_id, file_id)): AxumPath<(String, String)>,
) -> Result<impl IntoResponse, StatusCode> {
authorize(&headers, &state.config.auth_token)?;
let turn = state
.find_turn(&turn_id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
let files = turn.generated_files.lock().await;
let file = files.get(&file_id).ok_or(StatusCode::NOT_FOUND)?;
let bytes = tokio::fs::read(&file.path)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let content_type = file
.descriptor
.media_type
.as_deref()
.unwrap_or("application/octet-stream");
Ok((
StatusCode::OK,
[
(header::CONTENT_TYPE, content_type.to_string()),
(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", file.descriptor.file_name),
),
],
bytes,
))
}
async fn persist_attachments(
config: &WorkerConfig,
turn_id: &str,
attachments: &[channel_gateway_core::InboundAttachment],
) -> Result<Vec<AttachmentRef>, ServerError> {
let inbound_dir = config.state_root.join("inbound").join(turn_id);
tokio::fs::create_dir_all(&inbound_dir).await?;
let mut persisted = Vec::new();
for (index, attachment) in attachments.iter().enumerate() {
let file_name = sanitize_file_name(&attachment.file_name);
let destination = inbound_dir.join(format!("{index}-{file_name}"));
let bytes = base64::engine::general_purpose::STANDARD
.decode(&attachment.data_base64)
.map_err(ServerError::Base64)?;
tokio::fs::write(&destination, &bytes).await?;
persisted.push(AttachmentRef {
path: destination,
kind: attachment.kind,
original_name: Some(attachment.file_name.clone()),
file_size_bytes: bytes.len() as u64,
});
}
Ok(persisted)
}
async fn build_generated_file_records(
turn: &Arc<TurnState>,
generated_files: &[PathBuf],
) -> Vec<GeneratedFileDescriptor> {
let mut descriptors = Vec::new();
let mut records = turn.generated_files.lock().await;
for (index, path) in generated_files.iter().enumerate() {
let metadata = match tokio::fs::metadata(path).await {
Ok(metadata) => metadata,
Err(_) => continue,
};
if !metadata.is_file() {
continue;
}
let file_name = path
.file_name()
.and_then(|value| value.to_str())
.unwrap_or("generated.bin")
.to_string();
let descriptor = GeneratedFileDescriptor {
file_id: format!("generated-{index}"),
file_name,
media_type: media_type_for_path(path),
size_bytes: metadata.len(),
is_image: is_image_path(path),
};
records.insert(
descriptor.file_id.clone(),
GeneratedFileRecord {
descriptor: descriptor.clone(),
path: path.clone(),
},
);
descriptors.push(descriptor);
}
descriptors
}
fn session_path(config: &WorkerConfig) -> PathBuf {
config.state_root.join("session.jsonl")
}
fn authorize(headers: &HeaderMap, expected_token: &str) -> Result<(), StatusCode> {
let Some(header_value) = headers.get(header::AUTHORIZATION) else {
return Err(StatusCode::UNAUTHORIZED);
};
let value = header_value
.to_str()
.map_err(|_| StatusCode::UNAUTHORIZED)?;
let expected = format!("Bearer {expected_token}");
if value == expected {
Ok(())
} else {
Err(StatusCode::UNAUTHORIZED)
}
}
fn map_approval_decision(decision: WorkerApprovalDecision) -> ApprovalDecision {
match decision {
WorkerApprovalDecision::ApproveOnce => ApprovalDecision::ApproveOnce,
WorkerApprovalDecision::Deny { reason } => ApprovalDecision::Deny { reason },
WorkerApprovalDecision::CancelTurn => ApprovalDecision::CancelTurn,
}
}
fn next_turn_id() -> String {
static COUNTER: AtomicU64 = AtomicU64::new(1);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis())
.unwrap_or(0);
format!("turn-{now}-{}", COUNTER.fetch_add(1, Ordering::SeqCst))
}
fn sanitize_file_name(file_name: &str) -> String {
let cleaned = file_name
.chars()
.map(|ch| match ch {
'/' | '\\' | ':' | '\0' => '-',
value => value,
})
.collect::<String>();
if cleaned.trim().is_empty() {
"attachment.bin".to_string()
} else {
cleaned
}
}
fn media_type_for_path(path: &Path) -> Option<String> {
let extension = path.extension()?.to_str()?.to_ascii_lowercase();
let media_type = match extension.as_str() {
"png" => "image/png",
"jpg" | "jpeg" => "image/jpeg",
"gif" => "image/gif",
"webp" => "image/webp",
"txt" | "md" | "log" => "text/plain",
"json" => "application/json",
"pdf" => "application/pdf",
_ => return None,
};
Some(media_type.to_string())
}
fn is_image_path(path: &Path) -> bool {
path.extension()
.and_then(|value| value.to_str())
.is_some_and(|extension| {
matches!(
extension.to_ascii_lowercase().as_str(),
"png" | "jpg" | "jpeg" | "gif" | "webp"
)
})
}
#[derive(Debug)]
pub enum ServerError {
Io(std::io::Error),
Host(HostError),
Base64(base64::DecodeError),
}
impl Display for ServerError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(error) => write!(f, "{error}"),
Self::Host(error) => write!(f, "{error}"),
Self::Base64(error) => write!(f, "{error}"),
}
}
}
impl std::error::Error for ServerError {}
impl From<std::io::Error> for ServerError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<HostError> for ServerError {
fn from(value: HostError) -> Self {
Self::Host(value)
}
}
#[cfg(test)]
mod tests {
use super::*;
use channel_gateway_core::{RuntimeEvent, TurnSource};
#[derive(Clone)]
struct MockRuntime {
events: Arc<Vec<RuntimeEvent>>,
}
impl WorkerRuntime for MockRuntime {
fn run_turn(
&self,
_session_path: &Path,
_prompt: String,
_attachments: Vec<AttachmentRef>,
_cancel_flag: Arc<AtomicBool>,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
) -> Result<(), HostError> {
for event in self.events.iter().cloned() {
let _ = event_tx.send(event);
}
Ok(())
}
fn session_message_count(&self, _session_path: &Path) -> Result<usize, HostError> {
Ok(3)
}
fn reset_session(
&self,
_session_path: &Path,
_archive_dir: &Path,
) -> Result<(), HostError> {
Ok(())
}
}
async fn spawn_test_server(
runtime: Arc<dyn WorkerRuntime>,
) -> (tokio::task::JoinHandle<()>, String) {
let config = WorkerConfig::from_iter([
("CLAW_WORKER_AUTH_TOKEN", "secret"),
("CLAW_WORKER_PROFILE_ID", "makar"),
(
"CLAW_WORKER_STATE_ROOT",
&std::env::temp_dir()
.join(format!("claw-profile-worker-test-{}", next_turn_id()))
.display()
.to_string(),
),
])
.expect("config should parse");
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("listener should bind");
let addr = listener.local_addr().expect("addr should exist");
let app = app_router(config, runtime);
let handle = tokio::spawn(async move {
axum::serve(listener, app).await.expect("server should run");
});
(handle, format!("http://{addr}"))
}
#[tokio::test]
async fn status_requires_authentication() {
let (handle, base_url) = spawn_test_server(Arc::new(MockRuntime {
events: Arc::new(Vec::new()),
}))
.await;
let response = reqwest::Client::new()
.get(format!("{base_url}/v1/status"))
.send()
.await
.expect("request should complete");
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
handle.abort();
}
#[tokio::test]
async fn turn_stream_emits_completion_event() {
let runtime = Arc::new(MockRuntime {
events: Arc::new(vec![RuntimeEvent::Completed {
final_text: "done".to_string(),
iterations: 1,
input_tokens: 2,
output_tokens: 3,
generated_files: Vec::new(),
}]),
});
let (handle, base_url) = spawn_test_server(runtime).await;
let client = reqwest::Client::new();
let accepted = client
.post(format!("{base_url}/v1/turns"))
.bearer_auth("secret")
.json(&WorkerTurnRequest {
prompt: "hello".to_string(),
source: TurnSource {
channel: "telegram".to_string(),
sender_id: "239824268".to_string(),
chat_id: Some("239824268".to_string()),
display_name: Some("Makar".to_string()),
},
attachments: Vec::new(),
})
.send()
.await
.expect("turn should post");
assert_eq!(accepted.status(), StatusCode::ACCEPTED);
let turn = accepted
.json::<WorkerTurnAccepted>()
.await
.expect("accepted payload should parse");
let mut response = client
.get(format!("{base_url}/v1/turns/{}/events", turn.turn_id))
.bearer_auth("secret")
.send()
.await
.expect("events should stream");
let chunk = tokio::time::timeout(std::time::Duration::from_secs(2), response.chunk())
.await
.expect("stream should yield a chunk")
.expect("chunk should read")
.expect("chunk should exist");
let body = String::from_utf8(chunk.to_vec()).expect("chunk should be valid UTF-8");
assert!(body.contains("\"type\":\"completed\""));
assert!(body.contains("\"final_text\":\"done\""));
handle.abort();
}
#[test]
fn sanitize_file_name_replaces_path_separators() {
assert_eq!(sanitize_file_name("../weird/name.txt"), "..-weird-name.txt");
}
#[test]
fn approval_mapping_preserves_reason() {
let decision = map_approval_decision(WorkerApprovalDecision::Deny {
reason: "no".to_string(),
});
assert_eq!(
decision,
ApprovalDecision::Deny {
reason: "no".to_string()
}
);
}
}
+6 -1
View File
@@ -6,9 +6,14 @@ license.workspace = true
publish.workspace = true
[dependencies]
bollard = "0.17"
channel-gateway-core = { path = "../channel-gateway-core" }
futures-util = "0.3"
sha2 = "0.10"
api = { path = "../api" }
base64 = "0.22"
plugins = { path = "../plugins" }
reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls", "stream"] }
runtime = { path = "../runtime" }
serde = { version = "1", features = ["derive"] }
serde_json.workspace = true
@@ -0,0 +1,36 @@
<?xml version="1.0"?>
<Container version="2">
<Name>claw-telegram-gateway</Name>
<Repository>git.wylab.me/wylab/claw-code-parity:latest</Repository>
<Registry></Registry>
<Network>claw_gateway</Network>
<MyIP></MyIP>
<Shell>sh</Shell>
<Privileged>false</Privileged>
<Support>https://git.wylab.me/wylab/claw-code-parity</Support>
<Project>claw-code-parity</Project>
<Overview>Telegram gateway with Docker-socket worker management.</Overview>
<Category>AI:Tools</Category>
<WebUI></WebUI>
<TemplateURL></TemplateURL>
<Icon></Icon>
<ExtraParams></ExtraParams>
<PostArgs></PostArgs>
<CPUset></CPUset>
<DateInstalled>0</DateInstalled>
<DonateText></DonateText>
<DonateLink></DonateLink>
<Requires></Requires>
<Config Name="Docker Socket" Target="/var/run/docker.sock" Default="/var/run/docker.sock" Mode="rw" Description="Docker socket for worker orchestration" Type="Path" Display="always" Required="true" Mask="false">/var/run/docker.sock</Config>
<Config Name="Unraid Templates" Target="/unraid/templates-user" Default="/boot/config/plugins/dockerMan/templates-user" Mode="rw" Description="Unraid template directory" Type="Path" Display="always" Required="true" Mask="false">/boot/config/plugins/dockerMan/templates-user</Config>
<Config Name="Gateway AppData" Target="/appdata" Default="/mnt/user/appdata/claw-telegram-gateway" Mode="rw" Description="Gateway state and manifest directory" Type="Path" Display="always" Required="true" Mask="false">/mnt/user/appdata/claw-telegram-gateway</Config>
<Config Name="CLAW_GATEWAY_TELEGRAM_BOT_TOKEN" Target="CLAW_GATEWAY_TELEGRAM_BOT_TOKEN" Default="" Mode="" Description="Telegram bot token" Type="Variable" Display="always" Required="true" Mask="true"></Config>
<Config Name="CLAW_WORKER_AUTH_TOKEN" Target="CLAW_WORKER_AUTH_TOKEN" Default="" Mode="" Description="Shared bearer token for worker API calls" Type="Variable" Display="always" Required="true" Mask="true"></Config>
<Config Name="CLAW_GATEWAY_WORKER_IMAGE" Target="CLAW_GATEWAY_WORKER_IMAGE" Default="git.wylab.me/wylab/claw-code-parity:latest" Mode="" Description="Worker image pulled by the gateway for per-profile containers" Type="Variable" Display="always" Required="true" Mask="false">git.wylab.me/wylab/claw-code-parity:latest</Config>
<Config Name="CLAW_GATEWAY_MANIFEST" Target="CLAW_GATEWAY_MANIFEST" Default="/appdata/profiles.json" Mode="" Description="Manifest path for channel identities and worker layout" Type="Variable" Display="advanced" Required="true" Mask="false">/appdata/profiles.json</Config>
<Config Name="CLAW_GATEWAY_TEMPLATE_ARCHIVE_DIR" Target="CLAW_GATEWAY_TEMPLATE_ARCHIVE_DIR" Default="/appdata/template-archive" Mode="" Description="Archive location for removed worker templates" Type="Variable" Display="advanced" Required="true" Mask="false">/appdata/template-archive</Config>
<Config Name="CLAW_GATEWAY_WORKER_NETWORK" Target="CLAW_GATEWAY_WORKER_NETWORK" Default="claw_gateway" Mode="" Description="Docker network name shared by the gateway and workers" Type="Variable" Display="advanced" Required="true" Mask="false">claw_gateway</Config>
<Config Name="CLAW_GATEWAY_INHERITED_ENV" Target="CLAW_GATEWAY_INHERITED_ENV" Default="ANTHROPIC_API_KEY" Mode="" Description="Comma-separated env vars copied from gateway into worker containers" Type="Variable" Display="advanced" Required="false" Mask="false">ANTHROPIC_API_KEY</Config>
<Config Name="ANTHROPIC_API_KEY" Target="ANTHROPIC_API_KEY" Default="" Mode="" Description="Anthropic API key inherited by worker containers when enabled" Type="Variable" Display="advanced" Required="false" Mask="true"></Config>
<TailscaleStateDir></TailscaleStateDir>
</Container>
+219 -1
View File
@@ -6,10 +6,23 @@ use std::path::PathBuf;
use runtime::PermissionMode;
const DEFAULT_BOT_TOKEN_ENV: &str = "CLAW_TELEGRAM_BOT_TOKEN";
const DEFAULT_GATEWAY_BOT_TOKEN_ENV: &str = "CLAW_GATEWAY_TELEGRAM_BOT_TOKEN";
const DEFAULT_MODEL: &str = "claude-opus-4-6";
const DEFAULT_MAX_UPLOAD_MB: u64 = 20;
const DEFAULT_APPROVAL_TIMEOUT_SECS: u64 = 300;
const DEFAULT_POLL_TIMEOUT_SECS: u64 = 30;
const DEFAULT_GATEWAY_STATE_ROOT: &str = "/appdata/state";
const DEFAULT_GATEWAY_MANIFEST_PATH: &str = "/appdata/profiles.json";
const DEFAULT_DOCKER_SOCKET: &str = "/var/run/docker.sock";
const DEFAULT_GATEWAY_TEMPLATE_DIR: &str = "/unraid/templates-user";
const DEFAULT_GATEWAY_TEMPLATE_ARCHIVE_DIR: &str = "/appdata/template-archive";
const DEFAULT_GATEWAY_TEMPLATE_FILE_PREFIX: &str = "claw-worker-";
const DEFAULT_GATEWAY_WORKER_NETWORK: &str = "claw_gateway";
const DEFAULT_GATEWAY_WORKER_BIND_PORT: u16 = 8080;
const DEFAULT_GATEWAY_WORKER_DEFAULT_CWD: &str = "/workspace";
const DEFAULT_GATEWAY_WORKER_PERMISSION_MODE: &str = "workspace-write";
const DEFAULT_GATEWAY_WORKER_HOST_STATE_ROOT: &str = "/mnt/user/appdata/claw-workers";
const DEFAULT_GATEWAY_WORKER_HOST_WORKSPACE_ROOT: &str = "/mnt/user/appdata/claw-workers";
#[derive(Clone, PartialEq, Eq)]
pub struct TelegramBotConfig {
@@ -113,6 +126,155 @@ impl TelegramBotConfig {
}
}
#[derive(Clone, PartialEq, Eq)]
pub struct GatewayConfig {
pub bot_token_env: String,
pub state_root: PathBuf,
pub manifest_path: PathBuf,
pub docker_socket: PathBuf,
pub poll_timeout_secs: u64,
pub max_upload_mb: u64,
pub worker_auth_token: String,
pub worker_image: Option<String>,
pub worker_network: String,
pub template_dir: PathBuf,
pub template_archive_dir: PathBuf,
pub template_file_prefix: String,
pub worker_bind_port: u16,
pub worker_default_cwd: PathBuf,
pub worker_model: String,
pub worker_permission_mode: PermissionMode,
pub worker_host_state_root: PathBuf,
pub worker_host_workspace_root: PathBuf,
pub inherited_env: Vec<String>,
bot_token: String,
}
impl std::fmt::Debug for GatewayConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GatewayConfig")
.field("bot_token_env", &self.bot_token_env)
.field("state_root", &self.state_root)
.field("manifest_path", &self.manifest_path)
.field("docker_socket", &self.docker_socket)
.field("poll_timeout_secs", &self.poll_timeout_secs)
.field("max_upload_mb", &self.max_upload_mb)
.field("worker_image", &self.worker_image)
.field("worker_network", &self.worker_network)
.field("template_dir", &self.template_dir)
.field("template_archive_dir", &self.template_archive_dir)
.field("template_file_prefix", &self.template_file_prefix)
.field("worker_bind_port", &self.worker_bind_port)
.field("worker_default_cwd", &self.worker_default_cwd)
.field("worker_model", &self.worker_model)
.field(
"worker_permission_mode",
&self.worker_permission_mode.as_str(),
)
.field("worker_host_state_root", &self.worker_host_state_root)
.field(
"worker_host_workspace_root",
&self.worker_host_workspace_root,
)
.field("inherited_env", &self.inherited_env)
.finish()
}
}
impl GatewayConfig {
pub fn from_env() -> Result<Self, ConfigError> {
Self::from_iter(env::vars())
}
pub fn from_iter(
vars: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Result<Self, ConfigError> {
let vars = vars
.into_iter()
.map(|(key, value)| (key.into(), value.into()))
.collect::<std::collections::BTreeMap<String, String>>();
let bot_token_env = vars
.get("CLAW_GATEWAY_TELEGRAM_BOT_TOKEN_ENV")
.cloned()
.unwrap_or_else(|| DEFAULT_GATEWAY_BOT_TOKEN_ENV.to_string());
let bot_token =
required_var_with_fallback(&vars, &bot_token_env, Some(DEFAULT_BOT_TOKEN_ENV))?;
Ok(Self {
bot_token_env,
state_root: optional_path(&vars, "CLAW_GATEWAY_STATE_ROOT")
.unwrap_or_else(|| PathBuf::from(DEFAULT_GATEWAY_STATE_ROOT)),
manifest_path: optional_path(&vars, "CLAW_GATEWAY_MANIFEST")
.unwrap_or_else(|| PathBuf::from(DEFAULT_GATEWAY_MANIFEST_PATH)),
docker_socket: optional_path(&vars, "CLAW_GATEWAY_DOCKER_SOCKET")
.unwrap_or_else(|| PathBuf::from(DEFAULT_DOCKER_SOCKET)),
poll_timeout_secs: parse_u64(
vars.get("CLAW_GATEWAY_POLL_TIMEOUT_SECS"),
DEFAULT_POLL_TIMEOUT_SECS,
"CLAW_GATEWAY_POLL_TIMEOUT_SECS",
)?,
max_upload_mb: parse_u64(
vars.get("CLAW_GATEWAY_MAX_UPLOAD_MB"),
DEFAULT_MAX_UPLOAD_MB,
"CLAW_GATEWAY_MAX_UPLOAD_MB",
)?,
worker_auth_token: required_var(&vars, "CLAW_WORKER_AUTH_TOKEN")?,
worker_image: vars
.get("CLAW_GATEWAY_WORKER_IMAGE")
.cloned()
.filter(|value| !value.trim().is_empty()),
worker_network: vars
.get("CLAW_GATEWAY_WORKER_NETWORK")
.cloned()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| DEFAULT_GATEWAY_WORKER_NETWORK.to_string()),
template_dir: optional_path(&vars, "CLAW_GATEWAY_TEMPLATE_DIR")
.unwrap_or_else(|| PathBuf::from(DEFAULT_GATEWAY_TEMPLATE_DIR)),
template_archive_dir: optional_path(&vars, "CLAW_GATEWAY_TEMPLATE_ARCHIVE_DIR")
.unwrap_or_else(|| PathBuf::from(DEFAULT_GATEWAY_TEMPLATE_ARCHIVE_DIR)),
template_file_prefix: vars
.get("CLAW_GATEWAY_TEMPLATE_FILE_PREFIX")
.cloned()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| DEFAULT_GATEWAY_TEMPLATE_FILE_PREFIX.to_string()),
worker_bind_port: parse_u16(
vars.get("CLAW_GATEWAY_WORKER_BIND_PORT"),
DEFAULT_GATEWAY_WORKER_BIND_PORT,
"CLAW_GATEWAY_WORKER_BIND_PORT",
)?,
worker_default_cwd: optional_path(&vars, "CLAW_GATEWAY_WORKER_DEFAULT_CWD")
.unwrap_or_else(|| PathBuf::from(DEFAULT_GATEWAY_WORKER_DEFAULT_CWD)),
worker_model: vars
.get("CLAW_GATEWAY_WORKER_MODEL")
.cloned()
.unwrap_or_else(|| DEFAULT_MODEL.to_string()),
worker_permission_mode: parse_permission_mode(
vars.get("CLAW_GATEWAY_WORKER_PERMISSION_MODE")
.map(String::as_str)
.unwrap_or(DEFAULT_GATEWAY_WORKER_PERMISSION_MODE),
)?,
worker_host_state_root: optional_path(&vars, "CLAW_GATEWAY_WORKER_HOST_STATE_ROOT")
.unwrap_or_else(|| PathBuf::from(DEFAULT_GATEWAY_WORKER_HOST_STATE_ROOT)),
worker_host_workspace_root: optional_path(
&vars,
"CLAW_GATEWAY_WORKER_HOST_WORKSPACE_ROOT",
)
.unwrap_or_else(|| PathBuf::from(DEFAULT_GATEWAY_WORKER_HOST_WORKSPACE_ROOT)),
inherited_env: parse_csv_list(
vars.get("CLAW_GATEWAY_INHERITED_ENV").map(String::as_str),
),
bot_token,
})
}
pub fn bot_token(&self) -> &str {
&self.bot_token
}
pub fn max_upload_bytes(&self) -> u64 {
self.max_upload_mb.saturating_mul(1024 * 1024)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigError {
MissingVar(String),
@@ -144,6 +306,18 @@ fn required_var(
.ok_or_else(|| ConfigError::MissingVar(key.to_string()))
}
fn required_var_with_fallback(
vars: &std::collections::BTreeMap<String, String>,
key: &str,
fallback_key: Option<&str>,
) -> Result<String, ConfigError> {
required_var(vars, key).or_else(|_| {
fallback_key
.map(|fallback| required_var(vars, fallback))
.unwrap_or_else(|| Err(ConfigError::MissingVar(key.to_string())))
})
}
fn optional_path(vars: &std::collections::BTreeMap<String, String>, key: &str) -> Option<PathBuf> {
vars.get(key)
.map(String::as_str)
@@ -194,6 +368,29 @@ fn parse_u64(value: Option<&String>, default: u64, key: &str) -> Result<u64, Con
}
}
fn parse_u16(value: Option<&String>, default: u16, key: &str) -> Result<u16, ConfigError> {
match value
.map(String::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
{
Some(value) => value.parse::<u16>().map_err(|_| ConfigError::InvalidVar {
key: key.to_string(),
message: format!("`{value}` is not an unsigned integer"),
}),
None => Ok(default),
}
}
fn parse_csv_list(value: Option<&str>) -> Vec<String> {
value
.unwrap_or("")
.split(|ch: char| ch == ',' || ch.is_whitespace())
.filter(|token| !token.trim().is_empty())
.map(ToString::to_string)
.collect()
}
fn parse_permission_mode(value: &str) -> Result<PermissionMode, ConfigError> {
match value.trim() {
"read-only" => Ok(PermissionMode::ReadOnly),
@@ -212,7 +409,7 @@ fn parse_permission_mode(value: &str) -> Result<PermissionMode, ConfigError> {
#[cfg(test)]
mod tests {
use super::{ConfigError, TelegramBotConfig};
use super::{ConfigError, GatewayConfig, TelegramBotConfig};
#[test]
fn config_parses_required_values() {
@@ -236,4 +433,25 @@ mod tests {
ConfigError::MissingVar("CLAW_TELEGRAM_ALLOWED_USER_IDS".to_string())
);
}
#[test]
fn gateway_config_uses_gateway_defaults() {
let config = GatewayConfig::from_iter([
("CLAW_GATEWAY_TELEGRAM_BOT_TOKEN", "secret"),
("CLAW_WORKER_AUTH_TOKEN", "worker-secret"),
])
.expect("gateway config should parse");
assert_eq!(config.bot_token(), "secret");
assert_eq!(
config.manifest_path,
std::path::PathBuf::from("/appdata/profiles.json")
);
assert_eq!(config.worker_network, "claw_gateway");
assert_eq!(
config.template_dir,
std::path::PathBuf::from("/unraid/templates-user")
);
assert_eq!(config.worker_bind_port, 8080);
assert_eq!(config.max_upload_bytes(), 20 * 1024 * 1024);
}
}
@@ -0,0 +1,522 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::{Display, Formatter};
use std::path::PathBuf;
use std::time::Duration;
use bollard::container::{
Config as ContainerConfig, CreateContainerOptions, InspectContainerOptions,
ListContainersOptions, RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
};
use bollard::errors::Error as BollardError;
use bollard::models::{ContainerInspectResponse, HostConfig, Mount, MountTypeEnum};
use bollard::network::{CreateNetworkOptions, ListNetworksOptions};
use bollard::Docker;
use channel_gateway_core::{
GatewayManifest, ManagedTemplateRecord, ProfileRecord, UnraidTemplateSpec,
};
use serde::Serialize;
use sha2::{Digest, Sha256};
use crate::config::GatewayConfig;
use crate::unraid_template_manager::{TemplateError, UnraidTemplateManager};
use crate::worker_client::{WorkerClient, WorkerClientError};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManagedContainerRecord {
pub profile_id: String,
pub container_name: String,
pub base_url: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReconcileResult {
pub ensured_containers: Vec<ManagedContainerRecord>,
pub archived_templates: Vec<ManagedTemplateRecord>,
pub removed_profiles: Vec<String>,
}
#[derive(Clone)]
pub struct DockerWorkerManager {
docker: Docker,
gateway_config: GatewayConfig,
}
impl DockerWorkerManager {
pub fn new(gateway_config: GatewayConfig) -> Result<Self, WorkerManagerError> {
let docker = if gateway_config.docker_socket == PathBuf::from("/var/run/docker.sock") {
Docker::connect_with_unix_defaults().map_err(WorkerManagerError::Docker)?
} else {
Docker::connect_with_unix(
gateway_config.docker_socket.to_str().ok_or_else(|| {
WorkerManagerError::Invalid("docker socket path is not valid UTF-8".to_string())
})?,
120,
bollard::API_DEFAULT_VERSION,
)
.map_err(WorkerManagerError::Docker)?
};
Ok(Self {
docker,
gateway_config,
})
}
pub async fn reconcile_manifest(
&self,
manifest: &GatewayManifest,
) -> Result<ReconcileResult, WorkerManagerError> {
self.ensure_required_env(manifest)?;
self.ensure_network(&manifest.gateway.worker_network)
.await?;
let template_manager = UnraidTemplateManager::new(
manifest.gateway.template_dir.clone(),
manifest.gateway.template_archive_dir.clone(),
manifest.gateway.template_file_prefix.clone(),
);
let mut result = ReconcileResult::default();
let mut active_profiles = BTreeSet::new();
for profile in &manifest.profiles {
active_profiles.insert(profile.profile_id.as_str().to_string());
let env_values = self.build_worker_env(manifest, profile)?;
let labels = self.build_worker_labels(manifest, profile);
std::fs::create_dir_all(&profile.worker.host_state_dir)?;
std::fs::create_dir_all(&profile.worker.host_workspace_dir)?;
let template = UnraidTemplateSpec::worker_template(
&profile.worker.container_name,
&manifest.gateway.worker_image,
&manifest.gateway.worker_network,
&profile.worker.host_state_dir.display().to_string(),
&profile.worker.host_workspace_dir.display().to_string(),
&env_values,
&labels,
);
template_manager.write_worker_template(profile.profile_id.as_str(), &template)?;
let record = self
.ensure_profile_worker_inner(manifest, profile, &env_values, &labels)
.await?;
result.ensured_containers.push(record);
}
result.archived_templates = template_manager.archive_removed_templates(&active_profiles)?;
result.removed_profiles = self.remove_deleted_containers(&active_profiles).await?;
Ok(result)
}
pub async fn ensure_profile_worker(
&self,
manifest: &GatewayManifest,
profile: &ProfileRecord,
) -> Result<ManagedContainerRecord, WorkerManagerError> {
self.ensure_required_env(manifest)?;
let env_values = self.build_worker_env(manifest, profile)?;
let labels = self.build_worker_labels(manifest, profile);
self.ensure_profile_worker_inner(manifest, profile, &env_values, &labels)
.await
}
async fn ensure_profile_worker_inner(
&self,
manifest: &GatewayManifest,
profile: &ProfileRecord,
env_values: &BTreeMap<String, String>,
labels: &BTreeMap<String, String>,
) -> Result<ManagedContainerRecord, WorkerManagerError> {
let spec_hash = self.spec_hash(manifest, profile, env_values)?;
let desired_base_url = format!(
"http://{}:{}",
profile.worker.container_name, manifest.worker_defaults.bind_port
);
match self
.inspect_container(&profile.worker.container_name)
.await?
{
Some(existing) => {
let existing_hash = existing
.config
.as_ref()
.and_then(|config| config.labels.as_ref())
.and_then(|labels| labels.get("ai.claw.spec_hash"))
.cloned();
if existing_hash.as_deref() != Some(spec_hash.as_str()) {
self.remove_container(&profile.worker.container_name)
.await?;
self.create_and_start_container(
manifest, profile, env_values, labels, &spec_hash,
)
.await?;
} else if !existing
.state
.as_ref()
.and_then(|state| state.running)
.unwrap_or(false)
{
self.docker
.start_container(
&profile.worker.container_name,
None::<StartContainerOptions<String>>,
)
.await
.map_err(WorkerManagerError::Docker)?;
}
}
None => {
self.create_and_start_container(manifest, profile, env_values, labels, &spec_hash)
.await?;
}
}
self.wait_for_worker_ready(&desired_base_url).await?;
Ok(ManagedContainerRecord {
profile_id: profile.profile_id.as_str().to_string(),
container_name: profile.worker.container_name.clone(),
base_url: desired_base_url,
})
}
fn ensure_required_env(&self, manifest: &GatewayManifest) -> Result<(), WorkerManagerError> {
for env_key in manifest.required_env_vars() {
if std::env::var(&env_key)
.ok()
.filter(|value| !value.trim().is_empty())
.is_none()
{
return Err(WorkerManagerError::Invalid(format!(
"required inherited env var `{env_key}` is not set"
)));
}
}
Ok(())
}
fn build_worker_env(
&self,
manifest: &GatewayManifest,
profile: &ProfileRecord,
) -> Result<BTreeMap<String, String>, WorkerManagerError> {
let mut env_values = BTreeMap::new();
env_values.insert(
"CLAW_WORKER_PROFILE_ID".to_string(),
profile.profile_id.as_str().to_string(),
);
env_values.insert("CLAW_WORKER_STATE_ROOT".to_string(), "/state".to_string());
env_values.insert(
"CLAW_WORKER_BIND_ADDR".to_string(),
format!("0.0.0.0:{}", manifest.worker_defaults.bind_port),
);
env_values.insert(
"CLAW_WORKER_DEFAULT_CWD".to_string(),
manifest.worker_defaults.default_cwd.display().to_string(),
);
env_values.insert(
"CLAW_WORKER_MODEL".to_string(),
manifest.worker_defaults.model.clone(),
);
env_values.insert(
"CLAW_WORKER_PERMISSION_MODE".to_string(),
manifest.worker_defaults.permission_mode.clone(),
);
env_values.insert(
"CLAW_WORKER_AUTH_TOKEN".to_string(),
self.gateway_config.worker_auth_token.clone(),
);
for env_key in &manifest.gateway.inherited_env {
let value = std::env::var(env_key).map_err(|_| {
WorkerManagerError::Invalid(format!(
"required inherited env var `{env_key}` is not set"
))
})?;
env_values.insert(env_key.clone(), value);
}
Ok(env_values)
}
fn build_worker_labels(
&self,
manifest: &GatewayManifest,
profile: &ProfileRecord,
) -> BTreeMap<String, String> {
let template_manager = UnraidTemplateManager::new(
manifest.gateway.template_dir.clone(),
manifest.gateway.template_archive_dir.clone(),
manifest.gateway.template_file_prefix.clone(),
);
BTreeMap::from([
("ai.claw.managed".to_string(), "true".to_string()),
(
"ai.claw.profile_id".to_string(),
profile.profile_id.as_str().to_string(),
),
("ai.claw.gateway".to_string(), "telegram".to_string()),
(
"ai.claw.template_path".to_string(),
template_manager
.template_path(profile.profile_id.as_str())
.display()
.to_string(),
),
])
}
fn spec_hash(
&self,
manifest: &GatewayManifest,
profile: &ProfileRecord,
env_values: &BTreeMap<String, String>,
) -> Result<String, WorkerManagerError> {
#[derive(Serialize)]
struct WorkerSpecHashInput<'a> {
image: &'a str,
network: &'a str,
container_name: &'a str,
state_dir: String,
workspace_dir: String,
bind_port: u16,
env_values: &'a BTreeMap<String, String>,
}
let encoded = serde_json::to_vec(&WorkerSpecHashInput {
image: &manifest.gateway.worker_image,
network: &manifest.gateway.worker_network,
container_name: &profile.worker.container_name,
state_dir: profile.worker.host_state_dir.display().to_string(),
workspace_dir: profile.worker.host_workspace_dir.display().to_string(),
bind_port: manifest.worker_defaults.bind_port,
env_values,
})?;
let hash = Sha256::digest(encoded);
Ok(format!("{hash:x}"))
}
async fn ensure_network(&self, network_name: &str) -> Result<(), WorkerManagerError> {
let mut filters = HashMap::new();
filters.insert("name".to_string(), vec![network_name.to_string()]);
let existing = self
.docker
.list_networks(Some(ListNetworksOptions { filters }))
.await
.map_err(WorkerManagerError::Docker)?;
if existing.is_empty() {
self.docker
.create_network(CreateNetworkOptions {
name: network_name.to_string(),
check_duplicate: true,
driver: "bridge".to_string(),
..Default::default()
})
.await
.map_err(WorkerManagerError::Docker)?;
}
Ok(())
}
async fn inspect_container(
&self,
container_name: &str,
) -> Result<Option<ContainerInspectResponse>, WorkerManagerError> {
match self
.docker
.inspect_container(container_name, None::<InspectContainerOptions>)
.await
{
Ok(container) => Ok(Some(container)),
Err(error) if is_not_found(&error) => Ok(None),
Err(error) => Err(WorkerManagerError::Docker(error)),
}
}
async fn create_and_start_container(
&self,
manifest: &GatewayManifest,
profile: &ProfileRecord,
env_values: &BTreeMap<String, String>,
labels: &BTreeMap<String, String>,
spec_hash: &str,
) -> Result<(), WorkerManagerError> {
let mut labels = labels.clone();
labels.insert("ai.claw.spec_hash".to_string(), spec_hash.to_string());
let mounts = vec![
Mount {
target: Some("/state".to_string()),
source: Some(profile.worker.host_state_dir.display().to_string()),
typ: Some(MountTypeEnum::BIND),
read_only: Some(false),
..Default::default()
},
Mount {
target: Some("/workspace".to_string()),
source: Some(profile.worker.host_workspace_dir.display().to_string()),
typ: Some(MountTypeEnum::BIND),
read_only: Some(false),
..Default::default()
},
];
let env = env_values
.iter()
.map(|(key, value)| format!("{key}={value}"))
.collect::<Vec<_>>();
self.docker
.create_container(
Some(CreateContainerOptions {
name: profile.worker.container_name.clone(),
platform: None,
}),
ContainerConfig {
image: Some(manifest.gateway.worker_image.clone()),
cmd: Some(vec!["claw-profile-worker".to_string(), "serve".to_string()]),
env: Some(env),
labels: Some(labels.into_iter().collect()),
host_config: Some(HostConfig {
mounts: Some(mounts),
network_mode: Some(manifest.gateway.worker_network.clone()),
..Default::default()
}),
..Default::default()
},
)
.await
.map_err(WorkerManagerError::Docker)?;
self.docker
.start_container(
&profile.worker.container_name,
None::<StartContainerOptions<String>>,
)
.await
.map_err(WorkerManagerError::Docker)?;
Ok(())
}
async fn wait_for_worker_ready(&self, base_url: &str) -> Result<(), WorkerManagerError> {
let client = WorkerClient::new(base_url, &self.gateway_config.worker_auth_token)?;
let mut attempts = 0;
loop {
attempts += 1;
match client.health().await {
Ok(()) => return Ok(()),
Err(error) if attempts < 30 => {
let _ = error;
tokio::time::sleep(Duration::from_millis(500)).await;
}
Err(error) => return Err(WorkerManagerError::WorkerClient(error)),
}
}
}
async fn remove_deleted_containers(
&self,
active_profile_ids: &BTreeSet<String>,
) -> Result<Vec<String>, WorkerManagerError> {
let mut filters = HashMap::new();
filters.insert(
"label".to_string(),
vec!["ai.claw.managed=true".to_string()],
);
let containers = self
.docker
.list_containers(Some(ListContainersOptions::<String> {
all: true,
filters,
..Default::default()
}))
.await
.map_err(WorkerManagerError::Docker)?;
let mut removed = Vec::new();
for container in containers {
let Some(labels) = container.labels.as_ref() else {
continue;
};
let Some(profile_id) = labels.get("ai.claw.profile_id") else {
continue;
};
if active_profile_ids.contains(profile_id) {
continue;
}
if let Some(names) = container.names.as_ref() {
for name in names {
let normalized = name.trim_start_matches('/');
self.remove_container(normalized).await?;
}
}
removed.push(profile_id.clone());
}
Ok(removed)
}
async fn remove_container(&self, container_name: &str) -> Result<(), WorkerManagerError> {
let _ = self
.docker
.stop_container(container_name, Some(StopContainerOptions { t: 5 }))
.await;
self.docker
.remove_container(
container_name,
Some(RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await
.map_err(WorkerManagerError::Docker)?;
Ok(())
}
}
fn is_not_found(error: &BollardError) -> bool {
matches!(
error,
BollardError::DockerResponseServerError {
status_code: 404,
..
}
)
}
#[derive(Debug)]
pub enum WorkerManagerError {
Docker(BollardError),
Invalid(String),
Template(TemplateError),
WorkerClient(WorkerClientError),
Io(std::io::Error),
Json(serde_json::Error),
}
impl Display for WorkerManagerError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Docker(error) => write!(f, "{error}"),
Self::Invalid(message) => write!(f, "{message}"),
Self::Template(error) => write!(f, "{error}"),
Self::WorkerClient(error) => write!(f, "{error}"),
Self::Io(error) => write!(f, "{error}"),
Self::Json(error) => write!(f, "{error}"),
}
}
}
impl std::error::Error for WorkerManagerError {}
impl From<TemplateError> for WorkerManagerError {
fn from(value: TemplateError) -> Self {
Self::Template(value)
}
}
impl From<WorkerClientError> for WorkerManagerError {
fn from(value: WorkerClientError) -> Self {
Self::WorkerClient(value)
}
}
impl From<std::io::Error> for WorkerManagerError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<serde_json::Error> for WorkerManagerError {
fn from(value: serde_json::Error) -> Self {
Self::Json(value)
}
}
File diff suppressed because it is too large Load Diff
+338 -6
View File
@@ -1,14 +1,24 @@
mod bot;
mod config;
mod docker_worker_manager;
mod gateway;
mod registry;
mod runtime_host;
mod telegram_api;
mod unraid_template_manager;
mod worker_client;
use std::env;
use std::io::{self, Write};
use std::path::PathBuf;
use bot::TelegramBot;
use config::TelegramBotConfig;
use channel_gateway_core::ProfileId;
use config::{GatewayConfig, TelegramBotConfig};
use gateway::{
load_or_init_manifest, migrate_standalone_registry, reconcile_workers, save_manifest,
TelegramGateway,
};
#[tokio::main(flavor = "multi_thread")]
async fn main() {
@@ -21,10 +31,13 @@ async fn main() {
async fn run() -> Result<(), Box<dyn std::error::Error>> {
let args = env::args().skip(1).collect::<Vec<_>>();
match args.first().map(String::as_str) {
Some("serve") => {
let config = TelegramBotConfig::from_env()?;
let bot = TelegramBot::new(config)?;
bot.run().await?;
Some("serve") => serve_standalone().await?,
Some("standalone") => handle_standalone_command(&args[1..]).await?,
Some("gateway") => handle_gateway_command(&args[1..]).await?,
Some("workers") => handle_workers_command(&args[1..]).await?,
Some("profiles") => handle_profiles_command(&args[1..]).await?,
Some("migrate-standalone-registry") => {
handle_migrate_standalone_registry(&args[1..]).await?;
}
Some("--help") | Some("-h") | None => print_help(&mut io::stdout())?,
Some(other) => {
@@ -34,13 +47,311 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
async fn serve_standalone() -> Result<(), Box<dyn std::error::Error>> {
let config = TelegramBotConfig::from_env()?;
let bot = TelegramBot::new(config)?;
bot.run().await?;
Ok(())
}
async fn handle_standalone_command(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
match args.first().map(String::as_str) {
Some("serve") => serve_standalone().await?,
Some(other) => return Err(format!("unsupported standalone command `{other}`").into()),
None => return Err("missing standalone command".into()),
}
Ok(())
}
async fn handle_gateway_command(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
match args.first().map(String::as_str) {
Some("serve") => {
let config = GatewayConfig::from_env()?;
let gateway = TelegramGateway::new(config)?;
gateway.run().await?;
}
Some(other) => return Err(format!("unsupported gateway command `{other}`").into()),
None => return Err("missing gateway command".into()),
}
Ok(())
}
async fn handle_workers_command(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
match args.first().map(String::as_str) {
Some("reconcile") => {
let config = GatewayConfig::from_env()?;
let result = reconcile_workers(&config).await?;
print_reconcile_summary(&result, &mut io::stdout())?;
}
Some(other) => return Err(format!("unsupported workers command `{other}`").into()),
None => return Err("missing workers command".into()),
}
Ok(())
}
async fn handle_profiles_command(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
match args.first().map(String::as_str) {
Some("add") => handle_profiles_add(&args[1..]).await?,
Some("remove") => handle_profiles_remove(&args[1..]).await?,
Some("merge") => handle_profiles_merge(&args[1..]).await?,
Some("channel") => handle_profiles_channel(&args[1..]).await?,
Some(other) => return Err(format!("unsupported profiles command `{other}`").into()),
None => return Err("missing profiles command".into()),
}
Ok(())
}
async fn handle_profiles_add(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
let Some(profile_id) = args.first() else {
return Err("usage: claw-telegram profiles add <profile-id> [--user-id <telegram-user-id>] [--display-name <name>]".into());
};
let mut telegram_user_id = None;
let mut display_name = None;
let mut index = 1;
while index < args.len() {
match args[index].as_str() {
"--user-id" => {
let value = args
.get(index + 1)
.ok_or("missing value for --user-id")?
.parse::<i64>()?;
telegram_user_id = Some(value);
index += 2;
}
"--display-name" => {
let value = args
.get(index + 1)
.ok_or("missing value for --display-name")?;
display_name = Some(value.clone());
index += 2;
}
other => return Err(format!("unsupported option `{other}`").into()),
}
}
let config = GatewayConfig::from_env()?;
let mut manifest = load_or_init_manifest(&config)?;
let profile = manifest
.add_profile(
ProfileId::new(profile_id.clone()),
display_name,
telegram_user_id,
)?
.clone();
save_manifest(&config, &manifest)?;
let result = reconcile_workers(&config).await?;
writeln!(
io::stdout(),
"Added profile `{}` -> container `{}`",
profile.profile_id.as_str(),
profile.worker.container_name
)?;
print_reconcile_summary(&result, &mut io::stdout())?;
Ok(())
}
async fn handle_profiles_remove(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
let Some(profile_id) = args.first() else {
return Err("usage: claw-telegram profiles remove <profile-id>".into());
};
let config = GatewayConfig::from_env()?;
let mut manifest = load_or_init_manifest(&config)?;
let removed = manifest.remove_profile(profile_id)?;
save_manifest(&config, &manifest)?;
let result = reconcile_workers(&config).await?;
writeln!(
io::stdout(),
"Removed profile `{}` (state paths kept: {}, {})",
removed.profile_id.as_str(),
removed.worker.host_state_dir.display(),
removed.worker.host_workspace_dir.display()
)?;
print_reconcile_summary(&result, &mut io::stdout())?;
Ok(())
}
async fn handle_profiles_merge(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
let Some(source_profile_id) = args.first() else {
return Err(
"usage: claw-telegram profiles merge <source-profile-id> <target-profile-id>".into(),
);
};
let Some(target_profile_id) = args.get(1) else {
return Err(
"usage: claw-telegram profiles merge <source-profile-id> <target-profile-id>".into(),
);
};
let config = GatewayConfig::from_env()?;
let mut manifest = load_or_init_manifest(&config)?;
let removed = manifest.merge_profiles(source_profile_id, target_profile_id)?;
save_manifest(&config, &manifest)?;
let result = reconcile_workers(&config).await?;
writeln!(
io::stdout(),
"Merged profile `{}` into `{}`. Source state was preserved at {} and {}.",
removed.profile_id.as_str(),
target_profile_id,
removed.worker.host_state_dir.display(),
removed.worker.host_workspace_dir.display()
)?;
print_reconcile_summary(&result, &mut io::stdout())?;
Ok(())
}
async fn handle_profiles_channel(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
match args.first().map(String::as_str) {
Some("add") => handle_profiles_channel_add(&args[1..]).await?,
Some("remove") => handle_profiles_channel_remove(&args[1..]).await?,
Some(other) => return Err(format!("unsupported channel command `{other}`").into()),
None => return Err("missing channel command".into()),
}
Ok(())
}
async fn handle_profiles_channel_add(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
let Some(profile_id) = args.first() else {
return Err(
"usage: claw-telegram profiles channel add <profile-id> telegram <user-id>".into(),
);
};
let Some(channel_name) = args.get(1) else {
return Err(
"usage: claw-telegram profiles channel add <profile-id> telegram <user-id>".into(),
);
};
if channel_name != "telegram" {
return Err(format!("unsupported channel `{channel_name}`").into());
}
let user_id = args
.get(2)
.ok_or("usage: claw-telegram profiles channel add <profile-id> telegram <user-id>")?
.parse::<i64>()?;
let config = GatewayConfig::from_env()?;
let mut manifest = load_or_init_manifest(&config)?;
manifest.add_telegram_channel(profile_id, user_id)?;
save_manifest(&config, &manifest)?;
let result = reconcile_workers(&config).await?;
writeln!(
io::stdout(),
"Attached Telegram user `{user_id}` to profile `{profile_id}`"
)?;
print_reconcile_summary(&result, &mut io::stdout())?;
Ok(())
}
async fn handle_profiles_channel_remove(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
let Some(channel_name) = args.first() else {
return Err("usage: claw-telegram profiles channel remove telegram <user-id>".into());
};
if channel_name != "telegram" {
return Err(format!("unsupported channel `{channel_name}`").into());
}
let user_id = args
.get(1)
.ok_or("usage: claw-telegram profiles channel remove telegram <user-id>")?
.parse::<i64>()?;
let config = GatewayConfig::from_env()?;
let mut manifest = load_or_init_manifest(&config)?;
manifest.remove_telegram_channel(user_id)?;
save_manifest(&config, &manifest)?;
let result = reconcile_workers(&config).await?;
writeln!(io::stdout(), "Detached Telegram user `{user_id}`")?;
print_reconcile_summary(&result, &mut io::stdout())?;
Ok(())
}
async fn handle_migrate_standalone_registry(
args: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
let config = GatewayConfig::from_env()?;
let source = args
.first()
.map(PathBuf::from)
.unwrap_or_else(default_standalone_registry_path);
let manifest = migrate_standalone_registry(&config, &source)?;
writeln!(
io::stdout(),
"Migrated {} profiles from {} into {}",
manifest.profiles.len(),
source.display(),
config.manifest_path.display()
)?;
let result = reconcile_workers(&config).await?;
print_reconcile_summary(&result, &mut io::stdout())?;
Ok(())
}
fn default_standalone_registry_path() -> PathBuf {
env::var_os("CLAW_TELEGRAM_STATE_ROOT")
.map(PathBuf::from)
.unwrap_or_else(|| {
env::current_dir()
.unwrap_or_else(|_| PathBuf::from("."))
.join(".claw-telegram")
})
.join("registry.json")
}
fn print_reconcile_summary(
result: &crate::docker_worker_manager::ReconcileResult,
out: &mut impl Write,
) -> io::Result<()> {
writeln!(out, "Workers ensured: {}", result.ensured_containers.len())?;
for record in &result.ensured_containers {
writeln!(
out,
" {} -> {} ({})",
record.profile_id, record.container_name, record.base_url
)?;
}
writeln!(
out,
"Templates archived: {}",
result.archived_templates.len()
)?;
for record in &result.archived_templates {
writeln!(
out,
" {} -> {}",
record.profile_id,
record.file_path.display()
)?;
}
writeln!(out, "Profiles removed: {}", result.removed_profiles.len())?;
for profile_id in &result.removed_profiles {
writeln!(out, " {profile_id}")?;
}
Ok(())
}
fn print_help(out: &mut impl Write) -> io::Result<()> {
writeln!(out, "claw-telegram")?;
writeln!(out)?;
writeln!(out, "Usage:")?;
writeln!(out, " claw-telegram serve")?;
writeln!(out, " claw-telegram standalone serve")?;
writeln!(out, " claw-telegram gateway serve")?;
writeln!(out, " claw-telegram workers reconcile")?;
writeln!(out, " claw-telegram profiles add <profile-id> [--user-id <telegram-user-id>] [--display-name <name>]")?;
writeln!(out, " claw-telegram profiles remove <profile-id>")?;
writeln!(
out,
" claw-telegram profiles merge <source-profile-id> <target-profile-id>"
)?;
writeln!(
out,
" claw-telegram profiles channel add <profile-id> telegram <user-id>"
)?;
writeln!(
out,
" claw-telegram profiles channel remove telegram <user-id>"
)?;
writeln!(
out,
" claw-telegram migrate-standalone-registry [registry-path]"
)?;
writeln!(out)?;
writeln!(out, "Environment:")?;
writeln!(out, "Standalone environment:")?;
writeln!(out, " CLAW_TELEGRAM_BOT_TOKEN")?;
writeln!(out, " CLAW_TELEGRAM_ALLOWED_USER_IDS")?;
writeln!(out, " CLAW_TELEGRAM_STATE_ROOT")?;
@@ -50,5 +361,26 @@ fn print_help(out: &mut impl Write) -> io::Result<()> {
writeln!(out, " CLAW_TELEGRAM_MAX_UPLOAD_MB")?;
writeln!(out, " CLAW_TELEGRAM_APPROVAL_TIMEOUT_SECS")?;
writeln!(out, " CLAW_TELEGRAM_POLL_TIMEOUT_SECS")?;
writeln!(out)?;
writeln!(out, "Gateway environment:")?;
writeln!(out, " CLAW_GATEWAY_TELEGRAM_BOT_TOKEN")?;
writeln!(out, " CLAW_GATEWAY_STATE_ROOT")?;
writeln!(out, " CLAW_GATEWAY_MANIFEST")?;
writeln!(out, " CLAW_GATEWAY_DOCKER_SOCKET")?;
writeln!(out, " CLAW_GATEWAY_POLL_TIMEOUT_SECS")?;
writeln!(out, " CLAW_GATEWAY_MAX_UPLOAD_MB")?;
writeln!(out, " CLAW_GATEWAY_WORKER_IMAGE")?;
writeln!(out, " CLAW_GATEWAY_WORKER_NETWORK")?;
writeln!(out, " CLAW_GATEWAY_TEMPLATE_DIR")?;
writeln!(out, " CLAW_GATEWAY_TEMPLATE_ARCHIVE_DIR")?;
writeln!(out, " CLAW_GATEWAY_TEMPLATE_FILE_PREFIX")?;
writeln!(out, " CLAW_GATEWAY_WORKER_BIND_PORT")?;
writeln!(out, " CLAW_GATEWAY_WORKER_DEFAULT_CWD")?;
writeln!(out, " CLAW_GATEWAY_WORKER_MODEL")?;
writeln!(out, " CLAW_GATEWAY_WORKER_PERMISSION_MODE")?;
writeln!(out, " CLAW_GATEWAY_WORKER_HOST_STATE_ROOT")?;
writeln!(out, " CLAW_GATEWAY_WORKER_HOST_WORKSPACE_ROOT")?;
writeln!(out, " CLAW_GATEWAY_INHERITED_ENV")?;
writeln!(out, " CLAW_WORKER_AUTH_TOKEN")?;
Ok(())
}
@@ -128,6 +128,11 @@ impl AgentRegistry {
pub fn path(&self) -> &Path {
&self.path
}
#[must_use]
pub fn records(&self) -> Vec<AgentRegistryRecord> {
self.records.values().cloned().collect()
}
}
pub fn agent_dir(state_root: &Path, agent_id: &AgentInstanceId) -> PathBuf {
@@ -0,0 +1,156 @@
use std::collections::BTreeSet;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use channel_gateway_core::{ManagedTemplateRecord, UnraidTemplateSpec};
#[derive(Debug, Clone)]
pub struct UnraidTemplateManager {
template_dir: PathBuf,
archive_dir: PathBuf,
template_prefix: String,
}
impl UnraidTemplateManager {
pub fn new(
template_dir: impl Into<PathBuf>,
archive_dir: impl Into<PathBuf>,
template_prefix: impl Into<String>,
) -> Self {
Self {
template_dir: template_dir.into(),
archive_dir: archive_dir.into(),
template_prefix: template_prefix.into(),
}
}
pub fn write_worker_template(
&self,
profile_id: &str,
template: &UnraidTemplateSpec,
) -> Result<ManagedTemplateRecord, TemplateError> {
std::fs::create_dir_all(&self.template_dir)?;
let path = self.template_path(profile_id);
let temp_path = path.with_extension("xml.tmp");
std::fs::write(&temp_path, template.render_xml())?;
std::fs::rename(temp_path, &path)?;
Ok(ManagedTemplateRecord {
profile_id: profile_id.to_string(),
file_path: path,
})
}
pub fn archive_removed_templates(
&self,
active_profile_ids: &BTreeSet<String>,
) -> Result<Vec<ManagedTemplateRecord>, TemplateError> {
std::fs::create_dir_all(&self.archive_dir)?;
let mut archived = Vec::new();
if !self.template_dir.exists() {
return Ok(archived);
}
for entry in std::fs::read_dir(&self.template_dir)? {
let entry = entry?;
let path = entry.path();
if !path.is_file() {
continue;
}
let Some(file_name) = path.file_name().and_then(|value| value.to_str()) else {
continue;
};
if !file_name.starts_with(&self.template_prefix) || !file_name.ends_with(".xml") {
continue;
}
let profile_id = file_name
.trim_start_matches(&self.template_prefix)
.trim_end_matches(".xml")
.to_string();
if active_profile_ids.contains(&profile_id) {
continue;
}
let archived_path =
self.archive_dir
.join(format!("{}-{}", current_timestamp_secs(), file_name));
std::fs::rename(&path, &archived_path)?;
archived.push(ManagedTemplateRecord {
profile_id,
file_path: archived_path,
});
}
Ok(archived)
}
pub fn archive_template_for_profile(
&self,
profile_id: &str,
) -> Result<Option<ManagedTemplateRecord>, TemplateError> {
std::fs::create_dir_all(&self.archive_dir)?;
let path = self.template_path(profile_id);
if !path.exists() {
return Ok(None);
}
let archived_path = self.archive_dir.join(format!(
"{}-{}",
current_timestamp_secs(),
path.file_name()
.and_then(|value| value.to_str())
.unwrap_or("template.xml")
));
std::fs::rename(&path, &archived_path)?;
Ok(Some(ManagedTemplateRecord {
profile_id: profile_id.to_string(),
file_path: archived_path,
}))
}
pub fn template_path(&self, profile_id: &str) -> PathBuf {
self.template_dir
.join(format!("{}{}.xml", self.template_prefix, profile_id))
}
pub fn render_gateway_example(
&self,
destination: impl AsRef<Path>,
repository: &str,
) -> Result<(), TemplateError> {
let xml = format!(
"<?xml version=\"1.0\"?>\n<Container version=\"2\">\n <Name>claw-telegram-gateway</Name>\n <Repository>{repository}</Repository>\n <Registry></Registry>\n <Network>claw_gateway</Network>\n <MyIP></MyIP>\n <Shell>sh</Shell>\n <Privileged>false</Privileged>\n <Support>https://git.wylab.me/wylab/claw-code-parity</Support>\n <Project>claw-code-parity</Project>\n <Overview>Telegram gateway with Docker-socket worker management.</Overview>\n <Category>AI:Tools</Category>\n <WebUI></WebUI>\n <TemplateURL></TemplateURL>\n <Icon></Icon>\n <ExtraParams></ExtraParams>\n <PostArgs></PostArgs>\n <CPUset></CPUset>\n <DateInstalled>{}</DateInstalled>\n <DonateText></DonateText>\n <DonateLink></DonateLink>\n <Requires></Requires>\n <Config Name=\"Docker Socket\" Target=\"/var/run/docker.sock\" Default=\"/var/run/docker.sock\" Mode=\"rw\" Description=\"Docker socket for worker orchestration\" Type=\"Path\" Display=\"always\" Required=\"true\" Mask=\"false\">/var/run/docker.sock</Config>\n <Config Name=\"Unraid Templates\" Target=\"/unraid/templates-user\" Default=\"/boot/config/plugins/dockerMan/templates-user\" Mode=\"rw\" Description=\"Unraid template directory\" Type=\"Path\" Display=\"always\" Required=\"true\" Mask=\"false\">/boot/config/plugins/dockerMan/templates-user</Config>\n <Config Name=\"Gateway AppData\" Target=\"/appdata\" Default=\"/mnt/user/appdata/claw-telegram-gateway\" Mode=\"rw\" Description=\"Gateway state and manifest directory\" Type=\"Path\" Display=\"always\" Required=\"true\" Mask=\"false\">/mnt/user/appdata/claw-telegram-gateway</Config>\n <Config Name=\"CLAW_GATEWAY_TELEGRAM_BOT_TOKEN\" Target=\"CLAW_GATEWAY_TELEGRAM_BOT_TOKEN\" Default=\"\" Mode=\"\" Description=\"Telegram bot token\" Type=\"Variable\" Display=\"always\" Required=\"true\" Mask=\"true\"></Config>\n <Config Name=\"CLAW_WORKER_AUTH_TOKEN\" Target=\"CLAW_WORKER_AUTH_TOKEN\" Default=\"\" Mode=\"\" Description=\"Shared bearer token for worker API calls\" Type=\"Variable\" Display=\"always\" Required=\"true\" Mask=\"true\"></Config>\n <TailscaleStateDir></TailscaleStateDir>\n</Container>\n",
current_timestamp_secs()
);
let destination = destination.as_ref();
if let Some(parent) = destination.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(destination, xml)?;
Ok(())
}
}
fn current_timestamp_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs())
.unwrap_or(0)
}
#[derive(Debug)]
pub enum TemplateError {
Io(std::io::Error),
}
impl Display for TemplateError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(error) => write!(f, "{error}"),
}
}
}
impl std::error::Error for TemplateError {}
impl From<std::io::Error> for TemplateError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
@@ -0,0 +1,312 @@
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use base64::Engine as _;
use channel_gateway_core::{
AttachmentRef, GeneratedFileDescriptor, TurnSource, WorkerApprovalDecision,
WorkerStatusResponse, WorkerTurnAccepted, WorkerTurnEvent, WorkerTurnRequest,
};
use futures_util::StreamExt;
use serde::Serialize;
use tokio::sync::mpsc;
#[derive(Clone)]
pub struct WorkerClient {
client: reqwest::Client,
base_url: String,
auth_token: String,
}
impl WorkerClient {
pub fn new(
base_url: impl Into<String>,
auth_token: impl Into<String>,
) -> Result<Self, WorkerClientError> {
let client = reqwest::Client::builder()
.user_agent("claw-telegram-gateway/0.1")
.build()
.map_err(WorkerClientError::Http)?;
Ok(Self {
client,
base_url: base_url.into().trim_end_matches('/').to_string(),
auth_token: auth_token.into(),
})
}
pub async fn health(&self) -> Result<(), WorkerClientError> {
let response = self
.client
.get(self.url("/healthz"))
.send()
.await
.map_err(WorkerClientError::Http)?;
response
.error_for_status()
.map_err(WorkerClientError::Http)?;
Ok(())
}
pub async fn status(&self) -> Result<WorkerStatusResponse, WorkerClientError> {
self.get_json("/v1/status").await
}
pub async fn reset_session(&self) -> Result<(), WorkerClientError> {
self.post_no_content("/v1/session/reset", &serde_json::json!({}))
.await
}
pub async fn post_turn(
&self,
prompt: String,
source: TurnSource,
attachments: &[AttachmentRef],
) -> Result<String, WorkerClientError> {
let request = WorkerTurnRequest {
prompt,
source,
attachments: encode_attachments(attachments).await?,
};
let accepted: WorkerTurnAccepted = self.post_json("/v1/turns", &request).await?;
Ok(accepted.turn_id)
}
pub async fn stream_turn_events(
&self,
turn_id: &str,
) -> Result<
mpsc::UnboundedReceiver<Result<WorkerTurnEvent, WorkerClientError>>,
WorkerClientError,
> {
let response = self
.client
.get(self.url(&format!("/v1/turns/{turn_id}/events")))
.bearer_auth(&self.auth_token)
.send()
.await
.map_err(WorkerClientError::Http)?;
let response = response
.error_for_status()
.map_err(WorkerClientError::Http)?;
let mut stream = response.bytes_stream();
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
let mut buffer = String::new();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
buffer.push_str(&String::from_utf8_lossy(&bytes));
while let Some(index) = buffer.find("\n\n") {
let frame = buffer[..index].to_string();
buffer = buffer[index + 2..].to_string();
for line in frame.lines() {
if let Some(data) = line.strip_prefix("data:") {
let parsed =
serde_json::from_str::<WorkerTurnEvent>(data.trim())
.map_err(WorkerClientError::Json);
let _ = tx.send(parsed);
}
}
}
}
Err(error) => {
let _ = tx.send(Err(WorkerClientError::Http(error)));
return;
}
}
}
});
Ok(rx)
}
pub async fn post_approval(
&self,
turn_id: &str,
approval_id: &str,
decision: WorkerApprovalDecision,
) -> Result<(), WorkerClientError> {
self.post_no_content(
&format!("/v1/turns/{turn_id}/approval"),
&serde_json::json!({
"approval_id": approval_id,
"decision": decision,
}),
)
.await
}
pub async fn cancel_turn(&self, turn_id: &str) -> Result<(), WorkerClientError> {
self.post_no_content(
&format!("/v1/turns/{turn_id}/cancel"),
&serde_json::json!({}),
)
.await
}
pub async fn download_generated_file(
&self,
turn_id: &str,
descriptor: &GeneratedFileDescriptor,
destination_dir: &Path,
) -> Result<PathBuf, WorkerClientError> {
tokio::fs::create_dir_all(destination_dir)
.await
.map_err(WorkerClientError::Io)?;
let response = self
.client
.get(self.url(&format!("/v1/turns/{turn_id}/files/{}", descriptor.file_id)))
.bearer_auth(&self.auth_token)
.send()
.await
.map_err(WorkerClientError::Http)?;
let response = response
.error_for_status()
.map_err(WorkerClientError::Http)?;
let bytes = response.bytes().await.map_err(WorkerClientError::Http)?;
let destination = destination_dir.join(&descriptor.file_name);
tokio::fs::write(&destination, bytes)
.await
.map_err(WorkerClientError::Io)?;
Ok(destination)
}
async fn get_json<T: for<'de> serde::Deserialize<'de>>(
&self,
path: &str,
) -> Result<T, WorkerClientError> {
let response = self
.client
.get(self.url(path))
.bearer_auth(&self.auth_token)
.send()
.await
.map_err(WorkerClientError::Http)?;
let response = response
.error_for_status()
.map_err(WorkerClientError::Http)?;
response.json().await.map_err(WorkerClientError::Http)
}
async fn post_json<T: for<'de> serde::Deserialize<'de>, B: Serialize>(
&self,
path: &str,
body: &B,
) -> Result<T, WorkerClientError> {
let response = self
.client
.post(self.url(path))
.bearer_auth(&self.auth_token)
.json(body)
.send()
.await
.map_err(WorkerClientError::Http)?;
let response = response
.error_for_status()
.map_err(WorkerClientError::Http)?;
response.json().await.map_err(WorkerClientError::Http)
}
async fn post_json_value<B: Serialize>(
&self,
path: &str,
body: &B,
) -> Result<serde_json::Value, WorkerClientError> {
let response = self
.client
.post(self.url(path))
.bearer_auth(&self.auth_token)
.json(body)
.send()
.await
.map_err(WorkerClientError::Http)?;
let response = response
.error_for_status()
.map_err(WorkerClientError::Http)?;
response.json().await.map_err(WorkerClientError::Http)
}
async fn post_no_content<B: Serialize>(
&self,
path: &str,
body: &B,
) -> Result<(), WorkerClientError> {
let response = self
.client
.post(self.url(path))
.bearer_auth(&self.auth_token)
.json(body)
.send()
.await
.map_err(WorkerClientError::Http)?;
response
.error_for_status()
.map_err(WorkerClientError::Http)?;
Ok(())
}
fn url(&self, path: &str) -> String {
format!("{}{}", self.base_url, path)
}
}
async fn encode_attachments(
attachments: &[AttachmentRef],
) -> Result<Vec<channel_gateway_core::InboundAttachment>, WorkerClientError> {
let mut encoded = Vec::new();
for attachment in attachments {
let bytes = tokio::fs::read(&attachment.path)
.await
.map_err(WorkerClientError::Io)?;
let file_name = attachment
.original_name
.clone()
.or_else(|| {
attachment
.path
.file_name()
.and_then(|value| value.to_str())
.map(ToString::to_string)
})
.unwrap_or_else(|| "attachment.bin".to_string());
encoded.push(channel_gateway_core::InboundAttachment {
file_name,
kind: attachment.kind,
media_type: media_type_for_path(&attachment.path),
data_base64: base64::engine::general_purpose::STANDARD.encode(bytes),
});
}
Ok(encoded)
}
fn media_type_for_path(path: &Path) -> Option<String> {
let extension = path.extension()?.to_str()?.to_ascii_lowercase();
let media_type = match extension.as_str() {
"png" => "image/png",
"jpg" | "jpeg" => "image/jpeg",
"gif" => "image/gif",
"webp" => "image/webp",
"pdf" => "application/pdf",
"json" => "application/json",
"txt" | "md" | "log" => "text/plain",
_ => return None,
};
Some(media_type.to_string())
}
#[derive(Debug)]
pub enum WorkerClientError {
Http(reqwest::Error),
Io(std::io::Error),
Json(serde_json::Error),
}
impl Display for WorkerClientError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Http(error) => write!(f, "{error}"),
Self::Io(error) => write!(f, "{error}"),
Self::Json(error) => write!(f, "{error}"),
}
}
}
impl std::error::Error for WorkerClientError {}