第 20 章 IM 渠道系统
DeerFlow 不仅是一个 Web 应用,还可以作为 IM 机器人嵌入到 Telegram、Slack 和飞书中。本章将深入分析 IM 渠道系统的架构设计,从抽象基类到消息总线,再到三个具体渠道的实现。
20.1 Channel 抽象基类
所有 IM 渠道实现都继承自 channels/base.py 中的 Channel 抽象基类:
class Channel(ABC):
"""Base class for all IM channel implementations.
Each channel connects to an external messaging platform and:
1. Receives messages, wraps them as InboundMessage, publishes to the bus.
2. Subscribes to outbound messages and sends replies back to the platform.
"""
def __init__(self, name: str, bus: MessageBus, config: dict[str, Any]) -> None:
self.name = name
self.bus = bus
self.config = config
self._running = False
@abstractmethod
async def start(self) -> None: ...
@abstractmethod
async def stop(self) -> None: ...
@abstractmethod
async def send(self, msg: OutboundMessage) -> None: ...
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
return False # 默认不支持文件上传基类定义了三个核心抽象方法(start、stop、send)和一个可选的文件上传方法。同时提供了两个模板方法:
_make_inbound — 创建入站消息的工厂方法:
def _make_inbound(
self, chat_id: str, user_id: str, text: str, *,
msg_type: InboundMessageType = InboundMessageType.CHAT,
thread_ts: str | None = None,
files: list[dict[str, Any]] | None = None,
metadata: dict[str, Any] | None = None,
) -> InboundMessage:
return InboundMessage(
channel_name=self.name, chat_id=chat_id, user_id=user_id,
text=text, msg_type=msg_type, thread_ts=thread_ts, ...
)_on_outbound — 出站消息回调,自动过滤目标渠道并按序发送文本和文件:
async def _on_outbound(self, msg: OutboundMessage) -> None:
if msg.channel_name == self.name:
try:
await self.send(msg) # 先发文本
except Exception:
return # 文本失败则跳过文件
for attachment in msg.attachments:
await self.send_file(msg, attachment)这个设计保证了文本消息和文件附件的发送顺序,且文本发送失败时不会尝试发送孤立的文件。
20.2 MessageBus — 异步消息总线
channels/message_bus.py 定义了整个渠道系统的消息模型和 Pub/Sub 中枢。
消息类型
系统定义了两种方向的消息:
class InboundMessageType(StrEnum):
CHAT = "chat" # 普通聊天消息
COMMAND = "command" # 斜杠命令(如 /new, /help)
@dataclass
class InboundMessage:
channel_name: str # 来源渠道名
chat_id: str # 平台会话 ID
user_id: str # 平台用户 ID
text: str # 消息文本
msg_type: InboundMessageType
thread_ts: str | None = None # 平台线程标识
topic_id: str | None = None # 映射到 DeerFlow 线程的话题 ID
files: list[dict] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
created_at: float = field(default_factory=time.time)topic_id 是连接 IM 线程和 DeerFlow 线程的关键字段。同一个 topic_id 内的消息会复用同一个 DeerFlow 线程,实现多轮对话。当 topic_id 为 None 时,每条消息创建独立线程。
出站消息包含文本和文件附件:
@dataclass
class ResolvedAttachment:
virtual_path: str # 虚拟路径
actual_path: Path # 主机路径
filename: str
mime_type: str
size: int
is_image: bool # 图片可能有特殊处理
@dataclass
class OutboundMessage:
channel_name: str # 目标渠道
chat_id: str
thread_id: str # DeerFlow 线程 ID
text: str
artifacts: list[str] = field(default_factory=list)
attachments: list[ResolvedAttachment] = field(default_factory=list)
is_final: bool = True
thread_ts: str | None = Noneis_final 标记用于通知渠道这是流式响应的最后一条消息,部分渠道(如飞书)会在最终消息发送后添加"完成"表情回应。
消息总线
MessageBus 实现了简洁的异步 Pub/Sub 模式:
class MessageBus:
def __init__(self) -> None:
self._inbound_queue: asyncio.Queue[InboundMessage] = asyncio.Queue()
self._outbound_listeners: list[OutboundCallback] = []
async def publish_inbound(self, msg: InboundMessage) -> None:
await self._inbound_queue.put(msg)
async def get_inbound(self) -> InboundMessage:
return await self._inbound_queue.get()
def subscribe_outbound(self, callback: OutboundCallback) -> None:
self._outbound_listeners.append(callback)
async def publish_outbound(self, msg: OutboundMessage) -> None:
for callback in self._outbound_listeners:
await callback(msg)入站方向使用 asyncio.Queue——渠道生产消息,ChannelManager 消费消息并调度代理处理。出站方向使用回调列表——每个渠道在启动时注册 _on_outbound 回调,ChannelManager 发布出站消息时所有渠道都会收到通知,但只有 channel_name 匹配的渠道才实际发送。
20.3 ChannelService — 渠道生命周期管理
channels/service.py 中的 ChannelService 管理所有渠道的启停:
_CHANNEL_REGISTRY: dict[str, str] = {
"feishu": "src.channels.feishu:FeishuChannel",
"slack": "src.channels.slack:SlackChannel",
"telegram": "src.channels.telegram:TelegramChannel",
}
class ChannelService:
def __init__(self, channels_config: dict[str, Any] | None = None) -> None:
self.bus = MessageBus()
self.store = ChannelStore()
self.manager = ChannelManager(
bus=self.bus, store=self.store,
langgraph_url=langgraph_url, gateway_url=gateway_url,
default_session=default_session, channel_sessions=channel_sessions,
)
async def start(self) -> None:
await self.manager.start()
for name, channel_config in self._config.items():
if channel_config.get("enabled", False):
await self._start_channel(name, channel_config)
async def _start_channel(self, name: str, config: dict) -> bool:
import_path = _CHANNEL_REGISTRY.get(name)
channel_cls = resolve_class(import_path, base_class=None)
channel = channel_cls(bus=self.bus, config=config)
await channel.start()
self._channels[name] = channel渠道的注册使用字符串导入路径,通过 resolve_class 反射加载——这意味着只有实际启用的渠道才会导入其依赖库(如 python-telegram-bot、slack-sdk、lark-oapi),未安装的库不会导致启动失败。
ChannelService 作为全局单例,由 Gateway 的 lifespan 钩子在启动时创建:
async def start_channel_service() -> ChannelService:
global _channel_service
_channel_service = ChannelService.from_app_config()
await _channel_service.start()
return _channel_service20.4 三大渠道实现
19.4.1 Telegram
TelegramChannel 基于 python-telegram-bot 库,使用长轮询(Long Polling)模式,无需公网 IP:
class TelegramChannel(Channel):
async def start(self) -> None:
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters
app = ApplicationBuilder().token(bot_token).build()
app.add_handler(CommandHandler("start", self._cmd_start))
app.add_handler(CommandHandler("new", self._cmd_generic))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._on_text))
# 在独立线程中运行轮询
self._thread = threading.Thread(target=self._run_polling, daemon=True)
self._thread.start()Telegram 渠道在独立线程中运行事件循环,避免与主线程的 uvloop 冲突。消息处理链路为:
- 收到用户消息后,先发送"Working on it..."回复
- 将消息封装为
InboundMessage发布到 MessageBus ChannelManager消费消息并调用 LangGraph Server 处理- 处理结果通过 MessageBus 出站回调发送回 Telegram
文件发送支持图片和文档两种模式,图片不超过 10MB 时用 send_photo,其余用 send_document(最大 50MB)。
配置示例:
channels:
telegram:
enabled: true
bot_token: "123456:ABC-DEF..."
allowed_users: [12345678] # 可选,限制允许使用的用户19.4.2 Slack
SlackChannel 使用 Socket Mode(WebSocket),同样无需公网 IP:
class SlackChannel(Channel):
async def start(self) -> None:
from slack_sdk import WebClient
from slack_sdk.socket_mode import SocketModeClient
self._web_client = WebClient(token=bot_token)
self._socket_client = SocketModeClient(
app_token=app_token, web_client=self._web_client,
)
self._socket_client.socket_mode_request_listeners.append(self._on_socket_event)
# 后台线程启动 WebSocket 连接
asyncio.get_event_loop().run_in_executor(None, self._socket_client.connect)Slack 渠道有几个特色功能:
- Markdown 转换:使用
SlackMarkdownConverter将标准 Markdown 转为 Slack 的mrkdwn格式。 - 表情反应:收到消息时添加 👀 反应表示已收到,处理完成后添加 ✅,失败时添加 ❌。
- 线程对话:使用 Slack 的
thread_ts实现话题内回复,同一话题的消息共享 DeerFlow 线程。
配置需要两个 token:
channels:
slack:
enabled: true
bot_token: "xoxb-..." # Bot User OAuth Token
app_token: "xapp-..." # App-Level Token (Socket Mode)
allowed_users: ["U12345678"]19.4.3 飞书
FeishuChannel 使用 lark-oapi 的 WebSocket 长连接模式:
class FeishuChannel(Channel):
async def start(self) -> None:
import lark_oapi as lark
self._api_client = lark.Client.builder() \
.app_id(app_id).app_secret(app_secret).build()
# WebSocket 在独立线程运行
self._thread = threading.Thread(
target=self._run_ws, args=(app_id, app_secret), daemon=True,
)
self._thread.start()
def _run_ws(self, app_id, app_secret):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 修补 SDK 的模块级事件循环引用
import lark_oapi.ws.client as _ws_client_mod
_ws_client_mod.loop = loop
event_handler = lark.EventDispatcherHandler.builder("", "") \
.register_p2_im_message_receive_v1(self._on_message).build()
ws_client = lark.ws.Client(
app_id=app_id, app_secret=app_secret,
event_handler=event_handler,
)
ws_client.start()飞书渠道的实现有一个巧妙的 workaround:lark-oapi SDK 在模块级缓存了事件循环引用,当主线程使用 uvloop 时会导致冲突。DeerFlow 通过在独立线程中创建新事件循环并替换 SDK 的模块级引用来解决这个问题。
飞书的消息格式使用 Interactive Card(互动卡片)渲染 Markdown 内容:
@staticmethod
def _build_card_content(text: str) -> str:
card = {
"config": {"wide_screen_mode": True},
"elements": [{"tag": "markdown", "content": text}],
}
return json.dumps(card)消息流程中的表情反应也更丰富——收到消息时添加 "OK" 反应,处理完成后添加 "DONE" 反应。
文件上传支持图片和文档两种模式,会自动根据扩展名判断文件类型(xls/ppt/pdf/doc/stream)。
配置示例:
channels:
feishu:
enabled: true
app_id: "cli_xxx..."
app_secret: "xxx..."20.5 实战:配置 Telegram Bot
以下是从零开始配置 Telegram 渠道的完整步骤:
第一步:在 Telegram 中找到 @BotFather,发送 /newbot 创建机器人,记录返回的 Bot Token。
第二步:在 config.yaml 中添加渠道配置:
channels:
langgraph_url: "http://localhost:2024"
gateway_url: "http://localhost:8001"
telegram:
enabled: true
bot_token: "123456789:AAHxxxxxxxxxxxxxxxxxxxxxxxx"
allowed_users: [] # 空列表允许所有用户第三步:安装依赖并启动:
uv add python-telegram-bot
python -m uvicorn src.gateway.app:app --host 0.0.0.0 --port 8001启动日志应显示:
INFO - Channel telegram started
INFO - ChannelService started with channels: ['telegram']第四步:在 Telegram 中向机器人发送消息,即可开始对话。支持的命令包括 /start、/new、/status、/models、/memory、/help。
小结
DeerFlow 的 IM 渠道系统展现了优秀的架构设计:Channel 抽象基类定义了统一接口,MessageBus 通过异步 Queue 和回调列表实现了入站/出站消息的解耦,ChannelService 负责生命周期管理和延迟导入。三个渠道实现(Telegram、Slack、飞书)都采用了无需公网 IP 的连接方式(长轮询、Socket Mode、WebSocket),并各自处理了平台特有的消息格式、表情反应和文件上传逻辑。整个系统的扩展非常简单——只需继承 Channel、实现三个抽象方法、在 _CHANNEL_REGISTRY 中注册,即可接入新的 IM 平台。
