diff --git a/.gitignore b/.gitignore
index 55338f7..4e58574 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,5 @@ docs/
__pycache__/
poetry.lock
.pytest_cache/
-tests/
\ No newline at end of file
+tests/
+botpy.log
\ No newline at end of file
diff --git a/README.md b/README.md
index 8f7c1a2..4acaca8 100644
--- a/README.md
+++ b/README.md
@@ -166,7 +166,7 @@ nanobot agent -m "Hello from my local LLM!"
## 💬 Chat Apps
-Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, or Email — anytime, anywhere.
+Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, Email, or QQ — anytime, anywhere.
| Channel | Setup |
|---------|-------|
@@ -176,6 +176,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, or E
| **Feishu** | Medium (app credentials) |
| **DingTalk** | Medium (app credentials) |
| **Email** | Medium (IMAP/SMTP credentials) |
+| **QQ** | Easy (app credentials) |
Telegram (Recommended)
@@ -335,6 +336,45 @@ nanobot gateway
+
+QQ (QQ私聊)
+
+Uses **botpy SDK** with WebSocket — no public IP required.
+
+**1. Create a QQ bot**
+- Visit [QQ Open Platform](https://q.qq.com)
+- Create a new bot application
+- Get **AppID** and **Secret** from "Developer Settings"
+
+**2. Configure**
+
+```json
+{
+ "channels": {
+ "qq": {
+ "enabled": true,
+ "appId": "YOUR_APP_ID",
+ "secret": "YOUR_APP_SECRET",
+ "allowFrom": []
+ }
+ }
+}
+```
+
+> `allowFrom`: Leave empty for public access, or add user openids to restrict access.
+> Example: `"allowFrom": ["user_openid_1", "user_openid_2"]`
+
+**3. Run**
+
+```bash
+nanobot gateway
+```
+
+> [!TIP]
+> QQ bot currently supports **private messages only**. Group chat support coming soon!
+
+
+
DingTalk (钉钉)
diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py
index 26fa9f3..a7b1ed5 100644
--- a/nanobot/channels/manager.py
+++ b/nanobot/channels/manager.py
@@ -106,6 +106,18 @@ class ChannelManager:
logger.info("Email channel enabled")
except ImportError as e:
logger.warning(f"Email channel not available: {e}")
+
+ # QQ channel
+ if self.config.channels.qq.enabled:
+ try:
+ from nanobot.channels.qq import QQChannel
+ self.channels["qq"] = QQChannel(
+ self.config.channels.qq,
+ self.bus,
+ )
+ logger.info("QQ channel enabled")
+ except ImportError as e:
+ logger.warning(f"QQ channel not available: {e}")
async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions."""
diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py
new file mode 100644
index 0000000..98ca883
--- /dev/null
+++ b/nanobot/channels/qq.py
@@ -0,0 +1,211 @@
+"""QQ channel implementation using botpy SDK."""
+
+import asyncio
+from collections import deque
+from typing import TYPE_CHECKING
+
+from loguru import logger
+
+from nanobot.bus.events import InboundMessage, OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.base import BaseChannel
+from nanobot.config.schema import QQConfig
+
+try:
+ import botpy
+ from botpy.message import C2CMessage
+
+ QQ_AVAILABLE = True
+except ImportError:
+ QQ_AVAILABLE = False
+ botpy = None
+ C2CMessage = None
+
+if TYPE_CHECKING:
+ from botpy.message import C2CMessage
+
+
+def parse_chat_id(chat_id: str) -> tuple[str, str]:
+ """Parse chat_id into (channel, user_id).
+
+ Args:
+ chat_id: Format "channel:user_id", e.g. "qq:openid_xxx"
+
+ Returns:
+ Tuple of (channel, user_id)
+ """
+ if ":" not in chat_id:
+ raise ValueError(f"Invalid chat_id format: {chat_id}")
+ channel, user_id = chat_id.split(":", 1)
+ return channel, user_id
+
+
+class QQChannel(BaseChannel):
+ """
+ QQ channel using botpy SDK with WebSocket connection.
+
+ Uses botpy SDK to connect to QQ Open Platform (q.qq.com).
+
+ Requires:
+ - App ID and Secret from q.qq.com
+ - Robot capability enabled
+ """
+
+ name = "qq"
+
+ def __init__(self, config: QQConfig, bus: MessageBus):
+ super().__init__(config, bus)
+ self.config: QQConfig = config
+ self._client: "botpy.Client | None" = None
+ self._processed_message_ids: deque = deque(maxlen=1000)
+ self._bot_task: asyncio.Task | None = None
+
+ async def start(self) -> None:
+ """Start the QQ bot."""
+ if not QQ_AVAILABLE:
+ logger.error("QQ SDK 未安装。请运行:pip install qq-botpy")
+ return
+
+ if not self.config.app_id or not self.config.secret:
+ logger.error("QQ app_id 和 secret 未配置")
+ return
+
+ self._running = True
+
+ # Create bot client with C2C intents
+ intents = botpy.Intents.all()
+ logger.info(f"QQ Intents 配置值: {intents.value}")
+
+ # Create custom bot class with message handlers
+ class QQBot(botpy.Client):
+ def __init__(self, channel):
+ super().__init__(intents=intents)
+ self.channel = channel
+
+ async def on_ready(self):
+ """Called when bot is ready."""
+ logger.info(f"QQ bot ready: {self.robot.name}")
+
+ async def on_c2c_message_create(self, message: "C2CMessage"):
+ """Handle C2C (Client to Client) messages - private chat."""
+ await self.channel._on_message(message, "c2c")
+
+ async def on_direct_message_create(self, message):
+ """Handle direct messages - alternative event name."""
+ await self.channel._on_message(message, "direct")
+
+ # TODO: Group message support - implement in future PRD
+ # async def on_group_at_message_create(self, message):
+ # """Handle group @ messages."""
+ # pass
+
+ self._client = QQBot(self)
+
+ # Start bot - use create_task to run concurrently
+ self._bot_task = asyncio.create_task(
+ self._run_bot_with_retry(self.config.app_id, self.config.secret)
+ )
+
+ logger.info("QQ bot started with C2C (private message) support")
+
+ async def _run_bot_with_retry(self, app_id: str, secret: str) -> None:
+ """Run bot with error handling."""
+ try:
+ await self._client.start(appid=app_id, secret=secret)
+ except Exception as e:
+ logger.error(
+ f"QQ 鉴权失败,请检查 AppID 和 Secret 是否正确。"
+ f"访问 q.qq.com 获取凭证。错误: {e}"
+ )
+ self._running = False
+
+ async def stop(self) -> None:
+ """Stop the QQ bot."""
+ self._running = False
+
+ if self._bot_task:
+ self._bot_task.cancel()
+ try:
+ await self._bot_task
+ except asyncio.CancelledError:
+ pass
+
+ logger.info("QQ bot stopped")
+
+ async def send(self, msg: OutboundMessage) -> None:
+ """Send a message through QQ."""
+ if not self._client:
+ logger.warning("QQ client not initialized")
+ return
+
+ try:
+ # Parse chat_id format: qq:{user_id}
+ channel, user_id = parse_chat_id(msg.chat_id)
+
+ if channel != "qq":
+ logger.warning(f"Invalid channel in chat_id: {msg.chat_id}")
+ return
+
+ # Send private message using botpy API
+ await self._client.api.post_c2c_message(
+ openid=user_id,
+ msg_type=0,
+ content=msg.content,
+ )
+ logger.debug(f"QQ message sent to {msg.chat_id}")
+
+ except ValueError as e:
+ logger.error(f"Invalid chat_id format: {e}")
+ except Exception as e:
+ logger.error(f"Error sending QQ message: {e}")
+
+ async def _on_message(self, data: "C2CMessage", msg_type: str) -> None:
+ """Handle incoming message from QQ."""
+ try:
+ # Message deduplication using deque with maxlen
+ message_id = data.id
+ if message_id in self._processed_message_ids:
+ logger.debug(f"Duplicate message {message_id}, skipping")
+ return
+
+ self._processed_message_ids.append(message_id)
+
+ # Extract user ID and chat ID from message
+ author = data.author
+ # Try different possible field names for user ID
+ user_id = str(getattr(author, 'id', None) or getattr(author, 'user_openid', 'unknown'))
+ user_name = getattr(author, 'username', None) or 'unknown'
+
+ # For C2C messages, chat_id is the user's ID
+ chat_id = f"qq:{user_id}"
+
+ # Check allow_from list (if configured)
+ if self.config.allow_from and user_id not in self.config.allow_from:
+ logger.info(f"User {user_id} not in allow_from list")
+ return
+
+ # Get message content
+ content = data.content or ""
+
+ if not content:
+ logger.debug(f"Empty message from {user_id}, skipping")
+ return
+
+ # Publish to message bus
+ msg = InboundMessage(
+ channel=self.name,
+ sender_id=user_id,
+ chat_id=chat_id,
+ content=content,
+ metadata={
+ "message_id": message_id,
+ "user_name": user_name,
+ "msg_type": msg_type,
+ },
+ )
+ await self.bus.publish_inbound(msg)
+
+ logger.info(f"Received QQ message from {user_id} ({msg_type}): {content[:50]}")
+
+ except Exception as e:
+ logger.error(f"Error handling QQ message: {e}")
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index aa9729b..f31d279 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -77,6 +77,14 @@ class EmailConfig(BaseModel):
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
+class QQConfig(BaseModel):
+ """QQ channel configuration using botpy SDK."""
+ enabled: bool = False
+ app_id: str = "" # 机器人 ID (AppID) from q.qq.com
+ secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com
+ allow_from: list[str] = Field(default_factory=list) # Allowed user openids (empty = public access)
+
+
class ChannelsConfig(BaseModel):
"""Configuration for chat channels."""
whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig)
@@ -85,6 +93,7 @@ class ChannelsConfig(BaseModel):
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
email: EmailConfig = Field(default_factory=EmailConfig)
+ qq: QQConfig = Field(default_factory=QQConfig)
class AgentDefaults(BaseModel):
diff --git a/pyproject.toml b/pyproject.toml
index 6fda084..21b50f0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,6 +32,7 @@ dependencies = [
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0",
"socksio>=1.0.0",
+ "qq-botpy>=1.0.0",
]
[project.optional-dependencies]