feat: add QQ channel integration with botpy SDK

Add official QQ platform support using botpy SDK with WebSocket connection.

Features:
- C2C (private message) support via QQ Open Platform
- WebSocket-based bot connection (no public IP required)
- Message deduplication with efficient deque-based LRU cache
- User whitelist support via allow_from configuration
- Clean async architecture using single event loop

Changes:
- Add QQChannel implementation in nanobot/channels/qq.py
- Add QQConfig schema with appId and secret fields
- Register QQ channel in ChannelManager
- Update README with QQ setup instructions
- Add qq-botpy dependency to pyproject.toml
- Add botpy.log to .gitignore

Setup:
1. Get AppID and Secret from q.qq.com
2. Configure in ~/.nanobot/config.json:
   {
     "channels": {
       "qq": {
         "enabled": true,
         "appId": "YOUR_APP_ID",
         "secret": "YOUR_APP_SECRET",
         "allowFrom": []
       }
     }
   }
3. Run: nanobot gateway

Note: Group chat support will be added in future updates.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
yinwm
2026-02-09 15:47:55 +08:00
parent cbca6297d6
commit 34dc933fce
6 changed files with 276 additions and 2 deletions

3
.gitignore vendored
View File

@@ -17,4 +17,5 @@ docs/
__pycache__/
poetry.lock
.pytest_cache/
tests/
tests/
botpy.log

View File

@@ -166,7 +166,7 @@ nanobot agent -m "Hello from my local LLM!"
## 💬 Chat Apps
Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, or Email — anytime, anywhere.
Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, Email, or QQ — anytime, anywhere.
| Channel | Setup |
|---------|-------|
@@ -176,6 +176,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, or E
| **Feishu** | Medium (app credentials) |
| **DingTalk** | Medium (app credentials) |
| **Email** | Medium (IMAP/SMTP credentials) |
| **QQ** | Easy (app credentials) |
<details>
<summary><b>Telegram</b> (Recommended)</summary>
@@ -335,6 +336,45 @@ nanobot gateway
</details>
<details>
<summary><b>QQ (QQ私聊)</b></summary>
Uses **botpy SDK** with WebSocket — no public IP required.
**1. Create a QQ bot**
- Visit [QQ Open Platform](https://q.qq.com)
- Create a new bot application
- Get **AppID** and **Secret** from "Developer Settings"
**2. Configure**
```json
{
"channels": {
"qq": {
"enabled": true,
"appId": "YOUR_APP_ID",
"secret": "YOUR_APP_SECRET",
"allowFrom": []
}
}
}
```
> `allowFrom`: Leave empty for public access, or add user openids to restrict access.
> Example: `"allowFrom": ["user_openid_1", "user_openid_2"]`
**3. Run**
```bash
nanobot gateway
```
> [!TIP]
> QQ bot currently supports **private messages only**. Group chat support coming soon!
</details>
<details>
<summary><b>DingTalk (钉钉)</b></summary>

View File

@@ -106,6 +106,18 @@ class ChannelManager:
logger.info("Email channel enabled")
except ImportError as e:
logger.warning(f"Email channel not available: {e}")
# QQ channel
if self.config.channels.qq.enabled:
try:
from nanobot.channels.qq import QQChannel
self.channels["qq"] = QQChannel(
self.config.channels.qq,
self.bus,
)
logger.info("QQ channel enabled")
except ImportError as e:
logger.warning(f"QQ channel not available: {e}")
async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions."""

211
nanobot/channels/qq.py Normal file
View File

@@ -0,0 +1,211 @@
"""QQ channel implementation using botpy SDK."""
import asyncio
from collections import deque
from typing import TYPE_CHECKING
from loguru import logger
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import QQConfig
try:
import botpy
from botpy.message import C2CMessage
QQ_AVAILABLE = True
except ImportError:
QQ_AVAILABLE = False
botpy = None
C2CMessage = None
if TYPE_CHECKING:
from botpy.message import C2CMessage
def parse_chat_id(chat_id: str) -> tuple[str, str]:
"""Parse chat_id into (channel, user_id).
Args:
chat_id: Format "channel:user_id", e.g. "qq:openid_xxx"
Returns:
Tuple of (channel, user_id)
"""
if ":" not in chat_id:
raise ValueError(f"Invalid chat_id format: {chat_id}")
channel, user_id = chat_id.split(":", 1)
return channel, user_id
class QQChannel(BaseChannel):
"""
QQ channel using botpy SDK with WebSocket connection.
Uses botpy SDK to connect to QQ Open Platform (q.qq.com).
Requires:
- App ID and Secret from q.qq.com
- Robot capability enabled
"""
name = "qq"
def __init__(self, config: QQConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: QQConfig = config
self._client: "botpy.Client | None" = None
self._processed_message_ids: deque = deque(maxlen=1000)
self._bot_task: asyncio.Task | None = None
async def start(self) -> None:
"""Start the QQ bot."""
if not QQ_AVAILABLE:
logger.error("QQ SDK 未安装。请运行pip install qq-botpy")
return
if not self.config.app_id or not self.config.secret:
logger.error("QQ app_id 和 secret 未配置")
return
self._running = True
# Create bot client with C2C intents
intents = botpy.Intents.all()
logger.info(f"QQ Intents 配置值: {intents.value}")
# Create custom bot class with message handlers
class QQBot(botpy.Client):
def __init__(self, channel):
super().__init__(intents=intents)
self.channel = channel
async def on_ready(self):
"""Called when bot is ready."""
logger.info(f"QQ bot ready: {self.robot.name}")
async def on_c2c_message_create(self, message: "C2CMessage"):
"""Handle C2C (Client to Client) messages - private chat."""
await self.channel._on_message(message, "c2c")
async def on_direct_message_create(self, message):
"""Handle direct messages - alternative event name."""
await self.channel._on_message(message, "direct")
# TODO: Group message support - implement in future PRD
# async def on_group_at_message_create(self, message):
# """Handle group @ messages."""
# pass
self._client = QQBot(self)
# Start bot - use create_task to run concurrently
self._bot_task = asyncio.create_task(
self._run_bot_with_retry(self.config.app_id, self.config.secret)
)
logger.info("QQ bot started with C2C (private message) support")
async def _run_bot_with_retry(self, app_id: str, secret: str) -> None:
"""Run bot with error handling."""
try:
await self._client.start(appid=app_id, secret=secret)
except Exception as e:
logger.error(
f"QQ 鉴权失败,请检查 AppID 和 Secret 是否正确。"
f"访问 q.qq.com 获取凭证。错误: {e}"
)
self._running = False
async def stop(self) -> None:
"""Stop the QQ bot."""
self._running = False
if self._bot_task:
self._bot_task.cancel()
try:
await self._bot_task
except asyncio.CancelledError:
pass
logger.info("QQ bot stopped")
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through QQ."""
if not self._client:
logger.warning("QQ client not initialized")
return
try:
# Parse chat_id format: qq:{user_id}
channel, user_id = parse_chat_id(msg.chat_id)
if channel != "qq":
logger.warning(f"Invalid channel in chat_id: {msg.chat_id}")
return
# Send private message using botpy API
await self._client.api.post_c2c_message(
openid=user_id,
msg_type=0,
content=msg.content,
)
logger.debug(f"QQ message sent to {msg.chat_id}")
except ValueError as e:
logger.error(f"Invalid chat_id format: {e}")
except Exception as e:
logger.error(f"Error sending QQ message: {e}")
async def _on_message(self, data: "C2CMessage", msg_type: str) -> None:
"""Handle incoming message from QQ."""
try:
# Message deduplication using deque with maxlen
message_id = data.id
if message_id in self._processed_message_ids:
logger.debug(f"Duplicate message {message_id}, skipping")
return
self._processed_message_ids.append(message_id)
# Extract user ID and chat ID from message
author = data.author
# Try different possible field names for user ID
user_id = str(getattr(author, 'id', None) or getattr(author, 'user_openid', 'unknown'))
user_name = getattr(author, 'username', None) or 'unknown'
# For C2C messages, chat_id is the user's ID
chat_id = f"qq:{user_id}"
# Check allow_from list (if configured)
if self.config.allow_from and user_id not in self.config.allow_from:
logger.info(f"User {user_id} not in allow_from list")
return
# Get message content
content = data.content or ""
if not content:
logger.debug(f"Empty message from {user_id}, skipping")
return
# Publish to message bus
msg = InboundMessage(
channel=self.name,
sender_id=user_id,
chat_id=chat_id,
content=content,
metadata={
"message_id": message_id,
"user_name": user_name,
"msg_type": msg_type,
},
)
await self.bus.publish_inbound(msg)
logger.info(f"Received QQ message from {user_id} ({msg_type}): {content[:50]}")
except Exception as e:
logger.error(f"Error handling QQ message: {e}")

View File

@@ -77,6 +77,14 @@ class EmailConfig(BaseModel):
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
class QQConfig(BaseModel):
"""QQ channel configuration using botpy SDK."""
enabled: bool = False
app_id: str = "" # 机器人 ID (AppID) from q.qq.com
secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com
allow_from: list[str] = Field(default_factory=list) # Allowed user openids (empty = public access)
class ChannelsConfig(BaseModel):
"""Configuration for chat channels."""
whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig)
@@ -85,6 +93,7 @@ class ChannelsConfig(BaseModel):
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
email: EmailConfig = Field(default_factory=EmailConfig)
qq: QQConfig = Field(default_factory=QQConfig)
class AgentDefaults(BaseModel):

View File

@@ -32,6 +32,7 @@ dependencies = [
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0",
"socksio>=1.0.0",
"qq-botpy>=1.0.0",
]
[project.optional-dependencies]