侧胥咽 发表于 2025-11-12 23:20:02

Spring AI Alibaba 项目源码学习(三)-Graph 执行流程分析

Graph 执行流程分析

概述

本文档分析 spring-ai-alibaba-graph-core 模块中 Graph 的执行流程,包括执行器(Executor)、调度机制、Checkpoint 机制和状态管理。
入口类说明

GraphRunner - 执行入口

GraphRunner 是基于 Project Reactor 的响应式图执行引擎,是执行 Graph 的主要入口。
核心职责:

[*]封装编译后的图和执行配置
[*]提供响应式执行接口(返回 Flux)
[*]委托给 MainGraphExecutor 执行实际流程
关键代码:
public class GraphRunner {

        private final CompiledGraph compiledGraph;

        private final RunnableConfig config;

        private final AtomicReference<Object> resultValue = new AtomicReference<>();

        // Handler for main execution flow - demonstrates encapsulation
        private final MainGraphExecutor mainGraphExecutor;

        public GraphRunner(CompiledGraph compiledGraph, RunnableConfig config) {
                this.compiledGraph = compiledGraph;
                this.config = config;
                // Initialize the main execution handler - demonstrates encapsulation
                this.mainGraphExecutor = new MainGraphExecutor();
        }

        public Flux<GraphResponse<NodeOutput>> run(OverAllState initialState) {
                return Flux.defer(() -> {
                        try {
                                GraphRunnerContext context = new GraphRunnerContext(initialState, config, compiledGraph);
                                // Delegate to the main execution handler - demonstrates polymorphism
                                return mainGraphExecutor.execute(context, resultValue);
                        }
                        catch (Exception e) {
                                return Flux.error(e);
                        }
                });
        }MainGraphExecutor - 主执行器

MainGraphExecutor 是主执行流程处理器,继承自 BaseGraphExecutor,负责协调整个图的执行。
核心职责:

[*]处理开始节点和结束节点
[*]管理迭代次数和中断逻辑
[*]协调节点执行和边路由
[*]处理 Checkpoint 和恢复
关键代码:
public class MainGraphExecutor extends BaseGraphExecutor {

        private final NodeExecutor nodeExecutor;

        public MainGraphExecutor() {
                this.nodeExecutor = new NodeExecutor(this);
        }

        /**
       * Implementation of the execute method. This demonstrates polymorphism as it provides
       * a specific implementation for main execution flow.
       * @param context the graph runner context
       * @param resultValue the atomic reference to store the result value
       * @return Flux of GraphResponse with execution result
       */
        @Override
        public Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {
                try {
                        if (context.shouldStop() || context.isMaxIterationsReached()) {
                                return handleCompletion(context, resultValue);
                        }

                        final var returnFromEmbed = context.getReturnFromEmbedAndReset();
                        if (returnFromEmbed.isPresent()) {
                                var interruption = returnFromEmbed.get().value(new TypeRef<InterruptionMetadata>() {
                                });
                                if (interruption.isPresent()) {
                                        return Flux.just(GraphResponse.done(interruption.get()));
                                }
                                return Flux.just(GraphResponse.done(context.buildCurrentNodeOutput()));
                        }

                        if (context.getCurrentNodeId() != null && context.getConfig().isInterrupted(context.getCurrentNodeId())) {
                                context.getConfig().withNodeResumed(context.getCurrentNodeId());
                                return Flux.just(GraphResponse.done(GraphResponse.done(context.getCurrentStateData())));
                        }

                        if (context.isStartNode()) {
                                return handleStartNode(context);
                        }

                        if (context.isEndNode()) {
                                return handleEndNode(context, resultValue);
                        }

                        final var resumeFrom = context.getResumeFromAndReset();
                        if (resumeFrom.isPresent()) {
                                if (context.getCompiledGraph().compileConfig.interruptBeforeEdge()
                                                && java.util.Objects.equals(context.getNextNodeId(), INTERRUPT_AFTER)) {
                                        var nextNodeCommand = context.nextNodeId(resumeFrom.get(), context.getCurrentStateData());
                                        context.setNextNodeId(nextNodeCommand.gotoNode());
                                        context.setCurrentNodeId(null);
                                }
                        }

                        if (context.shouldInterrupt()) {
                                try {
                                        InterruptionMetadata metadata = InterruptionMetadata
                                                .builder(context.getCurrentNodeId(), context.cloneState(context.getCurrentStateData()))
                                                .build();
                                        return Flux.just(GraphResponse.done(metadata));NodeExecutor - 节点执行器

NodeExecutor 负责执行单个节点,处理节点动作的执行和结果处理。
核心职责:

[*]执行节点动作(NodeAction)
[*]处理中断逻辑(InterruptableAction)
[*]处理流式输出(StreamingOutput)
[*]更新状态并确定下一个节点
关键代码:
public class NodeExecutor extends BaseGraphExecutor {

        private final MainGraphExecutor mainGraphExecutor;

        public NodeExecutor(MainGraphExecutor mainGraphExecutor) {
                this.mainGraphExecutor = mainGraphExecutor;
        }

        /**
       * Implementation of the execute method. This demonstrates polymorphism as it provides
       * a specific implementation for node execution.
       * @param context the graph runner context
       * @param resultValue the atomic reference to store the result value
       * @return Flux of GraphResponse with execution result
       */
        @Override
        public Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {
                return executeNode(context, resultValue);
        }

        /**
       * Executes a node and handles its result.
       * @param context the graph runner context
       * @param resultValue the atomic reference to store the result value
       * @return Flux of GraphResponse with node execution result
       */
        private Flux<GraphResponse<NodeOutput>> executeNode(GraphRunnerContext context,
                        AtomicReference<Object> resultValue) {
                try {
                        context.setCurrentNodeId(context.getNextNodeId());
                        String currentNodeId = context.getCurrentNodeId();
                        AsyncNodeActionWithConfig action = context.getNodeAction(currentNodeId);

                        if (action == null) {
                                return Flux.just(GraphResponse.error(RunnableErrors.missingNode.exception(currentNodeId)));
                        }

                        if (action instanceof InterruptableAction) {
                                context.getConfig().metadata(RunnableConfig.STATE_UPDATE_METADATA_KEY).ifPresent(updateFromFeedback -> {
                                        if (updateFromFeedback instanceof Map<?, ?>) {
                                                context.mergeIntoCurrentState((Map<String, Object>) updateFromFeedback);
                                        } else {
                                                throw new RuntimeException();
                                        }
                                });
                                Optional<InterruptionMetadata> interruptMetadata = ((InterruptableAction) action)
                                        .interrupt(currentNodeId, context.cloneState(context.getCurrentStateData()), context.getConfig());
                                if (interruptMetadata.isPresent()) {GraphRunnerContext - 执行上下文

GraphRunnerContext 管理图执行过程中的状态和上下文信息。
核心职责:

[*]管理当前节点和下一个节点
[*]管理迭代次数
[*]处理 Checkpoint 和恢复
[*]管理状态更新
关键代码:
public class GraphRunnerContext {

        public static final String INTERRUPT_AFTER = "__INTERRUPTED__";

        private static final Logger log = LoggerFactory.getLogger(GraphRunner.class);

        final CompiledGraph compiledGraph;

        final AtomicInteger iteration = new AtomicInteger(0);

        OverAllState overallState;

        RunnableConfig config;

        String currentNodeId;

        String nextNodeId;

        String resumeFrom;

        ReturnFromEmbed returnFromEmbed;

        public GraphRunnerContext(OverAllState initialState, RunnableConfig config, CompiledGraph compiledGraph)
                        throws Exception {
                this.compiledGraph = compiledGraph;
                this.config = config;

                if (config.metadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY).isPresent()) {
                        initializeFromResume(initialState, config);
                } else {
                        initializeFromStart(initialState, config);
                }
        }

        private void initializeFromResume(OverAllState initialState, RunnableConfig config) {
                log.trace("RESUME REQUEST");

                var saver = compiledGraph.compileConfig.checkpointSaver()
                        .orElseThrow(() -> new IllegalStateException("Resume request without a configured checkpoint saver!"));
                var checkpoint = saver.get(config)
                        .orElseThrow(() -> new IllegalStateException("Resume request without a valid checkpoint!"));执行流程时序图

以下 PlantUML 时序图展示了 Graph 的完整执行流程:

Checkpoint 机制

BaseCheckpointSaver - Checkpoint 保存器接口

BaseCheckpointSaver 定义了 Checkpoint 保存和恢复的接口。
关键代码:
public interface BaseCheckpointSaver {

        String THREAD_ID_DEFAULT = "$default";

        record Tag(String threadId, Collection<Checkpoint> checkpoints) {
                public Tag(String threadId, Collection<Checkpoint> checkpoints) {
                        this.threadId = threadId;
                        this.checkpoints = ofNullable(checkpoints).map(List::copyOf).orElseGet(List::of);
                }
        }

        default Tag release(RunnableConfig config) throws Exception {
                return null;
        }

        Collection<Checkpoint> list(RunnableConfig config);

        Optional<Checkpoint> get(RunnableConfig config);

        RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;

        boolean clear(RunnableConfig config);

        default Optional<Checkpoint> getLast(LinkedList<Checkpoint> checkpoints, RunnableConfig config) {
                return (checkpoints == null || checkpoints.isEmpty()) ? Optional.empty() : ofNullable(checkpoints.peek());
        }

        default LinkedList<Checkpoint> getLinkedList(List<Checkpoint> checkpoints) {
                return Objects.nonNull(checkpoints) ? new LinkedList<>(checkpoints) : new LinkedList<>();
        }

}Checkpoint 流程


[*]保存 Checkpoint:在执行节点后,通过 GraphRunnerContext.addCheckpoint() 保存当前状态
[*]恢复 Checkpoint:通过 RunnableConfig 中的 checkpoint ID 恢复执行
[*]释放 Checkpoint:执行完成后释放 Checkpoint 资源
状态管理

OverAllState 更新机制

状态更新通过 KeyStrategy 控制:

[*]ReplaceStrategy:替换策略,新值完全替换旧值
[*]AppendStrategy:追加策略,新值追加到列表
[*]Reducer:归约策略,使用自定义函数合并值
状态更新流程:

[*]节点执行返回 Map 更新
[*]GraphRunnerContext 根据 KeyStrategy 合并更新
[*]更新后的状态传递给下一个节点
实现关键点说明

1. 响应式编程

使用 Project Reactor 的 Flux 实现响应式执行:

[*]支持流式输出
[*]支持背压控制
[*]支持异步执行
2. 模板方法模式

BaseGraphExecutor 定义执行框架,子类实现具体逻辑:

[*]MainGraphExecutor:主执行流程
[*]NodeExecutor:节点执行流程
3. 上下文模式

GraphRunnerContext 封装执行上下文:

[*]管理当前执行状态
[*]提供状态访问接口
[*]处理 Checkpoint 和恢复
4. 中断和恢复机制

支持执行中断和恢复:

[*]InterruptableAction:可中断的动作
[*]InterruptionMetadata:中断元数据
[*]Checkpoint 保存和恢复
5. 迭代控制

通过 maxIterations 控制最大迭代次数,防止无限循环。
总结说明

核心执行流程


[*]初始化:创建 GraphRunnerContext,初始化状态
[*]开始执行:从 START 节点开始
[*]节点执行:执行当前节点,更新状态
[*]边路由:根据 EdgeAction 确定下一个节点
[*]Checkpoint:保存执行状态(可选)
[*]迭代:重复步骤 3-5,直到到达 END 节点
[*]完成:返回最终结果
关键设计特点


[*]响应式:基于 Reactor 的响应式执行
[*]可中断:支持执行中断和恢复
[*]状态管理:灵活的状态更新策略
[*]可观测:支持监控和追踪
[*]容错性:错误处理和恢复机制
执行器层次结构

BaseGraphExecutor (抽象基类)
    ├── MainGraphExecutor (主执行器)
    └── NodeExecutor (节点执行器)
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

卢莹洁 发表于 2025-11-28 09:10:01

感谢分享

讣丢 发表于 昨天 11:44

yyds。多谢分享
页: [1]
查看完整版本: Spring AI Alibaba 项目源码学习(三)-Graph 执行流程分析