diff --git a/README.md b/README.md
index 8a15892..74c24d9 100644
--- a/README.md
+++ b/README.md
@@ -164,7 +164,7 @@ nanobot agent -m "Hello from my local LLM!"
## 💬 Chat Apps
-Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime, anywhere.
+Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, or Moltchat — anytime, anywhere.
| Channel | Setup |
|---------|-------|
@@ -172,6 +172,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime,
| **Discord** | Easy (bot token + intents) |
| **WhatsApp** | Medium (scan QR) |
| **Feishu** | Medium (app credentials) |
+| **Moltchat** | Medium (claw token + websocket) |
Telegram (Recommended)
@@ -205,6 +206,48 @@ nanobot gateway
+
+Moltchat (Claw IM)
+
+Uses **Socket.IO WebSocket** by default, with HTTP polling fallback.
+
+**1. Prepare credentials**
+- `clawToken`: Claw API token
+- `agentUserId`: your bot user id
+- Optional: `sessions`/`panels` with `["*"]` for auto-discovery
+
+**2. Configure**
+
+```json
+{
+ "channels": {
+ "moltchat": {
+ "enabled": true,
+ "baseUrl": "https://mochat.io",
+ "socketUrl": "https://mochat.io",
+ "socketPath": "/socket.io",
+ "clawToken": "claw_xxx",
+ "agentUserId": "69820107a785110aea8b1069",
+ "sessions": ["*"],
+ "panels": ["*"],
+ "replyDelayMode": "non-mention",
+ "replyDelayMs": 120000
+ }
+ }
+}
+```
+
+**3. Run**
+
+```bash
+nanobot gateway
+```
+
+> [!TIP]
+> Keep `clawToken` private. It should only be sent in `X-Claw-Token` header to your Moltchat API endpoint.
+
+
+
Discord
@@ -413,7 +456,7 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard
# Edit config on host to add API keys
vim ~/.nanobot/config.json
-# Run gateway (connects to Telegram/WhatsApp)
+# Run gateway (connects to enabled channels, e.g. Telegram/Discord/Moltchat)
docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway
# Or run a single command
@@ -433,7 +476,7 @@ nanobot/
│ ├── subagent.py # Background task execution
│ └── tools/ # Built-in tools (incl. spawn)
├── skills/ # 🎯 Bundled skills (github, weather, tmux...)
-├── channels/ # 📱 WhatsApp integration
+├── channels/ # 📱 Chat channel integrations
├── bus/ # 🚌 Message routing
├── cron/ # ⏰ Scheduled tasks
├── heartbeat/ # 💓 Proactive wake-up
diff --git a/nanobot/channels/__init__.py b/nanobot/channels/__init__.py
index 588169d..4d77063 100644
--- a/nanobot/channels/__init__.py
+++ b/nanobot/channels/__init__.py
@@ -2,5 +2,6 @@
from nanobot.channels.base import BaseChannel
from nanobot.channels.manager import ChannelManager
+from nanobot.channels.moltchat import MoltchatChannel
-__all__ = ["BaseChannel", "ChannelManager"]
+__all__ = ["BaseChannel", "ChannelManager", "MoltchatChannel"]
diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py
index 64ced48..11690ef 100644
--- a/nanobot/channels/manager.py
+++ b/nanobot/channels/manager.py
@@ -77,6 +77,18 @@ class ChannelManager:
logger.info("Feishu channel enabled")
except ImportError as e:
logger.warning(f"Feishu channel not available: {e}")
+
+ # Moltchat channel
+ if self.config.channels.moltchat.enabled:
+ try:
+ from nanobot.channels.moltchat import MoltchatChannel
+
+ self.channels["moltchat"] = MoltchatChannel(
+ self.config.channels.moltchat, self.bus
+ )
+ logger.info("Moltchat channel enabled")
+ except ImportError as e:
+ logger.warning(f"Moltchat channel not available: {e}")
async def start_all(self) -> None:
"""Start WhatsApp channel and the outbound dispatcher."""
diff --git a/nanobot/channels/moltchat.py b/nanobot/channels/moltchat.py
new file mode 100644
index 0000000..cc590d4
--- /dev/null
+++ b/nanobot/channels/moltchat.py
@@ -0,0 +1,1227 @@
+"""Moltchat channel implementation using Socket.IO with HTTP polling fallback."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+from collections import deque
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any
+
+import httpx
+from loguru import logger
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.base import BaseChannel
+from nanobot.config.schema import MoltchatConfig
+from nanobot.utils.helpers import get_data_path
+
+try:
+ import socketio
+
+ SOCKETIO_AVAILABLE = True
+except ImportError:
+ socketio = None
+ SOCKETIO_AVAILABLE = False
+
+try:
+ import msgpack # noqa: F401
+
+ MSGPACK_AVAILABLE = True
+except ImportError:
+ MSGPACK_AVAILABLE = False
+
+
+MAX_SEEN_MESSAGE_IDS = 2000
+CURSOR_SAVE_DEBOUNCE_S = 0.5
+
+
+@dataclass
+class MoltchatBufferedEntry:
+ """Buffered inbound entry for delayed dispatch."""
+
+ raw_body: str
+ author: str
+ sender_name: str = ""
+ sender_username: str = ""
+ timestamp: int | None = None
+ message_id: str = ""
+ group_id: str = ""
+
+
+@dataclass
+class DelayState:
+ """Per-target delayed message state."""
+
+ entries: list[MoltchatBufferedEntry] = field(default_factory=list)
+ lock: asyncio.Lock = field(default_factory=asyncio.Lock)
+ timer: asyncio.Task | None = None
+
+
+@dataclass
+class MoltchatTarget:
+ """Outbound target resolution result."""
+
+ id: str
+ is_panel: bool
+
+
+def normalize_moltchat_content(content: Any) -> str:
+ """Normalize content payload to text."""
+ if isinstance(content, str):
+ return content.strip()
+ if content is None:
+ return ""
+ try:
+ return json.dumps(content, ensure_ascii=False)
+ except TypeError:
+ return str(content)
+
+
+def resolve_moltchat_target(raw: str) -> MoltchatTarget:
+ """Resolve id and target kind from user-provided target string."""
+ trimmed = (raw or "").strip()
+ if not trimmed:
+ return MoltchatTarget(id="", is_panel=False)
+
+ lowered = trimmed.lower()
+ cleaned = trimmed
+ forced_panel = False
+
+ prefixes = ["moltchat:", "mochat:", "group:", "channel:", "panel:"]
+ for prefix in prefixes:
+ if lowered.startswith(prefix):
+ cleaned = trimmed[len(prefix) :].strip()
+ if prefix in {"group:", "channel:", "panel:"}:
+ forced_panel = True
+ break
+
+ if not cleaned:
+ return MoltchatTarget(id="", is_panel=False)
+
+ return MoltchatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_"))
+
+
+def extract_mention_ids(value: Any) -> list[str]:
+ """Extract mention ids from heterogeneous mention payload."""
+ if not isinstance(value, list):
+ return []
+
+ ids: list[str] = []
+ for item in value:
+ if isinstance(item, str):
+ text = item.strip()
+ if text:
+ ids.append(text)
+ continue
+
+ if not isinstance(item, dict):
+ continue
+
+ for key in ("id", "userId", "_id"):
+ candidate = item.get(key)
+ if isinstance(candidate, str) and candidate.strip():
+ ids.append(candidate.strip())
+ break
+
+ return ids
+
+
+def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool:
+ """Resolve mention state from payload metadata and text fallback."""
+ meta = payload.get("meta")
+ if isinstance(meta, dict):
+ if meta.get("mentioned") is True or meta.get("wasMentioned") is True:
+ return True
+
+ for field in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"):
+ ids = extract_mention_ids(meta.get(field))
+ if agent_user_id and agent_user_id in ids:
+ return True
+
+ if not agent_user_id:
+ return False
+
+ content = payload.get("content")
+ if not isinstance(content, str) or not content:
+ return False
+
+ return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content
+
+
+def resolve_require_mention(
+ config: MoltchatConfig,
+ session_id: str,
+ group_id: str,
+) -> bool:
+ """Resolve mention requirement for group/panel conversations."""
+ groups = config.groups or {}
+ if group_id and group_id in groups:
+ return bool(groups[group_id].require_mention)
+ if session_id in groups:
+ return bool(groups[session_id].require_mention)
+ if "*" in groups:
+ return bool(groups["*"].require_mention)
+ return bool(config.mention.require_in_groups)
+
+
+def build_buffered_body(entries: list[MoltchatBufferedEntry], is_group: bool) -> str:
+ """Build text body from one or more buffered entries."""
+ if not entries:
+ return ""
+
+ if len(entries) == 1:
+ return entries[0].raw_body
+
+ lines: list[str] = []
+ for entry in entries:
+ body = entry.raw_body
+ if not body:
+ continue
+ if is_group:
+ label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author
+ if label:
+ lines.append(f"{label}: {body}")
+ continue
+ lines.append(body)
+
+ return "\n".join(lines).strip()
+
+
+def parse_timestamp(value: Any) -> int | None:
+ """Parse event timestamp to epoch milliseconds."""
+ if not isinstance(value, str) or not value.strip():
+ return None
+ try:
+ return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000)
+ except ValueError:
+ return None
+
+
+class MoltchatChannel(BaseChannel):
+ """Moltchat channel using socket.io with fallback polling workers."""
+
+ name = "moltchat"
+
+ def __init__(self, config: MoltchatConfig, bus: MessageBus):
+ super().__init__(config, bus)
+ self.config: MoltchatConfig = config
+ self._http: httpx.AsyncClient | None = None
+ self._socket: Any = None
+ self._ws_connected = False
+ self._ws_ready = False
+
+ self._state_dir = get_data_path() / "moltchat"
+ self._cursor_path = self._state_dir / "session_cursors.json"
+ self._session_cursor: dict[str, int] = {}
+ self._cursor_save_task: asyncio.Task | None = None
+
+ self._session_set: set[str] = set()
+ self._panel_set: set[str] = set()
+ self._auto_discover_sessions = False
+ self._auto_discover_panels = False
+
+ self._cold_sessions: set[str] = set()
+ self._session_by_converse: dict[str, str] = {}
+
+ self._seen_set: dict[str, set[str]] = {}
+ self._seen_queue: dict[str, deque[str]] = {}
+
+ self._delay_states: dict[str, DelayState] = {}
+
+ self._fallback_mode = False
+ self._session_fallback_tasks: dict[str, asyncio.Task] = {}
+ self._panel_fallback_tasks: dict[str, asyncio.Task] = {}
+ self._refresh_task: asyncio.Task | None = None
+
+ self._target_locks: dict[str, asyncio.Lock] = {}
+
+ async def start(self) -> None:
+ """Start Moltchat channel workers and websocket connection."""
+ if not self.config.claw_token:
+ logger.error("Moltchat claw_token not configured")
+ return
+
+ self._running = True
+ self._http = httpx.AsyncClient(timeout=30.0)
+
+ self._state_dir.mkdir(parents=True, exist_ok=True)
+ await self._load_session_cursors()
+ self._seed_targets_from_config()
+
+ await self._refresh_targets(subscribe_new=False)
+
+ websocket_started = await self._start_socket_client()
+ if not websocket_started:
+ await self._ensure_fallback_workers()
+
+ self._refresh_task = asyncio.create_task(self._refresh_loop())
+
+ while self._running:
+ await asyncio.sleep(1)
+
+ async def stop(self) -> None:
+ """Stop all workers and clean up resources."""
+ self._running = False
+
+ if self._refresh_task:
+ self._refresh_task.cancel()
+ self._refresh_task = None
+
+ await self._stop_fallback_workers()
+ await self._cancel_delay_timers()
+
+ if self._socket:
+ try:
+ await self._socket.disconnect()
+ except Exception:
+ pass
+ self._socket = None
+
+ if self._cursor_save_task:
+ self._cursor_save_task.cancel()
+ self._cursor_save_task = None
+
+ await self._save_session_cursors()
+
+ if self._http:
+ await self._http.aclose()
+ self._http = None
+
+ self._ws_connected = False
+ self._ws_ready = False
+
+ async def send(self, msg: OutboundMessage) -> None:
+ """Send outbound message to session or panel."""
+ if not self.config.claw_token:
+ logger.warning("Moltchat claw_token missing, skip send")
+ return
+
+ content_parts = [msg.content.strip()] if msg.content and msg.content.strip() else []
+ if msg.media:
+ content_parts.extend([m for m in msg.media if isinstance(m, str) and m.strip()])
+ content = "\n".join(content_parts).strip()
+ if not content:
+ return
+
+ target = resolve_moltchat_target(msg.chat_id)
+ if not target.id:
+ logger.warning("Moltchat outbound target is empty")
+ return
+
+ is_panel = target.is_panel or target.id in self._panel_set
+ if target.id.startswith("session_"):
+ is_panel = False
+
+ try:
+ if is_panel:
+ await self._send_panel_message(
+ panel_id=target.id,
+ content=content,
+ reply_to=msg.reply_to,
+ group_id=self._read_group_id(msg.metadata),
+ )
+ else:
+ await self._send_session_message(
+ session_id=target.id,
+ content=content,
+ reply_to=msg.reply_to,
+ )
+ except Exception as e:
+ logger.error(f"Failed to send Moltchat message: {e}")
+
+ def _seed_targets_from_config(self) -> None:
+ sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions)
+ panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels)
+
+ self._session_set.update(sessions)
+ self._panel_set.update(panels)
+
+ for session_id in sessions:
+ if session_id not in self._session_cursor:
+ self._cold_sessions.add(session_id)
+
+ def _normalize_id_list(self, values: list[str]) -> tuple[list[str], bool]:
+ cleaned = [str(v).strip() for v in values if str(v).strip()]
+ has_wildcard = "*" in cleaned
+ ids = sorted({v for v in cleaned if v != "*"})
+ return ids, has_wildcard
+
+ async def _start_socket_client(self) -> bool:
+ if not SOCKETIO_AVAILABLE:
+ logger.warning("python-socketio not installed, Moltchat using polling fallback")
+ return False
+
+ serializer = "default"
+ if not self.config.socket_disable_msgpack:
+ if MSGPACK_AVAILABLE:
+ serializer = "msgpack"
+ else:
+ logger.warning(
+ "msgpack is not installed but socket_disable_msgpack=false; "
+ "trying JSON serializer"
+ )
+
+ reconnect_attempts = None
+ if self.config.max_retry_attempts > 0:
+ reconnect_attempts = self.config.max_retry_attempts
+
+ client = socketio.AsyncClient(
+ reconnection=True,
+ reconnection_attempts=reconnect_attempts,
+ reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0),
+ reconnection_delay_max=max(
+ 0.1,
+ self.config.socket_max_reconnect_delay_ms / 1000.0,
+ ),
+ logger=False,
+ engineio_logger=False,
+ serializer=serializer,
+ )
+
+ @client.event
+ async def connect() -> None:
+ self._ws_connected = True
+ self._ws_ready = False
+ logger.info("Moltchat websocket connected")
+
+ subscribed = await self._subscribe_all()
+ self._ws_ready = subscribed
+ if subscribed:
+ await self._stop_fallback_workers()
+ else:
+ await self._ensure_fallback_workers()
+
+ @client.event
+ async def disconnect() -> None:
+ if not self._running:
+ return
+ self._ws_connected = False
+ self._ws_ready = False
+ logger.warning("Moltchat websocket disconnected")
+ await self._ensure_fallback_workers()
+
+ @client.event
+ async def connect_error(data: Any) -> None:
+ message = str(data)
+ logger.error(f"Moltchat websocket connect error: {message}")
+
+ @client.on("claw.session.events")
+ async def on_session_events(payload: dict[str, Any]) -> None:
+ await self._handle_watch_payload(payload, target_kind="session")
+
+ @client.on("claw.panel.events")
+ async def on_panel_events(payload: dict[str, Any]) -> None:
+ await self._handle_watch_payload(payload, target_kind="panel")
+
+ for event_name in (
+ "notify:chat.inbox.append",
+ "notify:chat.message.add",
+ "notify:chat.message.update",
+ "notify:chat.message.recall",
+ "notify:chat.message.delete",
+ ):
+ client.on(event_name, self._build_notify_handler(event_name))
+
+ socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/")
+ socket_path = (self.config.socket_path or "/socket.io").strip()
+ if socket_path.startswith("/"):
+ socket_path = socket_path[1:]
+
+ try:
+ self._socket = client
+ await client.connect(
+ socket_url,
+ transports=["websocket"],
+ socketio_path=socket_path,
+ auth={"token": self.config.claw_token},
+ wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0),
+ )
+ return True
+ except Exception as e:
+ logger.error(f"Failed to connect Moltchat websocket: {e}")
+ try:
+ await client.disconnect()
+ except Exception:
+ pass
+ self._socket = None
+ return False
+
+ def _build_notify_handler(self, event_name: str):
+ async def handler(payload: Any) -> None:
+ if event_name == "notify:chat.inbox.append":
+ await self._handle_notify_inbox_append(payload)
+ return
+
+ if event_name.startswith("notify:chat.message."):
+ await self._handle_notify_chat_message(payload)
+
+ return handler
+
+ async def _subscribe_all(self) -> bool:
+ sessions_ok = await self._subscribe_sessions(sorted(self._session_set))
+ panels_ok = await self._subscribe_panels(sorted(self._panel_set))
+
+ if self._auto_discover_sessions or self._auto_discover_panels:
+ await self._refresh_targets(subscribe_new=True)
+
+ return sessions_ok and panels_ok
+
+ async def _subscribe_sessions(self, session_ids: list[str]) -> bool:
+ if not session_ids:
+ return True
+
+ for session_id in session_ids:
+ if session_id not in self._session_cursor:
+ self._cold_sessions.add(session_id)
+
+ ack = await self._socket_call(
+ "com.claw.im.subscribeSessions",
+ {
+ "sessionIds": session_ids,
+ "cursors": self._session_cursor,
+ "limit": self.config.watch_limit,
+ },
+ )
+ if not ack.get("result"):
+ logger.error(f"Moltchat subscribeSessions failed: {ack.get('message', 'unknown error')}")
+ return False
+
+ data = ack.get("data")
+ items: list[dict[str, Any]] = []
+ if isinstance(data, list):
+ items = [item for item in data if isinstance(item, dict)]
+ elif isinstance(data, dict):
+ sessions = data.get("sessions")
+ if isinstance(sessions, list):
+ items = [item for item in sessions if isinstance(item, dict)]
+ elif "sessionId" in data:
+ items = [data]
+
+ for payload in items:
+ await self._handle_watch_payload(payload, target_kind="session")
+
+ return True
+
+ async def _subscribe_panels(self, panel_ids: list[str]) -> bool:
+ if not self._auto_discover_panels and not panel_ids:
+ return True
+
+ ack = await self._socket_call(
+ "com.claw.im.subscribePanels",
+ {
+ "panelIds": panel_ids,
+ },
+ )
+ if not ack.get("result"):
+ logger.error(f"Moltchat subscribePanels failed: {ack.get('message', 'unknown error')}")
+ return False
+
+ return True
+
+ async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]:
+ if not self._socket:
+ return {"result": False, "message": "socket not connected"}
+
+ try:
+ raw = await self._socket.call(event_name, payload, timeout=10)
+ except Exception as e:
+ return {"result": False, "message": str(e)}
+
+ if isinstance(raw, dict):
+ return raw
+
+ return {"result": True, "data": raw}
+
+ async def _refresh_loop(self) -> None:
+ interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
+
+ while self._running:
+ await asyncio.sleep(interval_s)
+
+ try:
+ await self._refresh_targets(subscribe_new=self._ws_ready)
+ except Exception as e:
+ logger.warning(f"Moltchat refresh failed: {e}")
+
+ if self._fallback_mode:
+ await self._ensure_fallback_workers()
+
+ async def _refresh_targets(self, subscribe_new: bool) -> None:
+ if self._auto_discover_sessions:
+ await self._refresh_sessions_directory(subscribe_new=subscribe_new)
+
+ if self._auto_discover_panels:
+ await self._refresh_panels(subscribe_new=subscribe_new)
+
+ async def _refresh_sessions_directory(self, subscribe_new: bool) -> None:
+ try:
+ response = await self._list_sessions()
+ except Exception as e:
+ logger.warning(f"Moltchat listSessions failed: {e}")
+ return
+
+ sessions = response.get("sessions")
+ if not isinstance(sessions, list):
+ return
+
+ new_sessions: list[str] = []
+ for session in sessions:
+ if not isinstance(session, dict):
+ continue
+
+ session_id = str(session.get("sessionId") or "").strip()
+ if not session_id:
+ continue
+
+ if session_id not in self._session_set:
+ self._session_set.add(session_id)
+ new_sessions.append(session_id)
+ if session_id not in self._session_cursor:
+ self._cold_sessions.add(session_id)
+
+ converse_id = str(session.get("converseId") or "").strip()
+ if converse_id:
+ self._session_by_converse[converse_id] = session_id
+
+ if not new_sessions:
+ return
+
+ if self._ws_ready and subscribe_new:
+ await self._subscribe_sessions(new_sessions)
+
+ if self._fallback_mode:
+ await self._ensure_fallback_workers()
+
+ async def _refresh_panels(self, subscribe_new: bool) -> None:
+ try:
+ response = await self._get_workspace_group()
+ except Exception as e:
+ logger.warning(f"Moltchat getWorkspaceGroup failed: {e}")
+ return
+
+ raw_panels = response.get("panels")
+ if not isinstance(raw_panels, list):
+ return
+
+ new_panels: list[str] = []
+ for panel in raw_panels:
+ if not isinstance(panel, dict):
+ continue
+
+ panel_type = panel.get("type")
+ if isinstance(panel_type, int) and panel_type != 0:
+ continue
+
+ panel_id = str(panel.get("id") or panel.get("_id") or "").strip()
+ if not panel_id:
+ continue
+
+ if panel_id not in self._panel_set:
+ self._panel_set.add(panel_id)
+ new_panels.append(panel_id)
+
+ if not new_panels:
+ return
+
+ if self._ws_ready and subscribe_new:
+ await self._subscribe_panels(new_panels)
+
+ if self._fallback_mode:
+ await self._ensure_fallback_workers()
+
+ async def _ensure_fallback_workers(self) -> None:
+ if not self._running:
+ return
+
+ self._fallback_mode = True
+
+ for session_id in sorted(self._session_set):
+ task = self._session_fallback_tasks.get(session_id)
+ if task and not task.done():
+ continue
+ self._session_fallback_tasks[session_id] = asyncio.create_task(
+ self._session_watch_worker(session_id)
+ )
+
+ for panel_id in sorted(self._panel_set):
+ task = self._panel_fallback_tasks.get(panel_id)
+ if task and not task.done():
+ continue
+ self._panel_fallback_tasks[panel_id] = asyncio.create_task(
+ self._panel_poll_worker(panel_id)
+ )
+
+ async def _stop_fallback_workers(self) -> None:
+ self._fallback_mode = False
+
+ tasks = [
+ *self._session_fallback_tasks.values(),
+ *self._panel_fallback_tasks.values(),
+ ]
+ for task in tasks:
+ task.cancel()
+
+ if tasks:
+ await asyncio.gather(*tasks, return_exceptions=True)
+
+ self._session_fallback_tasks.clear()
+ self._panel_fallback_tasks.clear()
+
+ async def _session_watch_worker(self, session_id: str) -> None:
+ while self._running and self._fallback_mode:
+ try:
+ payload = await self._watch_session(
+ session_id=session_id,
+ cursor=self._session_cursor.get(session_id, 0),
+ timeout_ms=self.config.watch_timeout_ms,
+ limit=self.config.watch_limit,
+ )
+ await self._handle_watch_payload(payload, target_kind="session")
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.warning(f"Moltchat watch fallback error ({session_id}): {e}")
+ await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0))
+
+ async def _panel_poll_worker(self, panel_id: str) -> None:
+ sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
+
+ while self._running and self._fallback_mode:
+ try:
+ response = await self._list_panel_messages(
+ panel_id=panel_id,
+ limit=min(100, max(1, self.config.watch_limit)),
+ )
+
+ raw_messages = response.get("messages")
+ if isinstance(raw_messages, list):
+ for message in reversed(raw_messages):
+ if not isinstance(message, dict):
+ continue
+
+ synthetic_event = {
+ "type": "message.add",
+ "timestamp": message.get("createdAt") or datetime.utcnow().isoformat(),
+ "payload": {
+ "messageId": str(message.get("messageId") or ""),
+ "author": str(message.get("author") or ""),
+ "authorInfo": message.get("authorInfo") if isinstance(message.get("authorInfo"), dict) else {},
+ "content": message.get("content"),
+ "meta": message.get("meta") if isinstance(message.get("meta"), dict) else {},
+ "groupId": str(response.get("groupId") or ""),
+ "converseId": panel_id,
+ },
+ }
+ await self._process_inbound_event(
+ target_id=panel_id,
+ event=synthetic_event,
+ target_kind="panel",
+ )
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.warning(f"Moltchat panel polling error ({panel_id}): {e}")
+
+ await asyncio.sleep(sleep_s)
+
+ async def _handle_watch_payload(
+ self,
+ payload: dict[str, Any],
+ target_kind: str,
+ ) -> None:
+ if not isinstance(payload, dict):
+ return
+
+ target_id = str(payload.get("sessionId") or "").strip()
+ if not target_id:
+ return
+
+ lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock())
+ async with lock:
+ previous_cursor = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0
+ payload_cursor = payload.get("cursor")
+ if (
+ target_kind == "session"
+ and isinstance(payload_cursor, int)
+ and payload_cursor >= 0
+ ):
+ self._mark_session_cursor(target_id, payload_cursor)
+
+ raw_events = payload.get("events")
+ if not isinstance(raw_events, list):
+ return
+
+ if target_kind == "session" and target_id in self._cold_sessions:
+ self._cold_sessions.discard(target_id)
+ return
+
+ for event in raw_events:
+ if not isinstance(event, dict):
+ continue
+ seq = event.get("seq")
+ if (
+ target_kind == "session"
+ and isinstance(seq, int)
+ and seq > self._session_cursor.get(target_id, previous_cursor)
+ ):
+ self._mark_session_cursor(target_id, seq)
+
+ if event.get("type") != "message.add":
+ continue
+
+ await self._process_inbound_event(
+ target_id=target_id,
+ event=event,
+ target_kind=target_kind,
+ )
+
+ async def _process_inbound_event(
+ self,
+ target_id: str,
+ event: dict[str, Any],
+ target_kind: str,
+ ) -> None:
+ payload = event.get("payload")
+ if not isinstance(payload, dict):
+ return
+
+ author = str(payload.get("author") or "").strip()
+ if not author:
+ return
+
+ if self.config.agent_user_id and author == self.config.agent_user_id:
+ return
+
+ if not self.is_allowed(author):
+ return
+
+ message_id = str(payload.get("messageId") or "").strip()
+ seen_key = f"{target_kind}:{target_id}"
+ if message_id and self._remember_message_id(seen_key, message_id):
+ return
+
+ raw_body = normalize_moltchat_content(payload.get("content"))
+ if not raw_body:
+ raw_body = "[empty message]"
+
+ author_info = payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {}
+ sender_name = str(author_info.get("nickname") or author_info.get("email") or "").strip()
+ sender_username = str(author_info.get("agentId") or "").strip()
+
+ group_id = str(payload.get("groupId") or "").strip()
+ is_group = bool(group_id)
+ was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id)
+
+ require_mention = (
+ target_kind == "panel"
+ and is_group
+ and resolve_require_mention(self.config, target_id, group_id)
+ )
+
+ use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention"
+
+ if require_mention and not was_mentioned and not use_delay:
+ return
+
+ entry = MoltchatBufferedEntry(
+ raw_body=raw_body,
+ author=author,
+ sender_name=sender_name,
+ sender_username=sender_username,
+ timestamp=parse_timestamp(event.get("timestamp")),
+ message_id=message_id,
+ group_id=group_id,
+ )
+
+ if use_delay:
+ delay_key = f"{target_kind}:{target_id}"
+ if was_mentioned:
+ await self._flush_delayed_entries(
+ key=delay_key,
+ target_id=target_id,
+ target_kind=target_kind,
+ reason="mention",
+ entry=entry,
+ )
+ else:
+ await self._enqueue_delayed_entry(
+ key=delay_key,
+ target_id=target_id,
+ target_kind=target_kind,
+ entry=entry,
+ )
+ return
+
+ await self._dispatch_entries(
+ target_id=target_id,
+ target_kind=target_kind,
+ entries=[entry],
+ was_mentioned=was_mentioned,
+ )
+
+ def _remember_message_id(self, key: str, message_id: str) -> bool:
+ seen_set = self._seen_set.setdefault(key, set())
+ seen_queue = self._seen_queue.setdefault(key, deque())
+
+ if message_id in seen_set:
+ return True
+
+ seen_set.add(message_id)
+ seen_queue.append(message_id)
+
+ while len(seen_queue) > MAX_SEEN_MESSAGE_IDS:
+ removed = seen_queue.popleft()
+ seen_set.discard(removed)
+
+ return False
+
+ async def _enqueue_delayed_entry(
+ self,
+ key: str,
+ target_id: str,
+ target_kind: str,
+ entry: MoltchatBufferedEntry,
+ ) -> None:
+ state = self._delay_states.setdefault(key, DelayState())
+
+ async with state.lock:
+ state.entries.append(entry)
+ if state.timer:
+ state.timer.cancel()
+
+ state.timer = asyncio.create_task(
+ self._delay_flush_after(key, target_id, target_kind)
+ )
+
+ async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None:
+ await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0)
+ await self._flush_delayed_entries(
+ key=key,
+ target_id=target_id,
+ target_kind=target_kind,
+ reason="timer",
+ entry=None,
+ )
+
+ async def _flush_delayed_entries(
+ self,
+ key: str,
+ target_id: str,
+ target_kind: str,
+ reason: str,
+ entry: MoltchatBufferedEntry | None,
+ ) -> None:
+ state = self._delay_states.setdefault(key, DelayState())
+
+ async with state.lock:
+ if entry:
+ state.entries.append(entry)
+
+ current = asyncio.current_task()
+ if state.timer and state.timer is not current:
+ state.timer.cancel()
+ state.timer = None
+ elif state.timer is current:
+ state.timer = None
+
+ entries = state.entries[:]
+ state.entries.clear()
+
+ if not entries:
+ return
+
+ await self._dispatch_entries(
+ target_id=target_id,
+ target_kind=target_kind,
+ entries=entries,
+ was_mentioned=(reason == "mention"),
+ )
+
+ async def _dispatch_entries(
+ self,
+ target_id: str,
+ target_kind: str,
+ entries: list[MoltchatBufferedEntry],
+ was_mentioned: bool,
+ ) -> None:
+ if not entries:
+ return
+
+ is_group = bool(entries[-1].group_id)
+ body = build_buffered_body(entries, is_group)
+ if not body:
+ body = "[empty message]"
+
+ last = entries[-1]
+ metadata = {
+ "message_id": last.message_id,
+ "timestamp": last.timestamp,
+ "is_group": is_group,
+ "group_id": last.group_id,
+ "sender_name": last.sender_name,
+ "sender_username": last.sender_username,
+ "target_kind": target_kind,
+ "was_mentioned": was_mentioned,
+ "buffered_count": len(entries),
+ }
+
+ await self._handle_message(
+ sender_id=last.author,
+ chat_id=target_id,
+ content=body,
+ metadata=metadata,
+ )
+
+ async def _cancel_delay_timers(self) -> None:
+ for state in self._delay_states.values():
+ if state.timer:
+ state.timer.cancel()
+ state.timer = None
+ self._delay_states.clear()
+
+ async def _handle_notify_chat_message(self, payload: Any) -> None:
+ if not isinstance(payload, dict):
+ return
+
+ group_id = str(payload.get("groupId") or "").strip()
+ panel_id = str(payload.get("converseId") or payload.get("panelId") or "").strip()
+ if not group_id or not panel_id:
+ return
+
+ if self._panel_set and panel_id not in self._panel_set:
+ return
+
+ synthetic_event = {
+ "type": "message.add",
+ "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(),
+ "payload": {
+ "messageId": str(payload.get("_id") or payload.get("messageId") or ""),
+ "author": str(payload.get("author") or ""),
+ "authorInfo": payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {},
+ "content": payload.get("content"),
+ "meta": payload.get("meta") if isinstance(payload.get("meta"), dict) else {},
+ "groupId": group_id,
+ "converseId": panel_id,
+ },
+ }
+ await self._process_inbound_event(
+ target_id=panel_id,
+ event=synthetic_event,
+ target_kind="panel",
+ )
+
+ async def _handle_notify_inbox_append(self, payload: Any) -> None:
+ if not isinstance(payload, dict):
+ return
+
+ if payload.get("type") != "message":
+ return
+
+ detail = payload.get("payload")
+ if not isinstance(detail, dict):
+ return
+
+ group_id = str(detail.get("groupId") or "").strip()
+ if group_id:
+ return
+
+ converse_id = str(detail.get("converseId") or "").strip()
+ if not converse_id:
+ return
+
+ session_id = self._session_by_converse.get(converse_id)
+ if not session_id:
+ await self._refresh_sessions_directory(subscribe_new=self._ws_ready)
+ session_id = self._session_by_converse.get(converse_id)
+ if not session_id:
+ return
+
+ message_id = str(detail.get("messageId") or payload.get("_id") or "").strip()
+ author = str(detail.get("messageAuthor") or "").strip()
+ content = str(detail.get("messagePlainContent") or detail.get("messageSnippet") or "").strip()
+
+ synthetic_event = {
+ "type": "message.add",
+ "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(),
+ "payload": {
+ "messageId": message_id,
+ "author": author,
+ "content": content,
+ "meta": {
+ "source": "notify:chat.inbox.append",
+ "converseId": converse_id,
+ },
+ "converseId": converse_id,
+ },
+ }
+
+ await self._process_inbound_event(
+ target_id=session_id,
+ event=synthetic_event,
+ target_kind="session",
+ )
+
+ def _mark_session_cursor(self, session_id: str, cursor: int) -> None:
+ if cursor < 0:
+ return
+
+ previous = self._session_cursor.get(session_id, 0)
+ if cursor < previous:
+ return
+
+ self._session_cursor[session_id] = cursor
+ self._schedule_cursor_save()
+
+ def _schedule_cursor_save(self) -> None:
+ if self._cursor_save_task and not self._cursor_save_task.done():
+ return
+
+ self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced())
+
+ async def _save_cursor_debounced(self) -> None:
+ await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S)
+ await self._save_session_cursors()
+
+ async def _load_session_cursors(self) -> None:
+ if not self._cursor_path.exists():
+ return
+
+ try:
+ data = json.loads(self._cursor_path.read_text("utf-8"))
+ except Exception as e:
+ logger.warning(f"Failed to read Moltchat cursor file: {e}")
+ return
+
+ cursors = data.get("cursors") if isinstance(data, dict) else None
+ if not isinstance(cursors, dict):
+ return
+
+ for session_id, cursor in cursors.items():
+ if isinstance(session_id, str) and isinstance(cursor, int) and cursor >= 0:
+ self._session_cursor[session_id] = cursor
+
+ async def _save_session_cursors(self) -> None:
+ payload = {
+ "schemaVersion": 1,
+ "updatedAt": datetime.utcnow().isoformat(),
+ "cursors": self._session_cursor,
+ }
+
+ try:
+ self._state_dir.mkdir(parents=True, exist_ok=True)
+ self._cursor_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", "utf-8")
+ except Exception as e:
+ logger.warning(f"Failed to save Moltchat cursor file: {e}")
+
+ def _base_url(self) -> str:
+ return self.config.base_url.strip().rstrip("/")
+
+ async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
+ if not self._http:
+ raise RuntimeError("Moltchat HTTP client not initialized")
+
+ url = f"{self._base_url()}{path}"
+ response = await self._http.post(
+ url,
+ headers={
+ "Content-Type": "application/json",
+ "X-Claw-Token": self.config.claw_token,
+ },
+ json=payload,
+ )
+
+ text = response.text
+ if not response.is_success:
+ raise RuntimeError(f"Moltchat HTTP {response.status_code}: {text[:200]}")
+
+ parsed: Any
+ try:
+ parsed = response.json()
+ except Exception:
+ parsed = text
+
+ if isinstance(parsed, dict) and isinstance(parsed.get("code"), int):
+ if parsed["code"] != 200:
+ message = str(parsed.get("message") or parsed.get("name") or "request failed")
+ raise RuntimeError(f"Moltchat API error: {message} (code={parsed['code']})")
+ data = parsed.get("data")
+ return data if isinstance(data, dict) else {}
+
+ if isinstance(parsed, dict):
+ return parsed
+
+ return {}
+
+ async def _watch_session(
+ self,
+ session_id: str,
+ cursor: int,
+ timeout_ms: int,
+ limit: int,
+ ) -> dict[str, Any]:
+ return await self._post_json(
+ "/api/claw/sessions/watch",
+ {
+ "sessionId": session_id,
+ "cursor": cursor,
+ "timeoutMs": timeout_ms,
+ "limit": limit,
+ },
+ )
+
+ async def _send_session_message(
+ self,
+ session_id: str,
+ content: str,
+ reply_to: str | None,
+ ) -> dict[str, Any]:
+ payload = {
+ "sessionId": session_id,
+ "content": content,
+ }
+ if reply_to:
+ payload["replyTo"] = reply_to
+ return await self._post_json("/api/claw/sessions/send", payload)
+
+ async def _send_panel_message(
+ self,
+ panel_id: str,
+ content: str,
+ reply_to: str | None,
+ group_id: str | None,
+ ) -> dict[str, Any]:
+ payload = {
+ "panelId": panel_id,
+ "content": content,
+ }
+ if reply_to:
+ payload["replyTo"] = reply_to
+ if group_id:
+ payload["groupId"] = group_id
+ return await self._post_json("/api/claw/groups/panels/send", payload)
+
+ async def _list_sessions(self) -> dict[str, Any]:
+ return await self._post_json("/api/claw/sessions/list", {})
+
+ async def _get_workspace_group(self) -> dict[str, Any]:
+ return await self._post_json("/api/claw/groups/get", {})
+
+ async def _list_panel_messages(self, panel_id: str, limit: int) -> dict[str, Any]:
+ return await self._post_json(
+ "/api/claw/groups/panels/messages",
+ {
+ "panelId": panel_id,
+ "limit": limit,
+ },
+ )
+
+ def _read_group_id(self, metadata: dict[str, Any]) -> str | None:
+ if not isinstance(metadata, dict):
+ return None
+ value = metadata.get("group_id") or metadata.get("groupId")
+ if isinstance(value, str) and value.strip():
+ return value.strip()
+ return None
diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py
index 19e62e9..2039f82 100644
--- a/nanobot/cli/commands.py
+++ b/nanobot/cli/commands.py
@@ -366,6 +366,24 @@ def channels_status():
"✓" if dc.enabled else "✗",
dc.gateway_url
)
+
+ # Feishu
+ fs = config.channels.feishu
+ fs_config = f"app_id: {fs.app_id[:10]}..." if fs.app_id else "[dim]not configured[/dim]"
+ table.add_row(
+ "Feishu",
+ "✓" if fs.enabled else "✗",
+ fs_config
+ )
+
+ # Moltchat
+ mc = config.channels.moltchat
+ mc_base = mc.base_url or "[dim]not configured[/dim]"
+ table.add_row(
+ "Moltchat",
+ "✓" if mc.enabled else "✗",
+ mc_base
+ )
# Telegram
tg = config.channels.telegram
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index 7724288..4df4251 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -39,12 +39,49 @@ class DiscordConfig(BaseModel):
intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT
+class MoltchatMentionConfig(BaseModel):
+ """Moltchat mention behavior configuration."""
+ require_in_groups: bool = False
+
+
+class MoltchatGroupRule(BaseModel):
+ """Moltchat per-group mention requirement."""
+ require_mention: bool = False
+
+
+class MoltchatConfig(BaseModel):
+ """Moltchat channel configuration."""
+ enabled: bool = False
+ base_url: str = "http://localhost:11000"
+ socket_url: str = ""
+ socket_path: str = "/socket.io"
+ socket_disable_msgpack: bool = False
+ socket_reconnect_delay_ms: int = 1000
+ socket_max_reconnect_delay_ms: int = 10000
+ socket_connect_timeout_ms: int = 10000
+ refresh_interval_ms: int = 30000
+ watch_timeout_ms: int = 25000
+ watch_limit: int = 100
+ retry_delay_ms: int = 500
+ max_retry_attempts: int = 0 # 0 means unlimited retries
+ claw_token: str = ""
+ agent_user_id: str = ""
+ sessions: list[str] = Field(default_factory=list)
+ panels: list[str] = Field(default_factory=list)
+ allow_from: list[str] = Field(default_factory=list)
+ mention: MoltchatMentionConfig = Field(default_factory=MoltchatMentionConfig)
+ groups: dict[str, MoltchatGroupRule] = Field(default_factory=dict)
+ reply_delay_mode: str = "non-mention" # off | non-mention
+ reply_delay_ms: int = 120000
+
+
class ChannelsConfig(BaseModel):
"""Configuration for chat channels."""
whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig)
telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
+ moltchat: MoltchatConfig = Field(default_factory=MoltchatConfig)
class AgentDefaults(BaseModel):
diff --git a/pyproject.toml b/pyproject.toml
index 2a952a1..81d38b7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,6 +30,8 @@ dependencies = [
"croniter>=2.0.0",
"python-telegram-bot>=21.0",
"lark-oapi>=1.0.0",
+ "python-socketio>=5.11.0",
+ "msgpack>=1.0.8",
]
[project.optional-dependencies]
diff --git a/tests/test_moltchat_channel.py b/tests/test_moltchat_channel.py
new file mode 100644
index 0000000..1f65a68
--- /dev/null
+++ b/tests/test_moltchat_channel.py
@@ -0,0 +1,115 @@
+import pytest
+
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.moltchat import (
+ MoltchatBufferedEntry,
+ MoltchatChannel,
+ build_buffered_body,
+ resolve_moltchat_target,
+ resolve_require_mention,
+ resolve_was_mentioned,
+)
+from nanobot.config.schema import MoltchatConfig, MoltchatGroupRule, MoltchatMentionConfig
+
+
+def test_resolve_moltchat_target_prefixes() -> None:
+ t = resolve_moltchat_target("panel:abc")
+ assert t.id == "abc"
+ assert t.is_panel is True
+
+ t = resolve_moltchat_target("session_123")
+ assert t.id == "session_123"
+ assert t.is_panel is False
+
+ t = resolve_moltchat_target("mochat:session_456")
+ assert t.id == "session_456"
+ assert t.is_panel is False
+
+
+def test_resolve_was_mentioned_from_meta_and_text() -> None:
+ payload = {
+ "content": "hello",
+ "meta": {
+ "mentionIds": ["bot-1"],
+ },
+ }
+ assert resolve_was_mentioned(payload, "bot-1") is True
+
+ payload = {"content": "ping <@bot-2>", "meta": {}}
+ assert resolve_was_mentioned(payload, "bot-2") is True
+
+
+def test_resolve_require_mention_priority() -> None:
+ cfg = MoltchatConfig(
+ groups={
+ "*": MoltchatGroupRule(require_mention=False),
+ "group-a": MoltchatGroupRule(require_mention=True),
+ },
+ mention=MoltchatMentionConfig(require_in_groups=False),
+ )
+
+ assert resolve_require_mention(cfg, session_id="panel-x", group_id="group-a") is True
+ assert resolve_require_mention(cfg, session_id="panel-x", group_id="group-b") is False
+
+
+@pytest.mark.asyncio
+async def test_delay_buffer_flushes_on_mention() -> None:
+ bus = MessageBus()
+ cfg = MoltchatConfig(
+ enabled=True,
+ claw_token="token",
+ agent_user_id="bot",
+ reply_delay_mode="non-mention",
+ reply_delay_ms=60_000,
+ )
+ channel = MoltchatChannel(cfg, bus)
+
+ first = {
+ "type": "message.add",
+ "timestamp": "2026-02-07T10:00:00Z",
+ "payload": {
+ "messageId": "m1",
+ "author": "user1",
+ "content": "first",
+ "groupId": "group-1",
+ "meta": {},
+ },
+ }
+ second = {
+ "type": "message.add",
+ "timestamp": "2026-02-07T10:00:01Z",
+ "payload": {
+ "messageId": "m2",
+ "author": "user2",
+ "content": "hello <@bot>",
+ "groupId": "group-1",
+ "meta": {},
+ },
+ }
+
+ await channel._process_inbound_event(target_id="panel-1", event=first, target_kind="panel")
+ assert bus.inbound_size == 0
+
+ await channel._process_inbound_event(target_id="panel-1", event=second, target_kind="panel")
+ assert bus.inbound_size == 1
+
+ msg = await bus.consume_inbound()
+ assert msg.channel == "moltchat"
+ assert msg.chat_id == "panel-1"
+ assert "user1: first" in msg.content
+ assert "user2: hello <@bot>" in msg.content
+ assert msg.metadata.get("buffered_count") == 2
+
+ await channel._cancel_delay_timers()
+
+
+def test_build_buffered_body_group_labels() -> None:
+ body = build_buffered_body(
+ entries=[
+ MoltchatBufferedEntry(raw_body="a", author="u1", sender_name="Alice"),
+ MoltchatBufferedEntry(raw_body="b", author="u2", sender_username="bot"),
+ ],
+ is_group=True,
+ )
+ assert "Alice: a" in body
+ assert "bot: b" in body