Skip to content

数据流

系统存在 4 条独立的数据路径,分别对应不同的交互场景。

Path A: Web面板 → Agent (场景对话)

流程: 用户通过外部通道发送消息 → FastAPI 路由到 Agent → 直接返回结果。

外部用户 → HTTP POST /api/scenes/{id}/chat

FastAPI (cococat/app.py)

AgentPool.get_agent(scene_id) → AgentLoop.run()

ReAct 循环 (同一进程内直接调用)

回复给用户 + 记录到 scene_router 历史

特点: v2 中所有调用在同一进程内完成,无需子进程或 JSON-RPC。

Path B: Agent间任务分发

流程: Agent A 通过 dispatch_task 工具 → EventBus 路由 → 直接调用 Agent B 的 AgentLoop → 回复写回消息总线。

Agent A (dispatch_task 工具)

EventBus.publish("dispatch", task)

TaskWorker 接收事件, 解析 target_id + prompt

AgentPool.get_agent(target_id) → AgentLoop.run()
         ↓ 处理完成
写 chat/group.jsonl (消息总线)

关键目录:

  • runs/ — DAG 编排的任务运行记录

Path C: 聊天群组

流程: Admin 发消息到群组 → 心跳检查未读 → 按优先级评分 → agent 处理。

Admin → REST API → chat/groups.json → messages.jsonl

Agent heartbeat (300s 周期)

chat_reader.get_unread_messages(agent_id)

每 条消 息评分: @提及 +100, @all +80, admin +50, leader +30

score >= 10 才处理,按分数降序

AgentLoop.run() 处理

mark_as_read() 标记已读

Path D: 邮箱

流程: Agent A 通过 send_message 工具写 inbox → Agent B 心跳读取 → 处理 → 标记已读。

Agent A (send_message 工具)

agents/mailbox/{target_id}/inbox.jsonl

Agent B 心跳

read_inbox(agent_id) → 过滤 unread

for each unread: _execute_task(prompt)

mark_read(agent_id, idx)

基于 MIT 协议开源