接入新渠道
渠道(Channel)是 CocoCat 与外部用户交互的入口。目前支持微信、企业微信、飞书和 Web API。接入新渠道需要实现 Channel 基类并注册到入口管理器。
架构概览
外部消息 → Channel实现 → on_message回调 → SceneKeeper → Agent邮箱
↓
Agent回复 ← Channel.send() ← Agent处理完成后回调 ← Heartbeat消费第一步:创建 Channel 子类
在 cococat/core/channels/ 下创建新文件,实现 Channel 基类:
python
"""My new channel implementation."""
from cococat.core.channels import Channel, ChatMessage
class MyChannel(Channel):
channel_type = "my_channel" # 唯一标识
def start(self, scene_id: str, config: dict):
"""启动频道连接。config 包含渠道配置参数。"""
self.scene_id = scene_id
# 初始化连接、WebSocket、Webhook 等
# 收到消息时调用 self.on_message(msg)
def send(self, reply: str, user_id: str):
"""发送回复给用户。"""
# 调用渠道 API 发送消息
passChannel 基类
python
class Channel:
channel_type = ""
def __init__(self):
self.scene_id = ""
self.on_message = None # 外部设置的回调
def start(self, scene_id: str, config: dict):
raise NotImplementedError
def send(self, reply: str, user_id: str):
raise NotImplementedError| 方法 | 说明 |
|---|---|
start(scene_id, config) | 启动频道,建立与外部平台的连接 |
send(reply, user_id) | 向指定用户发送回复 |
ChatMessage 格式
python
class ChatMessage:
channel_type: str # 渠道类型标识
scene_id: str # 所属场景
user_id: str # 用户 ID(外部平台用户标识)
content: str # 消息内容
msg_type: str # 消息类型(text, image 等)
msg_id: str # 消息 ID
timestamp: str # ISO 格式时间戳第二步:实现消息接收循环
渠道需要持续监听外部消息。典型实现方式:
WebSocket 模式(如飞书):
python
def start(self, scene_id: str, config: dict):
import asyncio
import websockets
async def listen():
async with websockets.connect(config["ws_url"]) as ws:
async for message in ws:
msg = self._parse_message(message)
if msg and self.on_message:
self.on_message(msg)
threading.Thread(target=lambda: asyncio.run(listen()), daemon=True).start()Webhook 模式(如微信公众号):
python
def start(self, scene_id: str, config: dict):
# Webhook 由 FastAPI 路由处理
pass第三步:注册到入口管理器
编辑 cococat/core/scene_keeper.py,新增频道启动函数和路由:
python
def _start_my_channel(scene_id: str, config: dict, target_id: str, target_type: str):
try:
from cococat.core.channels.my_channel import MyChannel
ch = MyChannel()
ch.on_message = lambda msg: _route_to_agent(
target_id, "my_channel", msg.user_id, msg.content
)
ch.start(scene_id, config)
except Exception as e:
print(f"[SceneKeeper] MyChannel failed: {e}")在 start_agent_entries 和 start_scene_entries 中添加分支:
python
if channel == "my_channel":
t = threading.Thread(target=_start_my_channel, ...)
t.start()第四步:添加入口配置 UI
如果渠道需要配置参数(API Key、Webhook URL 等),在 Web 面板的入口管理页面添加对应的配置表单。配置以 JSON 形式存储在 agents/{id}/entries.json 或 scenes/{id}/entries.json。
json
{
"entries": [
{
"type": "channel",
"channel": "my_channel",
"enabled": true,
"config": {
"api_key": "xxx",
"webhook_url": "https://..."
}
}
]
}最佳实践
- 异常处理:渠道启动不应阻塞主流程,使用
daemon=True线程 - 重连机制:WebSocket 类渠道需要断线重连
- 消息去重:使用
msg_id防止重复消费 - 速率限制:遵守外部平台的 API 调用限制
- 日志:记录渠道连接状态和错误