Merge pull request #389 to support MoChat

This commit is contained in:
Re-bin
2026-02-10 07:06:34 +00:00
6 changed files with 1010 additions and 3 deletions

View File

@@ -168,7 +168,7 @@ nanobot agent -m "Hello from my local LLM!"
## 💬 Chat Apps
Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, Slack, Email, or QQ — anytime, anywhere.
Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, Mochat, DingTalk, Slack, Email, or QQ — anytime, anywhere.
| Channel | Setup |
|---------|-------|
@@ -176,6 +176,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, Slac
| **Discord** | Easy (bot token + intents) |
| **WhatsApp** | Medium (scan QR) |
| **Feishu** | Medium (app credentials) |
| **Mochat** | Medium (claw token + websocket) |
| **DingTalk** | Medium (app credentials) |
| **Slack** | Medium (bot + app tokens) |
| **Email** | Medium (IMAP/SMTP credentials) |
@@ -215,6 +216,48 @@ nanobot gateway
</details>
<details>
<summary><b>Mochat (Claw IM)</b></summary>
Uses **Socket.IO WebSocket** by default, with HTTP polling fallback.
**1. Prepare credentials**
- `clawToken`: Claw API token
- `agentUserId`: your bot user id
- Optional: `sessions`/`panels` with `["*"]` for auto-discovery
**2. Configure**
```json
{
"channels": {
"mochat": {
"enabled": true,
"baseUrl": "https://mochat.io",
"socketUrl": "https://mochat.io",
"socketPath": "/socket.io",
"clawToken": "claw_xxx",
"agentUserId": "6982abcdef",
"sessions": ["*"],
"panels": ["*"],
"replyDelayMode": "non-mention",
"replyDelayMs": 120000
}
}
}
```
**3. Run**
```bash
nanobot gateway
```
> [!TIP]
> Keep `clawToken` private. It should only be sent in `X-Claw-Token` header to your Mochat API endpoint.
</details>
<details>
<summary><b>Discord</b></summary>
@@ -644,7 +687,7 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard
# Edit config on host to add API keys
vim ~/.nanobot/config.json
# Run gateway (connects to Telegram/WhatsApp)
# Run gateway (connects to enabled channels, e.g. Telegram/Discord/Mochat)
docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway
# Or run a single command
@@ -664,7 +707,7 @@ nanobot/
│ ├── subagent.py # Background task execution
│ └── tools/ # Built-in tools (incl. spawn)
├── skills/ # 🎯 Bundled skills (github, weather, tmux...)
├── channels/ # 📱 WhatsApp integration
├── channels/ # 📱 Chat channel integrations
├── bus/ # 🚌 Message routing
├── cron/ # ⏰ Scheduled tasks
├── heartbeat/ # 💓 Proactive wake-up

View File

@@ -85,6 +85,18 @@ class ChannelManager:
except ImportError as e:
logger.warning(f"Feishu channel not available: {e}")
# Mochat channel
if self.config.channels.mochat.enabled:
try:
from nanobot.channels.mochat import MochatChannel
self.channels["mochat"] = MochatChannel(
self.config.channels.mochat, self.bus
)
logger.info("Mochat channel enabled")
except ImportError as e:
logger.warning(f"Mochat channel not available: {e}")
# DingTalk channel
if self.config.channels.dingtalk.enabled:
try:

895
nanobot/channels/mochat.py Normal file
View File

@@ -0,0 +1,895 @@
"""Mochat channel implementation using Socket.IO with HTTP polling fallback."""
from __future__ import annotations
import asyncio
import json
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import httpx
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import MochatConfig
from nanobot.utils.helpers import get_data_path
try:
import socketio
SOCKETIO_AVAILABLE = True
except ImportError:
socketio = None
SOCKETIO_AVAILABLE = False
try:
import msgpack # noqa: F401
MSGPACK_AVAILABLE = True
except ImportError:
MSGPACK_AVAILABLE = False
MAX_SEEN_MESSAGE_IDS = 2000
CURSOR_SAVE_DEBOUNCE_S = 0.5
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class MochatBufferedEntry:
"""Buffered inbound entry for delayed dispatch."""
raw_body: str
author: str
sender_name: str = ""
sender_username: str = ""
timestamp: int | None = None
message_id: str = ""
group_id: str = ""
@dataclass
class DelayState:
"""Per-target delayed message state."""
entries: list[MochatBufferedEntry] = field(default_factory=list)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
timer: asyncio.Task | None = None
@dataclass
class MochatTarget:
"""Outbound target resolution result."""
id: str
is_panel: bool
# ---------------------------------------------------------------------------
# Pure helpers
# ---------------------------------------------------------------------------
def _safe_dict(value: Any) -> dict:
"""Return *value* if it's a dict, else empty dict."""
return value if isinstance(value, dict) else {}
def _str_field(src: dict, *keys: str) -> str:
"""Return the first non-empty str value found for *keys*, stripped."""
for k in keys:
v = src.get(k)
if isinstance(v, str) and v.strip():
return v.strip()
return ""
def _make_synthetic_event(
message_id: str, author: str, content: Any,
meta: Any, group_id: str, converse_id: str,
timestamp: Any = None, *, author_info: Any = None,
) -> dict[str, Any]:
"""Build a synthetic ``message.add`` event dict."""
payload: dict[str, Any] = {
"messageId": message_id, "author": author,
"content": content, "meta": _safe_dict(meta),
"groupId": group_id, "converseId": converse_id,
}
if author_info is not None:
payload["authorInfo"] = _safe_dict(author_info)
return {
"type": "message.add",
"timestamp": timestamp or datetime.utcnow().isoformat(),
"payload": payload,
}
def normalize_mochat_content(content: Any) -> str:
"""Normalize content payload to text."""
if isinstance(content, str):
return content.strip()
if content is None:
return ""
try:
return json.dumps(content, ensure_ascii=False)
except TypeError:
return str(content)
def resolve_mochat_target(raw: str) -> MochatTarget:
"""Resolve id and target kind from user-provided target string."""
trimmed = (raw or "").strip()
if not trimmed:
return MochatTarget(id="", is_panel=False)
lowered = trimmed.lower()
cleaned, forced_panel = trimmed, False
for prefix in ("mochat:", "group:", "channel:", "panel:"):
if lowered.startswith(prefix):
cleaned = trimmed[len(prefix):].strip()
forced_panel = prefix in {"group:", "channel:", "panel:"}
break
if not cleaned:
return MochatTarget(id="", is_panel=False)
return MochatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_"))
def extract_mention_ids(value: Any) -> list[str]:
"""Extract mention ids from heterogeneous mention payload."""
if not isinstance(value, list):
return []
ids: list[str] = []
for item in value:
if isinstance(item, str):
if item.strip():
ids.append(item.strip())
elif isinstance(item, dict):
for key in ("id", "userId", "_id"):
candidate = item.get(key)
if isinstance(candidate, str) and candidate.strip():
ids.append(candidate.strip())
break
return ids
def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool:
"""Resolve mention state from payload metadata and text fallback."""
meta = payload.get("meta")
if isinstance(meta, dict):
if meta.get("mentioned") is True or meta.get("wasMentioned") is True:
return True
for f in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"):
if agent_user_id and agent_user_id in extract_mention_ids(meta.get(f)):
return True
if not agent_user_id:
return False
content = payload.get("content")
if not isinstance(content, str) or not content:
return False
return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content
def resolve_require_mention(config: MochatConfig, session_id: str, group_id: str) -> bool:
"""Resolve mention requirement for group/panel conversations."""
groups = config.groups or {}
for key in (group_id, session_id, "*"):
if key and key in groups:
return bool(groups[key].require_mention)
return bool(config.mention.require_in_groups)
def build_buffered_body(entries: list[MochatBufferedEntry], is_group: bool) -> str:
"""Build text body from one or more buffered entries."""
if not entries:
return ""
if len(entries) == 1:
return entries[0].raw_body
lines: list[str] = []
for entry in entries:
if not entry.raw_body:
continue
if is_group:
label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author
if label:
lines.append(f"{label}: {entry.raw_body}")
continue
lines.append(entry.raw_body)
return "\n".join(lines).strip()
def parse_timestamp(value: Any) -> int | None:
"""Parse event timestamp to epoch milliseconds."""
if not isinstance(value, str) or not value.strip():
return None
try:
return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000)
except ValueError:
return None
# ---------------------------------------------------------------------------
# Channel
# ---------------------------------------------------------------------------
class MochatChannel(BaseChannel):
"""Mochat channel using socket.io with fallback polling workers."""
name = "mochat"
def __init__(self, config: MochatConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: MochatConfig = config
self._http: httpx.AsyncClient | None = None
self._socket: Any = None
self._ws_connected = self._ws_ready = False
self._state_dir = get_data_path() / "mochat"
self._cursor_path = self._state_dir / "session_cursors.json"
self._session_cursor: dict[str, int] = {}
self._cursor_save_task: asyncio.Task | None = None
self._session_set: set[str] = set()
self._panel_set: set[str] = set()
self._auto_discover_sessions = self._auto_discover_panels = False
self._cold_sessions: set[str] = set()
self._session_by_converse: dict[str, str] = {}
self._seen_set: dict[str, set[str]] = {}
self._seen_queue: dict[str, deque[str]] = {}
self._delay_states: dict[str, DelayState] = {}
self._fallback_mode = False
self._session_fallback_tasks: dict[str, asyncio.Task] = {}
self._panel_fallback_tasks: dict[str, asyncio.Task] = {}
self._refresh_task: asyncio.Task | None = None
self._target_locks: dict[str, asyncio.Lock] = {}
# ---- lifecycle ---------------------------------------------------------
async def start(self) -> None:
"""Start Mochat channel workers and websocket connection."""
if not self.config.claw_token:
logger.error("Mochat claw_token not configured")
return
self._running = True
self._http = httpx.AsyncClient(timeout=30.0)
self._state_dir.mkdir(parents=True, exist_ok=True)
await self._load_session_cursors()
self._seed_targets_from_config()
await self._refresh_targets(subscribe_new=False)
if not await self._start_socket_client():
await self._ensure_fallback_workers()
self._refresh_task = asyncio.create_task(self._refresh_loop())
while self._running:
await asyncio.sleep(1)
async def stop(self) -> None:
"""Stop all workers and clean up resources."""
self._running = False
if self._refresh_task:
self._refresh_task.cancel()
self._refresh_task = None
await self._stop_fallback_workers()
await self._cancel_delay_timers()
if self._socket:
try:
await self._socket.disconnect()
except Exception:
pass
self._socket = None
if self._cursor_save_task:
self._cursor_save_task.cancel()
self._cursor_save_task = None
await self._save_session_cursors()
if self._http:
await self._http.aclose()
self._http = None
self._ws_connected = self._ws_ready = False
async def send(self, msg: OutboundMessage) -> None:
"""Send outbound message to session or panel."""
if not self.config.claw_token:
logger.warning("Mochat claw_token missing, skip send")
return
parts = ([msg.content.strip()] if msg.content and msg.content.strip() else [])
if msg.media:
parts.extend(m for m in msg.media if isinstance(m, str) and m.strip())
content = "\n".join(parts).strip()
if not content:
return
target = resolve_mochat_target(msg.chat_id)
if not target.id:
logger.warning("Mochat outbound target is empty")
return
is_panel = (target.is_panel or target.id in self._panel_set) and not target.id.startswith("session_")
try:
if is_panel:
await self._api_send("/api/claw/groups/panels/send", "panelId", target.id,
content, msg.reply_to, self._read_group_id(msg.metadata))
else:
await self._api_send("/api/claw/sessions/send", "sessionId", target.id,
content, msg.reply_to)
except Exception as e:
logger.error(f"Failed to send Mochat message: {e}")
# ---- config / init helpers ---------------------------------------------
def _seed_targets_from_config(self) -> None:
sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions)
panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels)
self._session_set.update(sessions)
self._panel_set.update(panels)
for sid in sessions:
if sid not in self._session_cursor:
self._cold_sessions.add(sid)
@staticmethod
def _normalize_id_list(values: list[str]) -> tuple[list[str], bool]:
cleaned = [str(v).strip() for v in values if str(v).strip()]
return sorted({v for v in cleaned if v != "*"}), "*" in cleaned
# ---- websocket ---------------------------------------------------------
async def _start_socket_client(self) -> bool:
if not SOCKETIO_AVAILABLE:
logger.warning("python-socketio not installed, Mochat using polling fallback")
return False
serializer = "default"
if not self.config.socket_disable_msgpack:
if MSGPACK_AVAILABLE:
serializer = "msgpack"
else:
logger.warning("msgpack not installed but socket_disable_msgpack=false; using JSON")
client = socketio.AsyncClient(
reconnection=True,
reconnection_attempts=self.config.max_retry_attempts or None,
reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0),
reconnection_delay_max=max(0.1, self.config.socket_max_reconnect_delay_ms / 1000.0),
logger=False, engineio_logger=False, serializer=serializer,
)
@client.event
async def connect() -> None:
self._ws_connected, self._ws_ready = True, False
logger.info("Mochat websocket connected")
subscribed = await self._subscribe_all()
self._ws_ready = subscribed
await (self._stop_fallback_workers() if subscribed else self._ensure_fallback_workers())
@client.event
async def disconnect() -> None:
if not self._running:
return
self._ws_connected = self._ws_ready = False
logger.warning("Mochat websocket disconnected")
await self._ensure_fallback_workers()
@client.event
async def connect_error(data: Any) -> None:
logger.error(f"Mochat websocket connect error: {data}")
@client.on("claw.session.events")
async def on_session_events(payload: dict[str, Any]) -> None:
await self._handle_watch_payload(payload, "session")
@client.on("claw.panel.events")
async def on_panel_events(payload: dict[str, Any]) -> None:
await self._handle_watch_payload(payload, "panel")
for ev in ("notify:chat.inbox.append", "notify:chat.message.add",
"notify:chat.message.update", "notify:chat.message.recall",
"notify:chat.message.delete"):
client.on(ev, self._build_notify_handler(ev))
socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/")
socket_path = (self.config.socket_path or "/socket.io").strip().lstrip("/")
try:
self._socket = client
await client.connect(
socket_url, transports=["websocket"], socketio_path=socket_path,
auth={"token": self.config.claw_token},
wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0),
)
return True
except Exception as e:
logger.error(f"Failed to connect Mochat websocket: {e}")
try:
await client.disconnect()
except Exception:
pass
self._socket = None
return False
def _build_notify_handler(self, event_name: str):
async def handler(payload: Any) -> None:
if event_name == "notify:chat.inbox.append":
await self._handle_notify_inbox_append(payload)
elif event_name.startswith("notify:chat.message."):
await self._handle_notify_chat_message(payload)
return handler
# ---- subscribe ---------------------------------------------------------
async def _subscribe_all(self) -> bool:
ok = await self._subscribe_sessions(sorted(self._session_set))
ok = await self._subscribe_panels(sorted(self._panel_set)) and ok
if self._auto_discover_sessions or self._auto_discover_panels:
await self._refresh_targets(subscribe_new=True)
return ok
async def _subscribe_sessions(self, session_ids: list[str]) -> bool:
if not session_ids:
return True
for sid in session_ids:
if sid not in self._session_cursor:
self._cold_sessions.add(sid)
ack = await self._socket_call("com.claw.im.subscribeSessions", {
"sessionIds": session_ids, "cursors": self._session_cursor,
"limit": self.config.watch_limit,
})
if not ack.get("result"):
logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}")
return False
data = ack.get("data")
items: list[dict[str, Any]] = []
if isinstance(data, list):
items = [i for i in data if isinstance(i, dict)]
elif isinstance(data, dict):
sessions = data.get("sessions")
if isinstance(sessions, list):
items = [i for i in sessions if isinstance(i, dict)]
elif "sessionId" in data:
items = [data]
for p in items:
await self._handle_watch_payload(p, "session")
return True
async def _subscribe_panels(self, panel_ids: list[str]) -> bool:
if not self._auto_discover_panels and not panel_ids:
return True
ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids})
if not ack.get("result"):
logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}")
return False
return True
async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]:
if not self._socket:
return {"result": False, "message": "socket not connected"}
try:
raw = await self._socket.call(event_name, payload, timeout=10)
except Exception as e:
return {"result": False, "message": str(e)}
return raw if isinstance(raw, dict) else {"result": True, "data": raw}
# ---- refresh / discovery -----------------------------------------------
async def _refresh_loop(self) -> None:
interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
while self._running:
await asyncio.sleep(interval_s)
try:
await self._refresh_targets(subscribe_new=self._ws_ready)
except Exception as e:
logger.warning(f"Mochat refresh failed: {e}")
if self._fallback_mode:
await self._ensure_fallback_workers()
async def _refresh_targets(self, subscribe_new: bool) -> None:
if self._auto_discover_sessions:
await self._refresh_sessions_directory(subscribe_new)
if self._auto_discover_panels:
await self._refresh_panels(subscribe_new)
async def _refresh_sessions_directory(self, subscribe_new: bool) -> None:
try:
response = await self._post_json("/api/claw/sessions/list", {})
except Exception as e:
logger.warning(f"Mochat listSessions failed: {e}")
return
sessions = response.get("sessions")
if not isinstance(sessions, list):
return
new_ids: list[str] = []
for s in sessions:
if not isinstance(s, dict):
continue
sid = _str_field(s, "sessionId")
if not sid:
continue
if sid not in self._session_set:
self._session_set.add(sid)
new_ids.append(sid)
if sid not in self._session_cursor:
self._cold_sessions.add(sid)
cid = _str_field(s, "converseId")
if cid:
self._session_by_converse[cid] = sid
if not new_ids:
return
if self._ws_ready and subscribe_new:
await self._subscribe_sessions(new_ids)
if self._fallback_mode:
await self._ensure_fallback_workers()
async def _refresh_panels(self, subscribe_new: bool) -> None:
try:
response = await self._post_json("/api/claw/groups/get", {})
except Exception as e:
logger.warning(f"Mochat getWorkspaceGroup failed: {e}")
return
raw_panels = response.get("panels")
if not isinstance(raw_panels, list):
return
new_ids: list[str] = []
for p in raw_panels:
if not isinstance(p, dict):
continue
pt = p.get("type")
if isinstance(pt, int) and pt != 0:
continue
pid = _str_field(p, "id", "_id")
if pid and pid not in self._panel_set:
self._panel_set.add(pid)
new_ids.append(pid)
if not new_ids:
return
if self._ws_ready and subscribe_new:
await self._subscribe_panels(new_ids)
if self._fallback_mode:
await self._ensure_fallback_workers()
# ---- fallback workers --------------------------------------------------
async def _ensure_fallback_workers(self) -> None:
if not self._running:
return
self._fallback_mode = True
for sid in sorted(self._session_set):
t = self._session_fallback_tasks.get(sid)
if not t or t.done():
self._session_fallback_tasks[sid] = asyncio.create_task(self._session_watch_worker(sid))
for pid in sorted(self._panel_set):
t = self._panel_fallback_tasks.get(pid)
if not t or t.done():
self._panel_fallback_tasks[pid] = asyncio.create_task(self._panel_poll_worker(pid))
async def _stop_fallback_workers(self) -> None:
self._fallback_mode = False
tasks = [*self._session_fallback_tasks.values(), *self._panel_fallback_tasks.values()]
for t in tasks:
t.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._session_fallback_tasks.clear()
self._panel_fallback_tasks.clear()
async def _session_watch_worker(self, session_id: str) -> None:
while self._running and self._fallback_mode:
try:
payload = await self._post_json("/api/claw/sessions/watch", {
"sessionId": session_id, "cursor": self._session_cursor.get(session_id, 0),
"timeoutMs": self.config.watch_timeout_ms, "limit": self.config.watch_limit,
})
await self._handle_watch_payload(payload, "session")
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"Mochat watch fallback error ({session_id}): {e}")
await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0))
async def _panel_poll_worker(self, panel_id: str) -> None:
sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
while self._running and self._fallback_mode:
try:
resp = await self._post_json("/api/claw/groups/panels/messages", {
"panelId": panel_id, "limit": min(100, max(1, self.config.watch_limit)),
})
msgs = resp.get("messages")
if isinstance(msgs, list):
for m in reversed(msgs):
if not isinstance(m, dict):
continue
evt = _make_synthetic_event(
message_id=str(m.get("messageId") or ""),
author=str(m.get("author") or ""),
content=m.get("content"),
meta=m.get("meta"), group_id=str(resp.get("groupId") or ""),
converse_id=panel_id, timestamp=m.get("createdAt"),
author_info=m.get("authorInfo"),
)
await self._process_inbound_event(panel_id, evt, "panel")
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"Mochat panel polling error ({panel_id}): {e}")
await asyncio.sleep(sleep_s)
# ---- inbound event processing ------------------------------------------
async def _handle_watch_payload(self, payload: dict[str, Any], target_kind: str) -> None:
if not isinstance(payload, dict):
return
target_id = _str_field(payload, "sessionId")
if not target_id:
return
lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock())
async with lock:
prev = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0
pc = payload.get("cursor")
if target_kind == "session" and isinstance(pc, int) and pc >= 0:
self._mark_session_cursor(target_id, pc)
raw_events = payload.get("events")
if not isinstance(raw_events, list):
return
if target_kind == "session" and target_id in self._cold_sessions:
self._cold_sessions.discard(target_id)
return
for event in raw_events:
if not isinstance(event, dict):
continue
seq = event.get("seq")
if target_kind == "session" and isinstance(seq, int) and seq > self._session_cursor.get(target_id, prev):
self._mark_session_cursor(target_id, seq)
if event.get("type") == "message.add":
await self._process_inbound_event(target_id, event, target_kind)
async def _process_inbound_event(self, target_id: str, event: dict[str, Any], target_kind: str) -> None:
payload = event.get("payload")
if not isinstance(payload, dict):
return
author = _str_field(payload, "author")
if not author or (self.config.agent_user_id and author == self.config.agent_user_id):
return
if not self.is_allowed(author):
return
message_id = _str_field(payload, "messageId")
seen_key = f"{target_kind}:{target_id}"
if message_id and self._remember_message_id(seen_key, message_id):
return
raw_body = normalize_mochat_content(payload.get("content")) or "[empty message]"
ai = _safe_dict(payload.get("authorInfo"))
sender_name = _str_field(ai, "nickname", "email")
sender_username = _str_field(ai, "agentId")
group_id = _str_field(payload, "groupId")
is_group = bool(group_id)
was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id)
require_mention = target_kind == "panel" and is_group and resolve_require_mention(self.config, target_id, group_id)
use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention"
if require_mention and not was_mentioned and not use_delay:
return
entry = MochatBufferedEntry(
raw_body=raw_body, author=author, sender_name=sender_name,
sender_username=sender_username, timestamp=parse_timestamp(event.get("timestamp")),
message_id=message_id, group_id=group_id,
)
if use_delay:
delay_key = seen_key
if was_mentioned:
await self._flush_delayed_entries(delay_key, target_id, target_kind, "mention", entry)
else:
await self._enqueue_delayed_entry(delay_key, target_id, target_kind, entry)
return
await self._dispatch_entries(target_id, target_kind, [entry], was_mentioned)
# ---- dedup / buffering -------------------------------------------------
def _remember_message_id(self, key: str, message_id: str) -> bool:
seen_set = self._seen_set.setdefault(key, set())
seen_queue = self._seen_queue.setdefault(key, deque())
if message_id in seen_set:
return True
seen_set.add(message_id)
seen_queue.append(message_id)
while len(seen_queue) > MAX_SEEN_MESSAGE_IDS:
seen_set.discard(seen_queue.popleft())
return False
async def _enqueue_delayed_entry(self, key: str, target_id: str, target_kind: str, entry: MochatBufferedEntry) -> None:
state = self._delay_states.setdefault(key, DelayState())
async with state.lock:
state.entries.append(entry)
if state.timer:
state.timer.cancel()
state.timer = asyncio.create_task(self._delay_flush_after(key, target_id, target_kind))
async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None:
await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0)
await self._flush_delayed_entries(key, target_id, target_kind, "timer", None)
async def _flush_delayed_entries(self, key: str, target_id: str, target_kind: str, reason: str, entry: MochatBufferedEntry | None) -> None:
state = self._delay_states.setdefault(key, DelayState())
async with state.lock:
if entry:
state.entries.append(entry)
current = asyncio.current_task()
if state.timer and state.timer is not current:
state.timer.cancel()
state.timer = None
entries = state.entries[:]
state.entries.clear()
if entries:
await self._dispatch_entries(target_id, target_kind, entries, reason == "mention")
async def _dispatch_entries(self, target_id: str, target_kind: str, entries: list[MochatBufferedEntry], was_mentioned: bool) -> None:
if not entries:
return
last = entries[-1]
is_group = bool(last.group_id)
body = build_buffered_body(entries, is_group) or "[empty message]"
await self._handle_message(
sender_id=last.author, chat_id=target_id, content=body,
metadata={
"message_id": last.message_id, "timestamp": last.timestamp,
"is_group": is_group, "group_id": last.group_id,
"sender_name": last.sender_name, "sender_username": last.sender_username,
"target_kind": target_kind, "was_mentioned": was_mentioned,
"buffered_count": len(entries),
},
)
async def _cancel_delay_timers(self) -> None:
for state in self._delay_states.values():
if state.timer:
state.timer.cancel()
self._delay_states.clear()
# ---- notify handlers ---------------------------------------------------
async def _handle_notify_chat_message(self, payload: Any) -> None:
if not isinstance(payload, dict):
return
group_id = _str_field(payload, "groupId")
panel_id = _str_field(payload, "converseId", "panelId")
if not group_id or not panel_id:
return
if self._panel_set and panel_id not in self._panel_set:
return
evt = _make_synthetic_event(
message_id=str(payload.get("_id") or payload.get("messageId") or ""),
author=str(payload.get("author") or ""),
content=payload.get("content"), meta=payload.get("meta"),
group_id=group_id, converse_id=panel_id,
timestamp=payload.get("createdAt"), author_info=payload.get("authorInfo"),
)
await self._process_inbound_event(panel_id, evt, "panel")
async def _handle_notify_inbox_append(self, payload: Any) -> None:
if not isinstance(payload, dict) or payload.get("type") != "message":
return
detail = payload.get("payload")
if not isinstance(detail, dict):
return
if _str_field(detail, "groupId"):
return
converse_id = _str_field(detail, "converseId")
if not converse_id:
return
session_id = self._session_by_converse.get(converse_id)
if not session_id:
await self._refresh_sessions_directory(self._ws_ready)
session_id = self._session_by_converse.get(converse_id)
if not session_id:
return
evt = _make_synthetic_event(
message_id=str(detail.get("messageId") or payload.get("_id") or ""),
author=str(detail.get("messageAuthor") or ""),
content=str(detail.get("messagePlainContent") or detail.get("messageSnippet") or ""),
meta={"source": "notify:chat.inbox.append", "converseId": converse_id},
group_id="", converse_id=converse_id, timestamp=payload.get("createdAt"),
)
await self._process_inbound_event(session_id, evt, "session")
# ---- cursor persistence ------------------------------------------------
def _mark_session_cursor(self, session_id: str, cursor: int) -> None:
if cursor < 0 or cursor < self._session_cursor.get(session_id, 0):
return
self._session_cursor[session_id] = cursor
if not self._cursor_save_task or self._cursor_save_task.done():
self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced())
async def _save_cursor_debounced(self) -> None:
await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S)
await self._save_session_cursors()
async def _load_session_cursors(self) -> None:
if not self._cursor_path.exists():
return
try:
data = json.loads(self._cursor_path.read_text("utf-8"))
except Exception as e:
logger.warning(f"Failed to read Mochat cursor file: {e}")
return
cursors = data.get("cursors") if isinstance(data, dict) else None
if isinstance(cursors, dict):
for sid, cur in cursors.items():
if isinstance(sid, str) and isinstance(cur, int) and cur >= 0:
self._session_cursor[sid] = cur
async def _save_session_cursors(self) -> None:
try:
self._state_dir.mkdir(parents=True, exist_ok=True)
self._cursor_path.write_text(json.dumps({
"schemaVersion": 1, "updatedAt": datetime.utcnow().isoformat(),
"cursors": self._session_cursor,
}, ensure_ascii=False, indent=2) + "\n", "utf-8")
except Exception as e:
logger.warning(f"Failed to save Mochat cursor file: {e}")
# ---- HTTP helpers ------------------------------------------------------
async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
if not self._http:
raise RuntimeError("Mochat HTTP client not initialized")
url = f"{self.config.base_url.strip().rstrip('/')}{path}"
response = await self._http.post(url, headers={
"Content-Type": "application/json", "X-Claw-Token": self.config.claw_token,
}, json=payload)
if not response.is_success:
raise RuntimeError(f"Mochat HTTP {response.status_code}: {response.text[:200]}")
try:
parsed = response.json()
except Exception:
parsed = response.text
if isinstance(parsed, dict) and isinstance(parsed.get("code"), int):
if parsed["code"] != 200:
msg = str(parsed.get("message") or parsed.get("name") or "request failed")
raise RuntimeError(f"Mochat API error: {msg} (code={parsed['code']})")
data = parsed.get("data")
return data if isinstance(data, dict) else {}
return parsed if isinstance(parsed, dict) else {}
async def _api_send(self, path: str, id_key: str, id_val: str,
content: str, reply_to: str | None, group_id: str | None = None) -> dict[str, Any]:
"""Unified send helper for session and panel messages."""
body: dict[str, Any] = {id_key: id_val, "content": content}
if reply_to:
body["replyTo"] = reply_to
if group_id:
body["groupId"] = group_id
return await self._post_json(path, body)
@staticmethod
def _read_group_id(metadata: dict[str, Any]) -> str | None:
if not isinstance(metadata, dict):
return None
value = metadata.get("group_id") or metadata.get("groupId")
return value.strip() if isinstance(value, str) and value.strip() else None

View File

@@ -561,6 +561,24 @@ def channels_status():
"" if dc.enabled else "",
dc.gateway_url
)
# Feishu
fs = config.channels.feishu
fs_config = f"app_id: {fs.app_id[:10]}..." if fs.app_id else "[dim]not configured[/dim]"
table.add_row(
"Feishu",
"" if fs.enabled else "",
fs_config
)
# Mochat
mc = config.channels.mochat
mc_base = mc.base_url or "[dim]not configured[/dim]"
table.add_row(
"Mochat",
"" if mc.enabled else "",
mc_base
)
# Telegram
tg = config.channels.telegram

View File

@@ -77,6 +77,42 @@ class EmailConfig(BaseModel):
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
class MochatMentionConfig(BaseModel):
"""Mochat mention behavior configuration."""
require_in_groups: bool = False
class MochatGroupRule(BaseModel):
"""Mochat per-group mention requirement."""
require_mention: bool = False
class MochatConfig(BaseModel):
"""Mochat channel configuration."""
enabled: bool = False
base_url: str = "https://mochat.io"
socket_url: str = ""
socket_path: str = "/socket.io"
socket_disable_msgpack: bool = False
socket_reconnect_delay_ms: int = 1000
socket_max_reconnect_delay_ms: int = 10000
socket_connect_timeout_ms: int = 10000
refresh_interval_ms: int = 30000
watch_timeout_ms: int = 25000
watch_limit: int = 100
retry_delay_ms: int = 500
max_retry_attempts: int = 0 # 0 means unlimited retries
claw_token: str = ""
agent_user_id: str = ""
sessions: list[str] = Field(default_factory=list)
panels: list[str] = Field(default_factory=list)
allow_from: list[str] = Field(default_factory=list)
mention: MochatMentionConfig = Field(default_factory=MochatMentionConfig)
groups: dict[str, MochatGroupRule] = Field(default_factory=dict)
reply_delay_mode: str = "non-mention" # off | non-mention
reply_delay_ms: int = 120000
class SlackDMConfig(BaseModel):
"""Slack DM policy configuration."""
enabled: bool = True
@@ -111,6 +147,7 @@ class ChannelsConfig(BaseModel):
telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
mochat: MochatConfig = Field(default_factory=MochatConfig)
dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
email: EmailConfig = Field(default_factory=EmailConfig)
slack: SlackConfig = Field(default_factory=SlackConfig)

View File

@@ -32,6 +32,8 @@ dependencies = [
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0",
"socksio>=1.0.0",
"python-socketio>=5.11.0",
"msgpack>=1.0.8",
"slack-sdk>=3.26.0",
"qq-botpy>=1.0.0",
]