找回密码
 立即注册
首页 业界区 安全 Spring AI Alibaba 项目源码学习(三)-Graph 执行流程 ...

Spring AI Alibaba 项目源码学习(三)-Graph 执行流程分析

侧胥咽 2025-11-12 23:20:02
Graph 执行流程分析

概述

本文档分析 spring-ai-alibaba-graph-core 模块中 Graph 的执行流程,包括执行器(Executor)、调度机制、Checkpoint 机制和状态管理。
入口类说明

GraphRunner - 执行入口

GraphRunner 是基于 Project Reactor 的响应式图执行引擎,是执行 Graph 的主要入口。
核心职责

  • 封装编译后的图和执行配置
  • 提供响应式执行接口(返回 Flux)
  • 委托给 MainGraphExecutor 执行实际流程
关键代码
  1. public class GraphRunner {
  2.         private final CompiledGraph compiledGraph;
  3.         private final RunnableConfig config;
  4.         private final AtomicReference<Object> resultValue = new AtomicReference<>();
  5.         // Handler for main execution flow - demonstrates encapsulation
  6.         private final MainGraphExecutor mainGraphExecutor;
  7.         public GraphRunner(CompiledGraph compiledGraph, RunnableConfig config) {
  8.                 this.compiledGraph = compiledGraph;
  9.                 this.config = config;
  10.                 // Initialize the main execution handler - demonstrates encapsulation
  11.                 this.mainGraphExecutor = new MainGraphExecutor();
  12.         }
  13.         public Flux<GraphResponse<NodeOutput>> run(OverAllState initialState) {
  14.                 return Flux.defer(() -> {
  15.                         try {
  16.                                 GraphRunnerContext context = new GraphRunnerContext(initialState, config, compiledGraph);
  17.                                 // Delegate to the main execution handler - demonstrates polymorphism
  18.                                 return mainGraphExecutor.execute(context, resultValue);
  19.                         }
  20.                         catch (Exception e) {
  21.                                 return Flux.error(e);
  22.                         }
  23.                 });
  24.         }
复制代码
MainGraphExecutor - 主执行器

MainGraphExecutor 是主执行流程处理器,继承自 BaseGraphExecutor,负责协调整个图的执行。
核心职责

  • 处理开始节点和结束节点
  • 管理迭代次数和中断逻辑
  • 协调节点执行和边路由
  • 处理 Checkpoint 和恢复
关键代码
  1. public class MainGraphExecutor extends BaseGraphExecutor {
  2.         private final NodeExecutor nodeExecutor;
  3.         public MainGraphExecutor() {
  4.                 this.nodeExecutor = new NodeExecutor(this);
  5.         }
  6.         /**
  7.          * Implementation of the execute method. This demonstrates polymorphism as it provides
  8.          * a specific implementation for main execution flow.
  9.          * @param context the graph runner context
  10.          * @param resultValue the atomic reference to store the result value
  11.          * @return Flux of GraphResponse with execution result
  12.          */
  13.         @Override
  14.         public Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {
  15.                 try {
  16.                         if (context.shouldStop() || context.isMaxIterationsReached()) {
  17.                                 return handleCompletion(context, resultValue);
  18.                         }
  19.                         final var returnFromEmbed = context.getReturnFromEmbedAndReset();
  20.                         if (returnFromEmbed.isPresent()) {
  21.                                 var interruption = returnFromEmbed.get().value(new TypeRef<InterruptionMetadata>() {
  22.                                 });
  23.                                 if (interruption.isPresent()) {
  24.                                         return Flux.just(GraphResponse.done(interruption.get()));
  25.                                 }
  26.                                 return Flux.just(GraphResponse.done(context.buildCurrentNodeOutput()));
  27.                         }
  28.                         if (context.getCurrentNodeId() != null && context.getConfig().isInterrupted(context.getCurrentNodeId())) {
  29.                                 context.getConfig().withNodeResumed(context.getCurrentNodeId());
  30.                                 return Flux.just(GraphResponse.done(GraphResponse.done(context.getCurrentStateData())));
  31.                         }
  32.                         if (context.isStartNode()) {
  33.                                 return handleStartNode(context);
  34.                         }
  35.                         if (context.isEndNode()) {
  36.                                 return handleEndNode(context, resultValue);
  37.                         }
  38.                         final var resumeFrom = context.getResumeFromAndReset();
  39.                         if (resumeFrom.isPresent()) {
  40.                                 if (context.getCompiledGraph().compileConfig.interruptBeforeEdge()
  41.                                                 && java.util.Objects.equals(context.getNextNodeId(), INTERRUPT_AFTER)) {
  42.                                         var nextNodeCommand = context.nextNodeId(resumeFrom.get(), context.getCurrentStateData());
  43.                                         context.setNextNodeId(nextNodeCommand.gotoNode());
  44.                                         context.setCurrentNodeId(null);
  45.                                 }
  46.                         }
  47.                         if (context.shouldInterrupt()) {
  48.                                 try {
  49.                                         InterruptionMetadata metadata = InterruptionMetadata
  50.                                                 .builder(context.getCurrentNodeId(), context.cloneState(context.getCurrentStateData()))
  51.                                                 .build();
  52.                                         return Flux.just(GraphResponse.done(metadata));
复制代码
NodeExecutor - 节点执行器

NodeExecutor 负责执行单个节点,处理节点动作的执行和结果处理。
核心职责

  • 执行节点动作(NodeAction)
  • 处理中断逻辑(InterruptableAction)
  • 处理流式输出(StreamingOutput)
  • 更新状态并确定下一个节点
关键代码
  1. public class NodeExecutor extends BaseGraphExecutor {
  2.         private final MainGraphExecutor mainGraphExecutor;
  3.         public NodeExecutor(MainGraphExecutor mainGraphExecutor) {
  4.                 this.mainGraphExecutor = mainGraphExecutor;
  5.         }
  6.         /**
  7.          * Implementation of the execute method. This demonstrates polymorphism as it provides
  8.          * a specific implementation for node execution.
  9.          * @param context the graph runner context
  10.          * @param resultValue the atomic reference to store the result value
  11.          * @return Flux of GraphResponse with execution result
  12.          */
  13.         @Override
  14.         public Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {
  15.                 return executeNode(context, resultValue);
  16.         }
  17.         /**
  18.          * Executes a node and handles its result.
  19.          * @param context the graph runner context
  20.          * @param resultValue the atomic reference to store the result value
  21.          * @return Flux of GraphResponse with node execution result
  22.          */
  23.         private Flux<GraphResponse<NodeOutput>> executeNode(GraphRunnerContext context,
  24.                         AtomicReference<Object> resultValue) {
  25.                 try {
  26.                         context.setCurrentNodeId(context.getNextNodeId());
  27.                         String currentNodeId = context.getCurrentNodeId();
  28.                         AsyncNodeActionWithConfig action = context.getNodeAction(currentNodeId);
  29.                         if (action == null) {
  30.                                 return Flux.just(GraphResponse.error(RunnableErrors.missingNode.exception(currentNodeId)));
  31.                         }
  32.                         if (action instanceof InterruptableAction) {
  33.                                 context.getConfig().metadata(RunnableConfig.STATE_UPDATE_METADATA_KEY).ifPresent(updateFromFeedback -> {
  34.                                         if (updateFromFeedback instanceof Map<?, ?>) {
  35.                                                 context.mergeIntoCurrentState((Map<String, Object>) updateFromFeedback);
  36.                                         } else {
  37.                                                 throw new RuntimeException();
  38.                                         }
  39.                                 });
  40.                                 Optional<InterruptionMetadata> interruptMetadata = ((InterruptableAction) action)
  41.                                         .interrupt(currentNodeId, context.cloneState(context.getCurrentStateData()), context.getConfig());
  42.                                 if (interruptMetadata.isPresent()) {
复制代码
GraphRunnerContext - 执行上下文

GraphRunnerContext 管理图执行过程中的状态和上下文信息。
核心职责

  • 管理当前节点和下一个节点
  • 管理迭代次数
  • 处理 Checkpoint 和恢复
  • 管理状态更新
关键代码
  1. public class GraphRunnerContext {
  2.         public static final String INTERRUPT_AFTER = "__INTERRUPTED__";
  3.         private static final Logger log = LoggerFactory.getLogger(GraphRunner.class);
  4.         final CompiledGraph compiledGraph;
  5.         final AtomicInteger iteration = new AtomicInteger(0);
  6.         OverAllState overallState;
  7.         RunnableConfig config;
  8.         String currentNodeId;
  9.         String nextNodeId;
  10.         String resumeFrom;
  11.         ReturnFromEmbed returnFromEmbed;
  12.         public GraphRunnerContext(OverAllState initialState, RunnableConfig config, CompiledGraph compiledGraph)
  13.                         throws Exception {
  14.                 this.compiledGraph = compiledGraph;
  15.                 this.config = config;
  16.                 if (config.metadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY).isPresent()) {
  17.                         initializeFromResume(initialState, config);
  18.                 } else {
  19.                         initializeFromStart(initialState, config);
  20.                 }
  21.         }
  22.         private void initializeFromResume(OverAllState initialState, RunnableConfig config) {
  23.                 log.trace("RESUME REQUEST");
  24.                 var saver = compiledGraph.compileConfig.checkpointSaver()
  25.                         .orElseThrow(() -> new IllegalStateException("Resume request without a configured checkpoint saver!"));
  26.                 var checkpoint = saver.get(config)
  27.                         .orElseThrow(() -> new IllegalStateException("Resume request without a valid checkpoint!"));
复制代码
执行流程时序图

以下 PlantUML 时序图展示了 Graph 的完整执行流程:
1.png

Checkpoint 机制

BaseCheckpointSaver - Checkpoint 保存器接口

BaseCheckpointSaver 定义了 Checkpoint 保存和恢复的接口。
关键代码
  1. public interface BaseCheckpointSaver {
  2.         String THREAD_ID_DEFAULT = "$default";
  3.         record Tag(String threadId, Collection<Checkpoint> checkpoints) {
  4.                 public Tag(String threadId, Collection<Checkpoint> checkpoints) {
  5.                         this.threadId = threadId;
  6.                         this.checkpoints = ofNullable(checkpoints).map(List::copyOf).orElseGet(List::of);
  7.                 }
  8.         }
  9.         default Tag release(RunnableConfig config) throws Exception {
  10.                 return null;
  11.         }
  12.         Collection<Checkpoint> list(RunnableConfig config);
  13.         Optional<Checkpoint> get(RunnableConfig config);
  14.         RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
  15.         boolean clear(RunnableConfig config);
  16.         default Optional<Checkpoint> getLast(LinkedList<Checkpoint> checkpoints, RunnableConfig config) {
  17.                 return (checkpoints == null || checkpoints.isEmpty()) ? Optional.empty() : ofNullable(checkpoints.peek());
  18.         }
  19.         default LinkedList<Checkpoint> getLinkedList(List<Checkpoint> checkpoints) {
  20.                 return Objects.nonNull(checkpoints) ? new LinkedList<>(checkpoints) : new LinkedList<>();
  21.         }
  22. }
复制代码
Checkpoint 流程


  • 保存 Checkpoint:在执行节点后,通过 GraphRunnerContext.addCheckpoint() 保存当前状态
  • 恢复 Checkpoint:通过 RunnableConfig 中的 checkpoint ID 恢复执行
  • 释放 Checkpoint:执行完成后释放 Checkpoint 资源
状态管理

OverAllState 更新机制

状态更新通过 KeyStrategy 控制:

  • ReplaceStrategy:替换策略,新值完全替换旧值
  • AppendStrategy:追加策略,新值追加到列表
  • Reducer:归约策略,使用自定义函数合并值
状态更新流程:

  • 节点执行返回 Map 更新
  • GraphRunnerContext 根据 KeyStrategy 合并更新
  • 更新后的状态传递给下一个节点
实现关键点说明

1. 响应式编程

使用 Project Reactor 的 Flux 实现响应式执行:

  • 支持流式输出
  • 支持背压控制
  • 支持异步执行
2. 模板方法模式

BaseGraphExecutor 定义执行框架,子类实现具体逻辑:

  • MainGraphExecutor:主执行流程
  • NodeExecutor:节点执行流程
3. 上下文模式

GraphRunnerContext 封装执行上下文:

  • 管理当前执行状态
  • 提供状态访问接口
  • 处理 Checkpoint 和恢复
4. 中断和恢复机制

支持执行中断和恢复:

  • InterruptableAction:可中断的动作
  • InterruptionMetadata:中断元数据
  • Checkpoint 保存和恢复
5. 迭代控制

通过 maxIterations 控制最大迭代次数,防止无限循环。
总结说明

核心执行流程


  • 初始化:创建 GraphRunnerContext,初始化状态
  • 开始执行:从 START 节点开始
  • 节点执行:执行当前节点,更新状态
  • 边路由:根据 EdgeAction 确定下一个节点
  • Checkpoint:保存执行状态(可选)
  • 迭代:重复步骤 3-5,直到到达 END 节点
  • 完成:返回最终结果
关键设计特点


  • 响应式:基于 Reactor 的响应式执行
  • 可中断:支持执行中断和恢复
  • 状态管理:灵活的状态更新策略
  • 可观测:支持监控和追踪
  • 容错性:错误处理和恢复机制
执行器层次结构
  1. BaseGraphExecutor (抽象基类)
  2.     ├── MainGraphExecutor (主执行器)
  3.     └── NodeExecutor (节点执行器)
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册