找回密码
 立即注册
首页 业界区 业界 AI Agent 框架探秘:拆解 OpenHands(4)--- 服务 ...

AI Agent 框架探秘:拆解 OpenHands(4)--- 服务

腥狩频 4 天前
AI Agent 框架探秘:拆解 OpenHands(4)--- 服务


目录

  • AI Agent 框架探秘:拆解 OpenHands(4)--- 服务

    • 0x00 概述
    • 0x01 服务

      • 1.1 API 模式

        • 1.1.1 Actions
        • 1.1.2 observation

      • 1.2 服务器组件

        • session.py
        • session/agent_session.py
        • session/conversation_manager/conversation_manager.py
        • listen.py

      • 1.3 服务工作流程描述
      • 1.4 listen_socket.py

        • 1.4.1 核心特色
        • 1.4.2 具体功能
        • 1.4.3 流程图
        • 1.4.4 会话连接
        • 1.4.5 代码


    • 0xFF 参考


0x00 概述

本篇结合官方文档进行解读OpenHands的服务器,这是OpenHands系统的立身基础。
因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 服务

OpenHands提供了WebSocket服务器。
1.jpeg

1.1 API 模式

可以发送或从服务器接收两种类型的消息:

  • Actions
  • Observations
1.1.1 Actions

一个action 包含三个部分:

  • action:要采取的动作
  • args:动作的参数
  • message:可以放在聊天记录中的友好消息
有几种action 。它们的参数列在下面。 随着时间的推移,这个列表可能会增长。

  • initialize - 初始化代理。仅由客户端发送。

    • model - 要使用的模型名称
    • directory - 工作空间的路径
    • agent_cls - 要使用的代理类

  • start - 开始一个新的开发任务。仅由客户端发送。

    • task - 要开始的任务

  • read - 读取文件内容。

    • path - 要读取的文件路径

  • write - 写入内容到文件。

    • path - 要写入的文件路径
    • content - 写入文件的内容

  • run - 运行命令。

    • command - 要运行的命令



  • browse - 打开网页。

    • url - 要打开的URL

  • think - 允许代理制定计划、设定目标或记录想法

    • thought - 要记录的想法

  • finish - 代理发出任务完成的信号
1.1.2 observation

一个observation 包含四个部分:

  • observation:观察类型
  • content:表示观察数据的字符串
  • extras:额外的结构化数据
  • message:可以放在聊天记录中的友好消息
有几种observation 。它们的额外信息列在下面。 随着时间的推移,这个列表可能会增长。

  • read - 文件内容

    • path - 读取的文件路径

  • browse - URL的HTML内容

    • url - 打开的URL

  • run - 命令的输出

    • command - 运行的命令
    • exit_code - 命令的退出代码

  • chat - 用户的消息
1.2 服务器组件

以下部分描述了OpenHands项目的服务器端组件。
session.py

session.py 文件定义了Session类,它代表与客户端的WebSocket会话。关键特性包括:

  • 处理WebSocket连接和断开
  • 初始化和管理代理会话
  • 在客户端和代理之间分发事件
  • 向客户端发送消息和错误
session/agent_session.py

agent_session.py 文件包含AgentSession类,它管理会话内代理的生命周期。关键特性包括:

  • 创建和管理运行时环境
  • 初始化代理控制器
  • 处理安全分析
  • 管理事件流
session/conversation_manager/conversation_manager.py

conversation_manager.py 文件定义了ConversationManager类,它负责管理多个客户端会话。关键特性包括:

  • 添加和重启会话
  • 向特定会话发送消息
  • 清理非活动会话
listen.py

listen.py 文件是主服务器文件,它设置FastAPI应用程序并定义各种API端点。关键特性包括:

  • 设置CORS中间件
  • 处理WebSocket连接
  • 管理文件上传
  • 提供代理交互、文件操作和安全分析的API端点
  • 为前端提供静态文件服务
该脚本定义了服务接口,主要分为两个部分:

  • 一部分是通过FastAPI库实现的HTTP接口,其具体实现位于openhands/server/routes目录中;
  • 另一部分是利用socketio库实现的WebSocket接口,其代码实现在openhands/server/listen_socket.py文件中。用户与代理的交互通过WebSocket进行,连接初始化时会触发connect事件,用户发送消息时会触发oh_user_action事件,连接断开时会触发disconnect事件。因此,梳理代理交互逻辑的核心在于对这三个事件的处理流程进行整理。
  1. import socketio
  2. from openhands.server.app import app as base_app
  3. from openhands.server.listen_socket import sio
  4. from openhands.server.middleware import (
  5.     CacheControlMiddleware,
  6.     InMemoryRateLimiter,
  7.     LocalhostCORSMiddleware,
  8.     RateLimitMiddleware,
  9. )
  10. from openhands.server.static import SPAStaticFiles
  11. if os.getenv('SERVE_FRONTEND', 'true').lower() == 'true':
  12.     base_app.mount(
  13.         '/', SPAStaticFiles(directory='./frontend/build', html=True), name='dist'
  14.     )
  15. base_app.add_middleware(LocalhostCORSMiddleware)
  16. base_app.add_middleware(CacheControlMiddleware)
  17. base_app.add_middleware(
  18.     RateLimitMiddleware,
  19.     rate_limiter=InMemoryRateLimiter(requests=10, seconds=1),
  20. )
  21. app = socketio.ASGIApp(sio, other_asgi_app=base_app)
复制代码
1.3 服务工作流程描述

服务的工作流程如下:

  • 服务器初始化

    • FastAPI应用程序在listen.py中创建和配置。
    • 设置CORS中间件和静态文件服务。
    • 初始化ConversationManager。

  • 客户端连接

    • 当客户端通过WebSocket连接时,创建新的Session或重启现有一个。
    • Session初始化AgentSession,设置运行时环境和代理控制器。

  • 代理初始化

    • 客户端发送初始化请求。
    • 服务器根据提供的参数创建和配置代理。
    • 设置运行时环境,初始化代理控制器。

  • 事件处理

    • Session管理客户端和代理之间的事件流。
    • 客户端的事件分发到代理。
    • 代理的观察结果发送回客户端。

  • 文件操作

    • 服务器处理文件上传,确保它们符合大小和类型限制。
    • 通过运行时环境执行文件读取和写入操作。

  • 安全分析

    • 如果配置了,每个会话初始化安全分析器。
    • 安全相关的API请求转发到安全分析器。

  • 会话管理

    • ConversationManager定期清理非活动会话。
    • 它还在需要时处理向特定会话发送消息。

  • API端点

    • 提供各种API端点,用于代理交互、文件操作和获取配置默认值。

这种服务器架构允许管理多个客户端会话,每个会话都有自己的代理实例、运行时环境和安全分析器。事件驱动设计促进了客户端和代理之间的实时通信,而模块化结构允许轻松扩展和维护不同组件。
1.4 listen_socket.py

listen_socket.py是 OpenHands 服务器端的 Socket.IO 事件监听器,负责处理客户端和服务器之间的实时双向通信,包括连接建立、事件回放、用户行动转发和连接断开四大核心场景,是客户端与后端会话、代理系统交互的桥梁。
1.4.1 核心特色

listen_socket.py的核心特色如下:

  • 断点续传的事件回放:支持通过 latest_event_id 参数实现事件断点续传,客户端重连时仅回放未接收的事件,避免重复数据传输,提升连接效率。
  • 严格的身份与权限校验:连接建立时校验会话 ID、API 密钥、用户身份(通过 Cookie 和 Authorization 头),确保会话安全性,防止未授权访问。
  • 向后兼容的事件处理:保留 oh_action 处理器兼容旧版客户端,同时提供 oh_user_action 新版接口,平滑过渡不中断服务。
  • 有序的事件推送逻辑:代理状态变更事件(AgentStateChangedObservation)最后发送,确保客户端先接收历史事件,再同步最新状态,避免状态不一致。
  • 异步高效的事件处理:基于异步 IO(async/await)实现事件回放和转发,支持高并发连接,不阻塞主线程,提升系统吞吐量。
  • 完善的错误处理:连接失败时主动断开无效连接,记录详细日志,便于问题排查;过滤无效事件(如 NullAction),减少不必要的网络传输。
1.4.2 具体功能

listen_socket.py的具体功能如下:

  • 连接管理(connect 事件)

    • 身份验证:验证连接参数中的 conversation_id 和 API 密钥
    • 用户认证:通过 conversation_validator 验证用户身份
    • 会话恢复:为已存在的会话重放事件流历史
    • 事件重播:向新连接的客户端发送历史事件,包括过滤特定事件类型
    • 会话加入:将客户端连接加入到对应的会话中

  • 动作处理(oh_user_action 和 oh_action 事件)

    • 用户动作接收:处理来自客户端的用户操作请求
    • 事件转发:将用户动作转发到会话管理器进行处理
    • 向后兼容:同时支持 oh_user_action 和 oh_action 事件处理(后者为兼容旧客户端保留)

  • 断开连接处理(disconnect 事件)

    • 连接清理:当客户端断开连接时,清理相关会话资源
    • 状态管理:通知会话管理器客户端已断开连接

listen_socket.py的核心工作流程为:

  • 连接建立:

    • 解析查询参数(会话 ID、最新事件 ID等)
    • 验证会话和用户身份
    • 创建事件存储实例

  • 事件历史重播:

    • 为客户端重放会话历史事件
    • 过滤掉 NullAction、NullObservation、RecallAction 等特定事件
    • 确保 AgentStateChangedObservation 事件最后发送

  • 会话加入:

    • 将连接 ID 与会话关联
    • 初始化会话设置

  • 安全机制

    • API密钥验证:检查 SESSION_API_KEY 环境变量与查询参数中的密钥是否匹配
    • 会话权限控制:通过 conversation_validator 验证用户是否有权访问定会话

  • 错误处理

    • 连接拒绝:在验证失败或出现错误时拒绝连接
    • 异常传播:使用ConnectionRefusedError处理连接错误
    • 异步清理:在连接被拒绝后异步断开连接

listen_socket.py 与其他组件关系

  • 与EventStream紧密配合,负责事件的传输和分发
  • 通过 connection_manager  管理会话状态
  • 使用  event_to_dict  进行事件序列化以便通过网络传输
1.4.3 流程图

2.png

1.4.4 会话连接

此处关键一步为与会话管理器 ConversationManager 建立连接。
  1.         conversation_init_data = await setup_init_conversation_settings(
  2.             user_id, conversation_id, providers_set
  3.         )
  4.         agent_loop_info = await conversation_manager.join_conversation(
  5.             conversation_id,
  6.             connection_id,
  7.             conversation_init_data,
  8.             user_id,
  9.         )
复制代码
1.4.5 代码

listen_socket.py的代码举例如下:
  1. @sio.event
  2. async def connect(connection_id: str, environ: dict) -> None:
  3.     """
  4.     SocketIO连接事件处理器:客户端建立连接时触发,完成会话验证、事件回放、会话加入等初始化流程。
  5.    
  6.     参数:
  7.         connection_id: 客户端连接唯一标识(SocketIO分配)
  8.         environ: WSGI环境变量字典,包含请求头、查询参数等信息
  9.     """
  10.     try:
  11.         logger.info(f"SocketIO连接建立:connection_id={connection_id}")
  12.         
  13.         # 解析查询参数(从WSGI环境变量中提取QUERY_STRING)
  14.         query_params = parse_qs(environ.get('QUERY_STRING', ''))
  15.         
  16.         # 解析最新事件ID(用于断点续传,默认-1表示从最开始回放)
  17.         latest_event_id_str = query_params.get('latest_event_id', [-1])[0]
  18.         try:
  19.             latest_event_id = int(latest_event_id_str)
  20.         except ValueError:
  21.             logger.debug(f"无效的latest_event_id值:{latest_event_id_str},默认设为-1")
  22.             latest_event_id = -1
  23.         
  24.         # 解析会话ID(必需参数,用于关联特定对话)
  25.         conversation_id = query_params.get('conversation_id', [None])[0]
  26.         logger.info(f"会话连接请求:conversation_id={conversation_id}, connection_id={connection_id}")
  27.         
  28.         # 解析提供者集合(如支持的LLM提供商列表,用于限制可用资源)
  29.         raw_list = query_params.get('providers_set', [])
  30.         providers_list = []
  31.         for item in raw_list:
  32.             # 拆分逗号分隔的提供者名称,过滤空值
  33.             providers_list.extend(item.split(',') if isinstance(item, str) else [])
  34.         providers_list = [p for p in providers_list if p]
  35.         providers_set = [ProviderType(p) for p in providers_list]  # 转换为ProviderType枚举类型
  36.         # 校验会话ID是否存在
  37.         if not conversation_id:
  38.             logger.error("查询参数中缺少conversation_id")
  39.             raise ConnectionRefusedError("缺少会话ID(conversation_id)")
  40.         # 校验会话API密钥是否有效
  41.         if _invalid_session_api_key(query_params):
  42.             raise ConnectionRefusedError("无效的会话API密钥")
  43.         # 提取请求中的Cookie和Authorization头(用于用户身份验证)
  44.         cookies_str = environ.get('HTTP_COOKIE', '')
  45.         # WSGI环境中,HTTP头会转为"HTTP_前缀+下划线替换短横线"格式
  46.         authorization_header = environ.get('HTTP_AUTHORIZATION', None)
  47.         
  48.         # 创建会话验证器,校验用户身份(关联会话ID、Cookie、授权头)
  49.         conversation_validator = create_conversation_validator()
  50.         user_id = await conversation_validator.validate(
  51.             conversation_id, cookies_str, authorization_header
  52.         )
  53.         
  54.         # 创建事件存储实例(用于读取会话历史事件)
  55.         try:
  56.             event_store = EventStore(
  57.                 conversation_id, conversation_manager.file_store, user_id
  58.             )
  59.         except FileNotFoundError as e:
  60.             logger.error(f"创建会话事件存储失败:conversation_id={conversation_id}, 错误={e}")
  61.             raise ConnectionRefusedError(f"无法访问会话事件:{e}")
  62.         agent_state_changed = None  # 存储代理状态变更事件(最后单独发送)
  63.         # 创建异步事件存储包装器,从latest_event_id+1开始回放事件(避免重复)
  64.         async_store = AsyncEventStoreWrapper(event_store, latest_event_id + 1)
  65.         # 异步回放历史事件(向客户端推送未接收过的事件)
  66.         async for event in async_store:
  67.             logger.debug(f"回放事件:{event.__class__.__name__}")
  68.             # 跳过无效/召回类事件(无需推送给客户端)
  69.             if isinstance(
  70.                 event,
  71.                 (NullAction, NullObservation, RecallAction),
  72.             ):
  73.                 continue
  74.             # 暂存代理状态变更事件(最后发送,确保客户端状态同步)
  75.             elif isinstance(event, AgentStateChangedObservation):
  76.                 agent_state_changed = event
  77.             # 其他事件直接推送给客户端
  78.             else:
  79.                 await sio.emit('oh_event', event_to_dict(event), to=connection_id)
  80.         # 最后发送代理状态变更事件(确保客户端获取最新状态)
  81.         if agent_state_changed:
  82.             await sio.emit(
  83.                 'oh_event', event_to_dict(agent_state_changed), to=connection_id
  84.             )
  85.         logger.info(f"会话事件回放完成:conversation_id={conversation_id}")
  86.         # 初始化会话设置(用户偏好、提供者配置等)
  87.         conversation_init_data = await setup_init_conversation_settings(
  88.             user_id, conversation_id, providers_set
  89.         )
  90.         # 加入会话:关联connection_id与会话,启动代理循环
  91.         agent_loop_info = await conversation_manager.join_conversation(
  92.             conversation_id,
  93.             connection_id,
  94.             conversation_init_data,
  95.             user_id,
  96.         )
  97.         # 校验会话加入结果
  98.         if agent_loop_info is None:
  99.             raise ConnectionRefusedError("加入会话失败")
  100.         logger.info(f"会话加入成功:conversation_id={conversation_id}, connection_id={connection_id}")
  101.         
  102.     except ConnectionRefusedError:
  103.         # 发送错误后断开无效连接
  104.         asyncio.create_task(sio.disconnect(connection_id))
  105.         raise
  106. @sio.event
  107. async def oh_user_action(connection_id: str, data: dict[str, Any]) -> None:
  108.     """
  109.     处理客户端发送的用户行动事件(如用户输入、操作指令)。
  110.    
  111.     参数:
  112.         connection_id: 客户端连接ID
  113.         data: 用户行动数据(字典格式,包含行动类型、内容等)
  114.     """
  115.     # 将用户行动转发到事件流,由会话管理器处理
  116.     await conversation_manager.send_to_event_stream(connection_id, data)
  117. @sio.event
  118. async def oh_action(connection_id: str, data: dict[str, Any]) -> None:
  119.     """
  120.     兼容旧版客户端的行动事件处理器(保留用于向后兼容)。
  121.    
  122.     注意:待所有客户端升级为使用oh_user_action后,可移除该处理器
  123.     目前用于支持正在进行中的旧会话,避免中断服务
  124.     """
  125.     await conversation_manager.send_to_event_stream(connection_id, data)
  126. @sio.event
  127. async def disconnect(connection_id: str) -> None:
  128.     """
  129.     SocketIO断开连接事件处理器:客户端断开连接时触发。
  130.    
  131.     参数:
  132.         connection_id: 断开连接的客户端ID
  133.     """
  134.     logger.info(f"SocketIO连接断开:connection_id={connection_id}")
  135.     # 通知会话管理器,断开该连接与会话的关联
  136.     await conversation_manager.disconnect_from_session(connection_id)
复制代码
0xFF 参考

https://docs.all-hands.dev/openhands/usage/architecture/backend
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第二篇:Agent 相关核心概念】  克里
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第一篇:系列导读】 克里
Coding Agent之Openhands解析(含代码)  Arrow
OpenHands 源码解读  一力辉

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册