From 20b8a2fc58dd4550c487d9b6f88e38b57a55b4bf Mon Sep 17 00:00:00 2001 From: tjb-tech Date: Mon, 9 Feb 2026 08:46:47 +0000 Subject: [PATCH] feat(channels): add Moltchat websocket channel with polling fallback --- README.md | 49 +- nanobot/channels/__init__.py | 3 +- nanobot/channels/manager.py | 12 + nanobot/channels/moltchat.py | 1227 ++++++++++++++++++++++++++++++++ nanobot/cli/commands.py | 18 + nanobot/config/schema.py | 37 + pyproject.toml | 2 + tests/test_moltchat_channel.py | 115 +++ 8 files changed, 1459 insertions(+), 4 deletions(-) create mode 100644 nanobot/channels/moltchat.py create mode 100644 tests/test_moltchat_channel.py diff --git a/README.md b/README.md index 8a15892..74c24d9 100644 --- a/README.md +++ b/README.md @@ -164,7 +164,7 @@ nanobot agent -m "Hello from my local LLM!" ## 💬 Chat Apps -Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime, anywhere. +Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, or Moltchat — anytime, anywhere. | Channel | Setup | |---------|-------| @@ -172,6 +172,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime, | **Discord** | Easy (bot token + intents) | | **WhatsApp** | Medium (scan QR) | | **Feishu** | Medium (app credentials) | +| **Moltchat** | Medium (claw token + websocket) |
Telegram (Recommended) @@ -205,6 +206,48 @@ nanobot gateway
+
+Moltchat (Claw IM) + +Uses **Socket.IO WebSocket** by default, with HTTP polling fallback. + +**1. Prepare credentials** +- `clawToken`: Claw API token +- `agentUserId`: your bot user id +- Optional: `sessions`/`panels` with `["*"]` for auto-discovery + +**2. Configure** + +```json +{ + "channels": { + "moltchat": { + "enabled": true, + "baseUrl": "https://mochat.io", + "socketUrl": "https://mochat.io", + "socketPath": "/socket.io", + "clawToken": "claw_xxx", + "agentUserId": "69820107a785110aea8b1069", + "sessions": ["*"], + "panels": ["*"], + "replyDelayMode": "non-mention", + "replyDelayMs": 120000 + } + } +} +``` + +**3. Run** + +```bash +nanobot gateway +``` + +> [!TIP] +> Keep `clawToken` private. It should only be sent in `X-Claw-Token` header to your Moltchat API endpoint. + +
+
Discord @@ -413,7 +456,7 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard # Edit config on host to add API keys vim ~/.nanobot/config.json -# Run gateway (connects to Telegram/WhatsApp) +# Run gateway (connects to enabled channels, e.g. Telegram/Discord/Moltchat) docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway # Or run a single command @@ -433,7 +476,7 @@ nanobot/ │ ├── subagent.py # Background task execution │ └── tools/ # Built-in tools (incl. spawn) ├── skills/ # 🎯 Bundled skills (github, weather, tmux...) -├── channels/ # 📱 WhatsApp integration +├── channels/ # 📱 Chat channel integrations ├── bus/ # 🚌 Message routing ├── cron/ # ⏰ Scheduled tasks ├── heartbeat/ # 💓 Proactive wake-up diff --git a/nanobot/channels/__init__.py b/nanobot/channels/__init__.py index 588169d..4d77063 100644 --- a/nanobot/channels/__init__.py +++ b/nanobot/channels/__init__.py @@ -2,5 +2,6 @@ from nanobot.channels.base import BaseChannel from nanobot.channels.manager import ChannelManager +from nanobot.channels.moltchat import MoltchatChannel -__all__ = ["BaseChannel", "ChannelManager"] +__all__ = ["BaseChannel", "ChannelManager", "MoltchatChannel"] diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 64ced48..11690ef 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -77,6 +77,18 @@ class ChannelManager: logger.info("Feishu channel enabled") except ImportError as e: logger.warning(f"Feishu channel not available: {e}") + + # Moltchat channel + if self.config.channels.moltchat.enabled: + try: + from nanobot.channels.moltchat import MoltchatChannel + + self.channels["moltchat"] = MoltchatChannel( + self.config.channels.moltchat, self.bus + ) + logger.info("Moltchat channel enabled") + except ImportError as e: + logger.warning(f"Moltchat channel not available: {e}") async def start_all(self) -> None: """Start WhatsApp channel and the outbound dispatcher.""" diff --git a/nanobot/channels/moltchat.py b/nanobot/channels/moltchat.py new file mode 100644 index 0000000..cc590d4 --- /dev/null +++ b/nanobot/channels/moltchat.py @@ -0,0 +1,1227 @@ +"""Moltchat channel implementation using Socket.IO with HTTP polling fallback.""" + +from __future__ import annotations + +import asyncio +import json +from collections import deque +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + +import httpx +from loguru import logger + +from nanobot.bus.events import OutboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.channels.base import BaseChannel +from nanobot.config.schema import MoltchatConfig +from nanobot.utils.helpers import get_data_path + +try: + import socketio + + SOCKETIO_AVAILABLE = True +except ImportError: + socketio = None + SOCKETIO_AVAILABLE = False + +try: + import msgpack # noqa: F401 + + MSGPACK_AVAILABLE = True +except ImportError: + MSGPACK_AVAILABLE = False + + +MAX_SEEN_MESSAGE_IDS = 2000 +CURSOR_SAVE_DEBOUNCE_S = 0.5 + + +@dataclass +class MoltchatBufferedEntry: + """Buffered inbound entry for delayed dispatch.""" + + raw_body: str + author: str + sender_name: str = "" + sender_username: str = "" + timestamp: int | None = None + message_id: str = "" + group_id: str = "" + + +@dataclass +class DelayState: + """Per-target delayed message state.""" + + entries: list[MoltchatBufferedEntry] = field(default_factory=list) + lock: asyncio.Lock = field(default_factory=asyncio.Lock) + timer: asyncio.Task | None = None + + +@dataclass +class MoltchatTarget: + """Outbound target resolution result.""" + + id: str + is_panel: bool + + +def normalize_moltchat_content(content: Any) -> str: + """Normalize content payload to text.""" + if isinstance(content, str): + return content.strip() + if content is None: + return "" + try: + return json.dumps(content, ensure_ascii=False) + except TypeError: + return str(content) + + +def resolve_moltchat_target(raw: str) -> MoltchatTarget: + """Resolve id and target kind from user-provided target string.""" + trimmed = (raw or "").strip() + if not trimmed: + return MoltchatTarget(id="", is_panel=False) + + lowered = trimmed.lower() + cleaned = trimmed + forced_panel = False + + prefixes = ["moltchat:", "mochat:", "group:", "channel:", "panel:"] + for prefix in prefixes: + if lowered.startswith(prefix): + cleaned = trimmed[len(prefix) :].strip() + if prefix in {"group:", "channel:", "panel:"}: + forced_panel = True + break + + if not cleaned: + return MoltchatTarget(id="", is_panel=False) + + return MoltchatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_")) + + +def extract_mention_ids(value: Any) -> list[str]: + """Extract mention ids from heterogeneous mention payload.""" + if not isinstance(value, list): + return [] + + ids: list[str] = [] + for item in value: + if isinstance(item, str): + text = item.strip() + if text: + ids.append(text) + continue + + if not isinstance(item, dict): + continue + + for key in ("id", "userId", "_id"): + candidate = item.get(key) + if isinstance(candidate, str) and candidate.strip(): + ids.append(candidate.strip()) + break + + return ids + + +def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool: + """Resolve mention state from payload metadata and text fallback.""" + meta = payload.get("meta") + if isinstance(meta, dict): + if meta.get("mentioned") is True or meta.get("wasMentioned") is True: + return True + + for field in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"): + ids = extract_mention_ids(meta.get(field)) + if agent_user_id and agent_user_id in ids: + return True + + if not agent_user_id: + return False + + content = payload.get("content") + if not isinstance(content, str) or not content: + return False + + return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content + + +def resolve_require_mention( + config: MoltchatConfig, + session_id: str, + group_id: str, +) -> bool: + """Resolve mention requirement for group/panel conversations.""" + groups = config.groups or {} + if group_id and group_id in groups: + return bool(groups[group_id].require_mention) + if session_id in groups: + return bool(groups[session_id].require_mention) + if "*" in groups: + return bool(groups["*"].require_mention) + return bool(config.mention.require_in_groups) + + +def build_buffered_body(entries: list[MoltchatBufferedEntry], is_group: bool) -> str: + """Build text body from one or more buffered entries.""" + if not entries: + return "" + + if len(entries) == 1: + return entries[0].raw_body + + lines: list[str] = [] + for entry in entries: + body = entry.raw_body + if not body: + continue + if is_group: + label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author + if label: + lines.append(f"{label}: {body}") + continue + lines.append(body) + + return "\n".join(lines).strip() + + +def parse_timestamp(value: Any) -> int | None: + """Parse event timestamp to epoch milliseconds.""" + if not isinstance(value, str) or not value.strip(): + return None + try: + return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000) + except ValueError: + return None + + +class MoltchatChannel(BaseChannel): + """Moltchat channel using socket.io with fallback polling workers.""" + + name = "moltchat" + + def __init__(self, config: MoltchatConfig, bus: MessageBus): + super().__init__(config, bus) + self.config: MoltchatConfig = config + self._http: httpx.AsyncClient | None = None + self._socket: Any = None + self._ws_connected = False + self._ws_ready = False + + self._state_dir = get_data_path() / "moltchat" + self._cursor_path = self._state_dir / "session_cursors.json" + self._session_cursor: dict[str, int] = {} + self._cursor_save_task: asyncio.Task | None = None + + self._session_set: set[str] = set() + self._panel_set: set[str] = set() + self._auto_discover_sessions = False + self._auto_discover_panels = False + + self._cold_sessions: set[str] = set() + self._session_by_converse: dict[str, str] = {} + + self._seen_set: dict[str, set[str]] = {} + self._seen_queue: dict[str, deque[str]] = {} + + self._delay_states: dict[str, DelayState] = {} + + self._fallback_mode = False + self._session_fallback_tasks: dict[str, asyncio.Task] = {} + self._panel_fallback_tasks: dict[str, asyncio.Task] = {} + self._refresh_task: asyncio.Task | None = None + + self._target_locks: dict[str, asyncio.Lock] = {} + + async def start(self) -> None: + """Start Moltchat channel workers and websocket connection.""" + if not self.config.claw_token: + logger.error("Moltchat claw_token not configured") + return + + self._running = True + self._http = httpx.AsyncClient(timeout=30.0) + + self._state_dir.mkdir(parents=True, exist_ok=True) + await self._load_session_cursors() + self._seed_targets_from_config() + + await self._refresh_targets(subscribe_new=False) + + websocket_started = await self._start_socket_client() + if not websocket_started: + await self._ensure_fallback_workers() + + self._refresh_task = asyncio.create_task(self._refresh_loop()) + + while self._running: + await asyncio.sleep(1) + + async def stop(self) -> None: + """Stop all workers and clean up resources.""" + self._running = False + + if self._refresh_task: + self._refresh_task.cancel() + self._refresh_task = None + + await self._stop_fallback_workers() + await self._cancel_delay_timers() + + if self._socket: + try: + await self._socket.disconnect() + except Exception: + pass + self._socket = None + + if self._cursor_save_task: + self._cursor_save_task.cancel() + self._cursor_save_task = None + + await self._save_session_cursors() + + if self._http: + await self._http.aclose() + self._http = None + + self._ws_connected = False + self._ws_ready = False + + async def send(self, msg: OutboundMessage) -> None: + """Send outbound message to session or panel.""" + if not self.config.claw_token: + logger.warning("Moltchat claw_token missing, skip send") + return + + content_parts = [msg.content.strip()] if msg.content and msg.content.strip() else [] + if msg.media: + content_parts.extend([m for m in msg.media if isinstance(m, str) and m.strip()]) + content = "\n".join(content_parts).strip() + if not content: + return + + target = resolve_moltchat_target(msg.chat_id) + if not target.id: + logger.warning("Moltchat outbound target is empty") + return + + is_panel = target.is_panel or target.id in self._panel_set + if target.id.startswith("session_"): + is_panel = False + + try: + if is_panel: + await self._send_panel_message( + panel_id=target.id, + content=content, + reply_to=msg.reply_to, + group_id=self._read_group_id(msg.metadata), + ) + else: + await self._send_session_message( + session_id=target.id, + content=content, + reply_to=msg.reply_to, + ) + except Exception as e: + logger.error(f"Failed to send Moltchat message: {e}") + + def _seed_targets_from_config(self) -> None: + sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions) + panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels) + + self._session_set.update(sessions) + self._panel_set.update(panels) + + for session_id in sessions: + if session_id not in self._session_cursor: + self._cold_sessions.add(session_id) + + def _normalize_id_list(self, values: list[str]) -> tuple[list[str], bool]: + cleaned = [str(v).strip() for v in values if str(v).strip()] + has_wildcard = "*" in cleaned + ids = sorted({v for v in cleaned if v != "*"}) + return ids, has_wildcard + + async def _start_socket_client(self) -> bool: + if not SOCKETIO_AVAILABLE: + logger.warning("python-socketio not installed, Moltchat using polling fallback") + return False + + serializer = "default" + if not self.config.socket_disable_msgpack: + if MSGPACK_AVAILABLE: + serializer = "msgpack" + else: + logger.warning( + "msgpack is not installed but socket_disable_msgpack=false; " + "trying JSON serializer" + ) + + reconnect_attempts = None + if self.config.max_retry_attempts > 0: + reconnect_attempts = self.config.max_retry_attempts + + client = socketio.AsyncClient( + reconnection=True, + reconnection_attempts=reconnect_attempts, + reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0), + reconnection_delay_max=max( + 0.1, + self.config.socket_max_reconnect_delay_ms / 1000.0, + ), + logger=False, + engineio_logger=False, + serializer=serializer, + ) + + @client.event + async def connect() -> None: + self._ws_connected = True + self._ws_ready = False + logger.info("Moltchat websocket connected") + + subscribed = await self._subscribe_all() + self._ws_ready = subscribed + if subscribed: + await self._stop_fallback_workers() + else: + await self._ensure_fallback_workers() + + @client.event + async def disconnect() -> None: + if not self._running: + return + self._ws_connected = False + self._ws_ready = False + logger.warning("Moltchat websocket disconnected") + await self._ensure_fallback_workers() + + @client.event + async def connect_error(data: Any) -> None: + message = str(data) + logger.error(f"Moltchat websocket connect error: {message}") + + @client.on("claw.session.events") + async def on_session_events(payload: dict[str, Any]) -> None: + await self._handle_watch_payload(payload, target_kind="session") + + @client.on("claw.panel.events") + async def on_panel_events(payload: dict[str, Any]) -> None: + await self._handle_watch_payload(payload, target_kind="panel") + + for event_name in ( + "notify:chat.inbox.append", + "notify:chat.message.add", + "notify:chat.message.update", + "notify:chat.message.recall", + "notify:chat.message.delete", + ): + client.on(event_name, self._build_notify_handler(event_name)) + + socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/") + socket_path = (self.config.socket_path or "/socket.io").strip() + if socket_path.startswith("/"): + socket_path = socket_path[1:] + + try: + self._socket = client + await client.connect( + socket_url, + transports=["websocket"], + socketio_path=socket_path, + auth={"token": self.config.claw_token}, + wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0), + ) + return True + except Exception as e: + logger.error(f"Failed to connect Moltchat websocket: {e}") + try: + await client.disconnect() + except Exception: + pass + self._socket = None + return False + + def _build_notify_handler(self, event_name: str): + async def handler(payload: Any) -> None: + if event_name == "notify:chat.inbox.append": + await self._handle_notify_inbox_append(payload) + return + + if event_name.startswith("notify:chat.message."): + await self._handle_notify_chat_message(payload) + + return handler + + async def _subscribe_all(self) -> bool: + sessions_ok = await self._subscribe_sessions(sorted(self._session_set)) + panels_ok = await self._subscribe_panels(sorted(self._panel_set)) + + if self._auto_discover_sessions or self._auto_discover_panels: + await self._refresh_targets(subscribe_new=True) + + return sessions_ok and panels_ok + + async def _subscribe_sessions(self, session_ids: list[str]) -> bool: + if not session_ids: + return True + + for session_id in session_ids: + if session_id not in self._session_cursor: + self._cold_sessions.add(session_id) + + ack = await self._socket_call( + "com.claw.im.subscribeSessions", + { + "sessionIds": session_ids, + "cursors": self._session_cursor, + "limit": self.config.watch_limit, + }, + ) + if not ack.get("result"): + logger.error(f"Moltchat subscribeSessions failed: {ack.get('message', 'unknown error')}") + return False + + data = ack.get("data") + items: list[dict[str, Any]] = [] + if isinstance(data, list): + items = [item for item in data if isinstance(item, dict)] + elif isinstance(data, dict): + sessions = data.get("sessions") + if isinstance(sessions, list): + items = [item for item in sessions if isinstance(item, dict)] + elif "sessionId" in data: + items = [data] + + for payload in items: + await self._handle_watch_payload(payload, target_kind="session") + + return True + + async def _subscribe_panels(self, panel_ids: list[str]) -> bool: + if not self._auto_discover_panels and not panel_ids: + return True + + ack = await self._socket_call( + "com.claw.im.subscribePanels", + { + "panelIds": panel_ids, + }, + ) + if not ack.get("result"): + logger.error(f"Moltchat subscribePanels failed: {ack.get('message', 'unknown error')}") + return False + + return True + + async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]: + if not self._socket: + return {"result": False, "message": "socket not connected"} + + try: + raw = await self._socket.call(event_name, payload, timeout=10) + except Exception as e: + return {"result": False, "message": str(e)} + + if isinstance(raw, dict): + return raw + + return {"result": True, "data": raw} + + async def _refresh_loop(self) -> None: + interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0) + + while self._running: + await asyncio.sleep(interval_s) + + try: + await self._refresh_targets(subscribe_new=self._ws_ready) + except Exception as e: + logger.warning(f"Moltchat refresh failed: {e}") + + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _refresh_targets(self, subscribe_new: bool) -> None: + if self._auto_discover_sessions: + await self._refresh_sessions_directory(subscribe_new=subscribe_new) + + if self._auto_discover_panels: + await self._refresh_panels(subscribe_new=subscribe_new) + + async def _refresh_sessions_directory(self, subscribe_new: bool) -> None: + try: + response = await self._list_sessions() + except Exception as e: + logger.warning(f"Moltchat listSessions failed: {e}") + return + + sessions = response.get("sessions") + if not isinstance(sessions, list): + return + + new_sessions: list[str] = [] + for session in sessions: + if not isinstance(session, dict): + continue + + session_id = str(session.get("sessionId") or "").strip() + if not session_id: + continue + + if session_id not in self._session_set: + self._session_set.add(session_id) + new_sessions.append(session_id) + if session_id not in self._session_cursor: + self._cold_sessions.add(session_id) + + converse_id = str(session.get("converseId") or "").strip() + if converse_id: + self._session_by_converse[converse_id] = session_id + + if not new_sessions: + return + + if self._ws_ready and subscribe_new: + await self._subscribe_sessions(new_sessions) + + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _refresh_panels(self, subscribe_new: bool) -> None: + try: + response = await self._get_workspace_group() + except Exception as e: + logger.warning(f"Moltchat getWorkspaceGroup failed: {e}") + return + + raw_panels = response.get("panels") + if not isinstance(raw_panels, list): + return + + new_panels: list[str] = [] + for panel in raw_panels: + if not isinstance(panel, dict): + continue + + panel_type = panel.get("type") + if isinstance(panel_type, int) and panel_type != 0: + continue + + panel_id = str(panel.get("id") or panel.get("_id") or "").strip() + if not panel_id: + continue + + if panel_id not in self._panel_set: + self._panel_set.add(panel_id) + new_panels.append(panel_id) + + if not new_panels: + return + + if self._ws_ready and subscribe_new: + await self._subscribe_panels(new_panels) + + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _ensure_fallback_workers(self) -> None: + if not self._running: + return + + self._fallback_mode = True + + for session_id in sorted(self._session_set): + task = self._session_fallback_tasks.get(session_id) + if task and not task.done(): + continue + self._session_fallback_tasks[session_id] = asyncio.create_task( + self._session_watch_worker(session_id) + ) + + for panel_id in sorted(self._panel_set): + task = self._panel_fallback_tasks.get(panel_id) + if task and not task.done(): + continue + self._panel_fallback_tasks[panel_id] = asyncio.create_task( + self._panel_poll_worker(panel_id) + ) + + async def _stop_fallback_workers(self) -> None: + self._fallback_mode = False + + tasks = [ + *self._session_fallback_tasks.values(), + *self._panel_fallback_tasks.values(), + ] + for task in tasks: + task.cancel() + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + self._session_fallback_tasks.clear() + self._panel_fallback_tasks.clear() + + async def _session_watch_worker(self, session_id: str) -> None: + while self._running and self._fallback_mode: + try: + payload = await self._watch_session( + session_id=session_id, + cursor=self._session_cursor.get(session_id, 0), + timeout_ms=self.config.watch_timeout_ms, + limit=self.config.watch_limit, + ) + await self._handle_watch_payload(payload, target_kind="session") + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Moltchat watch fallback error ({session_id}): {e}") + await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0)) + + async def _panel_poll_worker(self, panel_id: str) -> None: + sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0) + + while self._running and self._fallback_mode: + try: + response = await self._list_panel_messages( + panel_id=panel_id, + limit=min(100, max(1, self.config.watch_limit)), + ) + + raw_messages = response.get("messages") + if isinstance(raw_messages, list): + for message in reversed(raw_messages): + if not isinstance(message, dict): + continue + + synthetic_event = { + "type": "message.add", + "timestamp": message.get("createdAt") or datetime.utcnow().isoformat(), + "payload": { + "messageId": str(message.get("messageId") or ""), + "author": str(message.get("author") or ""), + "authorInfo": message.get("authorInfo") if isinstance(message.get("authorInfo"), dict) else {}, + "content": message.get("content"), + "meta": message.get("meta") if isinstance(message.get("meta"), dict) else {}, + "groupId": str(response.get("groupId") or ""), + "converseId": panel_id, + }, + } + await self._process_inbound_event( + target_id=panel_id, + event=synthetic_event, + target_kind="panel", + ) + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Moltchat panel polling error ({panel_id}): {e}") + + await asyncio.sleep(sleep_s) + + async def _handle_watch_payload( + self, + payload: dict[str, Any], + target_kind: str, + ) -> None: + if not isinstance(payload, dict): + return + + target_id = str(payload.get("sessionId") or "").strip() + if not target_id: + return + + lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock()) + async with lock: + previous_cursor = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0 + payload_cursor = payload.get("cursor") + if ( + target_kind == "session" + and isinstance(payload_cursor, int) + and payload_cursor >= 0 + ): + self._mark_session_cursor(target_id, payload_cursor) + + raw_events = payload.get("events") + if not isinstance(raw_events, list): + return + + if target_kind == "session" and target_id in self._cold_sessions: + self._cold_sessions.discard(target_id) + return + + for event in raw_events: + if not isinstance(event, dict): + continue + seq = event.get("seq") + if ( + target_kind == "session" + and isinstance(seq, int) + and seq > self._session_cursor.get(target_id, previous_cursor) + ): + self._mark_session_cursor(target_id, seq) + + if event.get("type") != "message.add": + continue + + await self._process_inbound_event( + target_id=target_id, + event=event, + target_kind=target_kind, + ) + + async def _process_inbound_event( + self, + target_id: str, + event: dict[str, Any], + target_kind: str, + ) -> None: + payload = event.get("payload") + if not isinstance(payload, dict): + return + + author = str(payload.get("author") or "").strip() + if not author: + return + + if self.config.agent_user_id and author == self.config.agent_user_id: + return + + if not self.is_allowed(author): + return + + message_id = str(payload.get("messageId") or "").strip() + seen_key = f"{target_kind}:{target_id}" + if message_id and self._remember_message_id(seen_key, message_id): + return + + raw_body = normalize_moltchat_content(payload.get("content")) + if not raw_body: + raw_body = "[empty message]" + + author_info = payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {} + sender_name = str(author_info.get("nickname") or author_info.get("email") or "").strip() + sender_username = str(author_info.get("agentId") or "").strip() + + group_id = str(payload.get("groupId") or "").strip() + is_group = bool(group_id) + was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id) + + require_mention = ( + target_kind == "panel" + and is_group + and resolve_require_mention(self.config, target_id, group_id) + ) + + use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention" + + if require_mention and not was_mentioned and not use_delay: + return + + entry = MoltchatBufferedEntry( + raw_body=raw_body, + author=author, + sender_name=sender_name, + sender_username=sender_username, + timestamp=parse_timestamp(event.get("timestamp")), + message_id=message_id, + group_id=group_id, + ) + + if use_delay: + delay_key = f"{target_kind}:{target_id}" + if was_mentioned: + await self._flush_delayed_entries( + key=delay_key, + target_id=target_id, + target_kind=target_kind, + reason="mention", + entry=entry, + ) + else: + await self._enqueue_delayed_entry( + key=delay_key, + target_id=target_id, + target_kind=target_kind, + entry=entry, + ) + return + + await self._dispatch_entries( + target_id=target_id, + target_kind=target_kind, + entries=[entry], + was_mentioned=was_mentioned, + ) + + def _remember_message_id(self, key: str, message_id: str) -> bool: + seen_set = self._seen_set.setdefault(key, set()) + seen_queue = self._seen_queue.setdefault(key, deque()) + + if message_id in seen_set: + return True + + seen_set.add(message_id) + seen_queue.append(message_id) + + while len(seen_queue) > MAX_SEEN_MESSAGE_IDS: + removed = seen_queue.popleft() + seen_set.discard(removed) + + return False + + async def _enqueue_delayed_entry( + self, + key: str, + target_id: str, + target_kind: str, + entry: MoltchatBufferedEntry, + ) -> None: + state = self._delay_states.setdefault(key, DelayState()) + + async with state.lock: + state.entries.append(entry) + if state.timer: + state.timer.cancel() + + state.timer = asyncio.create_task( + self._delay_flush_after(key, target_id, target_kind) + ) + + async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None: + await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0) + await self._flush_delayed_entries( + key=key, + target_id=target_id, + target_kind=target_kind, + reason="timer", + entry=None, + ) + + async def _flush_delayed_entries( + self, + key: str, + target_id: str, + target_kind: str, + reason: str, + entry: MoltchatBufferedEntry | None, + ) -> None: + state = self._delay_states.setdefault(key, DelayState()) + + async with state.lock: + if entry: + state.entries.append(entry) + + current = asyncio.current_task() + if state.timer and state.timer is not current: + state.timer.cancel() + state.timer = None + elif state.timer is current: + state.timer = None + + entries = state.entries[:] + state.entries.clear() + + if not entries: + return + + await self._dispatch_entries( + target_id=target_id, + target_kind=target_kind, + entries=entries, + was_mentioned=(reason == "mention"), + ) + + async def _dispatch_entries( + self, + target_id: str, + target_kind: str, + entries: list[MoltchatBufferedEntry], + was_mentioned: bool, + ) -> None: + if not entries: + return + + is_group = bool(entries[-1].group_id) + body = build_buffered_body(entries, is_group) + if not body: + body = "[empty message]" + + last = entries[-1] + metadata = { + "message_id": last.message_id, + "timestamp": last.timestamp, + "is_group": is_group, + "group_id": last.group_id, + "sender_name": last.sender_name, + "sender_username": last.sender_username, + "target_kind": target_kind, + "was_mentioned": was_mentioned, + "buffered_count": len(entries), + } + + await self._handle_message( + sender_id=last.author, + chat_id=target_id, + content=body, + metadata=metadata, + ) + + async def _cancel_delay_timers(self) -> None: + for state in self._delay_states.values(): + if state.timer: + state.timer.cancel() + state.timer = None + self._delay_states.clear() + + async def _handle_notify_chat_message(self, payload: Any) -> None: + if not isinstance(payload, dict): + return + + group_id = str(payload.get("groupId") or "").strip() + panel_id = str(payload.get("converseId") or payload.get("panelId") or "").strip() + if not group_id or not panel_id: + return + + if self._panel_set and panel_id not in self._panel_set: + return + + synthetic_event = { + "type": "message.add", + "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), + "payload": { + "messageId": str(payload.get("_id") or payload.get("messageId") or ""), + "author": str(payload.get("author") or ""), + "authorInfo": payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {}, + "content": payload.get("content"), + "meta": payload.get("meta") if isinstance(payload.get("meta"), dict) else {}, + "groupId": group_id, + "converseId": panel_id, + }, + } + await self._process_inbound_event( + target_id=panel_id, + event=synthetic_event, + target_kind="panel", + ) + + async def _handle_notify_inbox_append(self, payload: Any) -> None: + if not isinstance(payload, dict): + return + + if payload.get("type") != "message": + return + + detail = payload.get("payload") + if not isinstance(detail, dict): + return + + group_id = str(detail.get("groupId") or "").strip() + if group_id: + return + + converse_id = str(detail.get("converseId") or "").strip() + if not converse_id: + return + + session_id = self._session_by_converse.get(converse_id) + if not session_id: + await self._refresh_sessions_directory(subscribe_new=self._ws_ready) + session_id = self._session_by_converse.get(converse_id) + if not session_id: + return + + message_id = str(detail.get("messageId") or payload.get("_id") or "").strip() + author = str(detail.get("messageAuthor") or "").strip() + content = str(detail.get("messagePlainContent") or detail.get("messageSnippet") or "").strip() + + synthetic_event = { + "type": "message.add", + "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), + "payload": { + "messageId": message_id, + "author": author, + "content": content, + "meta": { + "source": "notify:chat.inbox.append", + "converseId": converse_id, + }, + "converseId": converse_id, + }, + } + + await self._process_inbound_event( + target_id=session_id, + event=synthetic_event, + target_kind="session", + ) + + def _mark_session_cursor(self, session_id: str, cursor: int) -> None: + if cursor < 0: + return + + previous = self._session_cursor.get(session_id, 0) + if cursor < previous: + return + + self._session_cursor[session_id] = cursor + self._schedule_cursor_save() + + def _schedule_cursor_save(self) -> None: + if self._cursor_save_task and not self._cursor_save_task.done(): + return + + self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced()) + + async def _save_cursor_debounced(self) -> None: + await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S) + await self._save_session_cursors() + + async def _load_session_cursors(self) -> None: + if not self._cursor_path.exists(): + return + + try: + data = json.loads(self._cursor_path.read_text("utf-8")) + except Exception as e: + logger.warning(f"Failed to read Moltchat cursor file: {e}") + return + + cursors = data.get("cursors") if isinstance(data, dict) else None + if not isinstance(cursors, dict): + return + + for session_id, cursor in cursors.items(): + if isinstance(session_id, str) and isinstance(cursor, int) and cursor >= 0: + self._session_cursor[session_id] = cursor + + async def _save_session_cursors(self) -> None: + payload = { + "schemaVersion": 1, + "updatedAt": datetime.utcnow().isoformat(), + "cursors": self._session_cursor, + } + + try: + self._state_dir.mkdir(parents=True, exist_ok=True) + self._cursor_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", "utf-8") + except Exception as e: + logger.warning(f"Failed to save Moltchat cursor file: {e}") + + def _base_url(self) -> str: + return self.config.base_url.strip().rstrip("/") + + async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]: + if not self._http: + raise RuntimeError("Moltchat HTTP client not initialized") + + url = f"{self._base_url()}{path}" + response = await self._http.post( + url, + headers={ + "Content-Type": "application/json", + "X-Claw-Token": self.config.claw_token, + }, + json=payload, + ) + + text = response.text + if not response.is_success: + raise RuntimeError(f"Moltchat HTTP {response.status_code}: {text[:200]}") + + parsed: Any + try: + parsed = response.json() + except Exception: + parsed = text + + if isinstance(parsed, dict) and isinstance(parsed.get("code"), int): + if parsed["code"] != 200: + message = str(parsed.get("message") or parsed.get("name") or "request failed") + raise RuntimeError(f"Moltchat API error: {message} (code={parsed['code']})") + data = parsed.get("data") + return data if isinstance(data, dict) else {} + + if isinstance(parsed, dict): + return parsed + + return {} + + async def _watch_session( + self, + session_id: str, + cursor: int, + timeout_ms: int, + limit: int, + ) -> dict[str, Any]: + return await self._post_json( + "/api/claw/sessions/watch", + { + "sessionId": session_id, + "cursor": cursor, + "timeoutMs": timeout_ms, + "limit": limit, + }, + ) + + async def _send_session_message( + self, + session_id: str, + content: str, + reply_to: str | None, + ) -> dict[str, Any]: + payload = { + "sessionId": session_id, + "content": content, + } + if reply_to: + payload["replyTo"] = reply_to + return await self._post_json("/api/claw/sessions/send", payload) + + async def _send_panel_message( + self, + panel_id: str, + content: str, + reply_to: str | None, + group_id: str | None, + ) -> dict[str, Any]: + payload = { + "panelId": panel_id, + "content": content, + } + if reply_to: + payload["replyTo"] = reply_to + if group_id: + payload["groupId"] = group_id + return await self._post_json("/api/claw/groups/panels/send", payload) + + async def _list_sessions(self) -> dict[str, Any]: + return await self._post_json("/api/claw/sessions/list", {}) + + async def _get_workspace_group(self) -> dict[str, Any]: + return await self._post_json("/api/claw/groups/get", {}) + + async def _list_panel_messages(self, panel_id: str, limit: int) -> dict[str, Any]: + return await self._post_json( + "/api/claw/groups/panels/messages", + { + "panelId": panel_id, + "limit": limit, + }, + ) + + def _read_group_id(self, metadata: dict[str, Any]) -> str | None: + if not isinstance(metadata, dict): + return None + value = metadata.get("group_id") or metadata.get("groupId") + if isinstance(value, str) and value.strip(): + return value.strip() + return None diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 19e62e9..2039f82 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -366,6 +366,24 @@ def channels_status(): "✓" if dc.enabled else "✗", dc.gateway_url ) + + # Feishu + fs = config.channels.feishu + fs_config = f"app_id: {fs.app_id[:10]}..." if fs.app_id else "[dim]not configured[/dim]" + table.add_row( + "Feishu", + "✓" if fs.enabled else "✗", + fs_config + ) + + # Moltchat + mc = config.channels.moltchat + mc_base = mc.base_url or "[dim]not configured[/dim]" + table.add_row( + "Moltchat", + "✓" if mc.enabled else "✗", + mc_base + ) # Telegram tg = config.channels.telegram diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 7724288..4df4251 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -39,12 +39,49 @@ class DiscordConfig(BaseModel): intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT +class MoltchatMentionConfig(BaseModel): + """Moltchat mention behavior configuration.""" + require_in_groups: bool = False + + +class MoltchatGroupRule(BaseModel): + """Moltchat per-group mention requirement.""" + require_mention: bool = False + + +class MoltchatConfig(BaseModel): + """Moltchat channel configuration.""" + enabled: bool = False + base_url: str = "http://localhost:11000" + socket_url: str = "" + socket_path: str = "/socket.io" + socket_disable_msgpack: bool = False + socket_reconnect_delay_ms: int = 1000 + socket_max_reconnect_delay_ms: int = 10000 + socket_connect_timeout_ms: int = 10000 + refresh_interval_ms: int = 30000 + watch_timeout_ms: int = 25000 + watch_limit: int = 100 + retry_delay_ms: int = 500 + max_retry_attempts: int = 0 # 0 means unlimited retries + claw_token: str = "" + agent_user_id: str = "" + sessions: list[str] = Field(default_factory=list) + panels: list[str] = Field(default_factory=list) + allow_from: list[str] = Field(default_factory=list) + mention: MoltchatMentionConfig = Field(default_factory=MoltchatMentionConfig) + groups: dict[str, MoltchatGroupRule] = Field(default_factory=dict) + reply_delay_mode: str = "non-mention" # off | non-mention + reply_delay_ms: int = 120000 + + class ChannelsConfig(BaseModel): """Configuration for chat channels.""" whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig) telegram: TelegramConfig = Field(default_factory=TelegramConfig) discord: DiscordConfig = Field(default_factory=DiscordConfig) feishu: FeishuConfig = Field(default_factory=FeishuConfig) + moltchat: MoltchatConfig = Field(default_factory=MoltchatConfig) class AgentDefaults(BaseModel): diff --git a/pyproject.toml b/pyproject.toml index 2a952a1..81d38b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ dependencies = [ "croniter>=2.0.0", "python-telegram-bot>=21.0", "lark-oapi>=1.0.0", + "python-socketio>=5.11.0", + "msgpack>=1.0.8", ] [project.optional-dependencies] diff --git a/tests/test_moltchat_channel.py b/tests/test_moltchat_channel.py new file mode 100644 index 0000000..1f65a68 --- /dev/null +++ b/tests/test_moltchat_channel.py @@ -0,0 +1,115 @@ +import pytest + +from nanobot.bus.queue import MessageBus +from nanobot.channels.moltchat import ( + MoltchatBufferedEntry, + MoltchatChannel, + build_buffered_body, + resolve_moltchat_target, + resolve_require_mention, + resolve_was_mentioned, +) +from nanobot.config.schema import MoltchatConfig, MoltchatGroupRule, MoltchatMentionConfig + + +def test_resolve_moltchat_target_prefixes() -> None: + t = resolve_moltchat_target("panel:abc") + assert t.id == "abc" + assert t.is_panel is True + + t = resolve_moltchat_target("session_123") + assert t.id == "session_123" + assert t.is_panel is False + + t = resolve_moltchat_target("mochat:session_456") + assert t.id == "session_456" + assert t.is_panel is False + + +def test_resolve_was_mentioned_from_meta_and_text() -> None: + payload = { + "content": "hello", + "meta": { + "mentionIds": ["bot-1"], + }, + } + assert resolve_was_mentioned(payload, "bot-1") is True + + payload = {"content": "ping <@bot-2>", "meta": {}} + assert resolve_was_mentioned(payload, "bot-2") is True + + +def test_resolve_require_mention_priority() -> None: + cfg = MoltchatConfig( + groups={ + "*": MoltchatGroupRule(require_mention=False), + "group-a": MoltchatGroupRule(require_mention=True), + }, + mention=MoltchatMentionConfig(require_in_groups=False), + ) + + assert resolve_require_mention(cfg, session_id="panel-x", group_id="group-a") is True + assert resolve_require_mention(cfg, session_id="panel-x", group_id="group-b") is False + + +@pytest.mark.asyncio +async def test_delay_buffer_flushes_on_mention() -> None: + bus = MessageBus() + cfg = MoltchatConfig( + enabled=True, + claw_token="token", + agent_user_id="bot", + reply_delay_mode="non-mention", + reply_delay_ms=60_000, + ) + channel = MoltchatChannel(cfg, bus) + + first = { + "type": "message.add", + "timestamp": "2026-02-07T10:00:00Z", + "payload": { + "messageId": "m1", + "author": "user1", + "content": "first", + "groupId": "group-1", + "meta": {}, + }, + } + second = { + "type": "message.add", + "timestamp": "2026-02-07T10:00:01Z", + "payload": { + "messageId": "m2", + "author": "user2", + "content": "hello <@bot>", + "groupId": "group-1", + "meta": {}, + }, + } + + await channel._process_inbound_event(target_id="panel-1", event=first, target_kind="panel") + assert bus.inbound_size == 0 + + await channel._process_inbound_event(target_id="panel-1", event=second, target_kind="panel") + assert bus.inbound_size == 1 + + msg = await bus.consume_inbound() + assert msg.channel == "moltchat" + assert msg.chat_id == "panel-1" + assert "user1: first" in msg.content + assert "user2: hello <@bot>" in msg.content + assert msg.metadata.get("buffered_count") == 2 + + await channel._cancel_delay_timers() + + +def test_build_buffered_body_group_labels() -> None: + body = build_buffered_body( + entries=[ + MoltchatBufferedEntry(raw_body="a", author="u1", sender_name="Alice"), + MoltchatBufferedEntry(raw_body="b", author="u2", sender_username="bot"), + ], + is_group=True, + ) + assert "Alice: a" in body + assert "bot: b" in body