output = operator.apply(input);
+
+ // Then
+ StepVerifier.create(output)
+ .expectNext("value-1", "value-2", "value-3")
+ .verifyComplete();
+}
+```
+
+### Integration tests
+
+Use `@SpringBootTest` for integration tests.
+
+## Documentation conventions
+
+### JavaDoc
+
+```java
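+/**
+ * Data source interface; every Source implementation must implement it.
+ * <p>
+ * A DataSource reads data from an external system and converts it into a reactive stream.
+ * </p>
+ *
+ * @param <T> the output data type
+ * @author Your Name
+ * @since 1.0.0
+ */
+public interface DataSource<T> {
+    // ...
+}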
+```
+
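+### Markdown documents
+
+- Use a clear heading hierarchy
+- Add code examples
+- Include diagrams where needed
+
+## Design patterns
+
+Patterns that must be used:
+
+1. **Builder pattern**: constructing complex objects
+2. **Factory pattern**: creating components
+3. **Strategy pattern**: selecting algorithms
+4. **Observer pattern**: state notifications
+5. **Template method**: defining workflows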
+
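+## Pre-commit checklist
+
+- [ ] Code follows the project conventions
+- [ ] Necessary tests have been added
+- [ ] All tests pass
+- [ ] Related documentation has been updated
+- [ ] Commit messages are clear and specific
+- [ ] No unnecessary dependencies were introduced
+- [ ] Code passes static analysis
+
+## Contact
+
+If you have questions, reach us through:
+
+- GitHub Issues
+- Email: etl-framework-team@example.com
+
+Thank you for contributing!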
diff --git a/pipeline-framework/DESIGN_PATTERN_EXPLANATION.md b/pipeline-framework/DESIGN_PATTERN_EXPLANATION.md
new file mode 100644
index 000000000..dd291a535
--- /dev/null
+++ b/pipeline-framework/DESIGN_PATTERN_EXPLANATION.md
@@ -0,0 +1,527 @@
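+# Pipeline Framework Design Patterns Explained
+
+## 📐 Design patterns in use
+
+### 1. Strategy Pattern
+
+**Problem**: How can components of different types be created without a switch case?
+
+**Solution**: Strategy pattern + Spring dependency injection
+
+#### Previous code (using switch case):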
+
+```java
+public Operator createOperator(OperatorType type, OperatorConfig config) {
+ switch (type) {
+ case FILTER:
+ return new FilterOperator(config);
+ case MAP:
+ return new MapOperator(config);
+ case AGGREGATE:
+ return new AggregateOperator(config);
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + type);
+ }
+}
+```
+
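+**Problems**:
+- Every new type requires modifying this method (violates the open/closed principle)
+- Tight coupling
+- Hard to test
+
+#### Current code (using the strategy pattern):
+
+**Step 1**: Define the strategy interface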
+
+```java
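+public interface ComponentCreator<T, C> {
+    Mono<T> create(C config);
+    String getType();
+    // Default, so creators that do not care about ordering need not override it.
+    default int getOrder() {
+        return 0;
+    }
+}
+
+public interface OperatorCreator extends ComponentCreator<Operator<?, ?>, OperatorConfig> {
+}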
+```
+
+**Step 2**: Implement the concrete strategies (one per type)
+
+```java
+@Component // Picked up by Spring component scanning
+public class FilterOperatorCreator implements OperatorCreator {
+
+    @Override
+    public Mono<Operator<?, ?>> create(OperatorConfig config) {
+        return Mono.fromCallable(() -> new FilterOperator<>(config));
+    }
+
+    @Override
+    public String getType() {
+        return "filter";
+    }
+}
+
+@Component
+public class MapOperatorCreator implements OperatorCreator {
+
+    @Override
+    public Mono<Operator<?, ?>> create(OperatorConfig config) {
+        return Mono.fromCallable(() -> new MapOperator<>(config));
+    }
+
+    @Override
+    public String getType() {
+        return "map";
+    }
+}
+```
+
+**Step 3**: The Spring factory receives all strategies by injection
+
+```java
+@Component
+public class SpringOperatorFactory {
+
+    private final Map<String, OperatorCreator> creatorMap;
+
+    // Spring injects every OperatorCreator implementation
+    public SpringOperatorFactory(List<OperatorCreator> creators) {
+        this.creatorMap = new ConcurrentHashMap<>();
+        for (OperatorCreator creator : creators) {
+            // Normalize the key so it matches the lower-cased lookup below
+            creatorMap.put(creator.getType().toLowerCase(), creator);
+        }
+    }
+
+    public Mono<Operator<?, ?>> createOperator(OperatorConfig config) {
+        String type = config.getType().name().toLowerCase();
+        OperatorCreator creator = creatorMap.get(type);
+
+        if (creator == null) {
+            return Mono.error(new IllegalArgumentException("Unsupported type: " + type));
+        }
+
+        return creator.create(config);
+    }
+}
+```
+
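+**Benefits**:
+- ✅ **Open/closed principle**: adding a type only requires a new `@Component` class; the factory stays unchanged
+- ✅ **Low coupling**: each strategy is independent of the others
+- ✅ **Easy to test**: each strategy can be tested in isolation
+- ✅ **Managed by Spring**: strategies are discovered and injected automatically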
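+
+The "easy to test" point can be illustrated without starting a Spring context. The sketch below is a simplified, framework-free stand-in rather than the project's actual code: `Creator`, `CreatorRegistry`, `creator(...)`, and `sampleRegistry()` are hypothetical names, and plain return values replace `Mono` so the example runs on the JDK alone.
+
+```java
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CreatorRegistryDemo {
+
+    /** Hypothetical, simplified strategy interface (the real one returns Mono). */
+    interface Creator {
+        String getType();
+        String create(String config);
+    }
+
+    /** Registry keyed by type; mirrors what the factory does with its injected list. */
+    static final class CreatorRegistry {
+        private final Map<String, Creator> creators = new ConcurrentHashMap<>();
+
+        CreatorRegistry(List<Creator> all) {
+            for (Creator creator : all) {
+                creators.put(creator.getType().toLowerCase(Locale.ROOT), creator);
+            }
+        }
+
+        /** Returns an empty Optional for an unknown type instead of throwing. */
+        Optional<String> create(String type, String config) {
+            Creator creator = creators.get(type.toLowerCase(Locale.ROOT));
+            return creator == null ? Optional.empty() : Optional.of(creator.create(config));
+        }
+    }
+
+    /** Builds a trivial creator that just describes what it would construct. */
+    static Creator creator(String type, String label) {
+        return new Creator() {
+            @Override public String getType() { return type; }
+            @Override public String create(String config) { return label + "(" + config + ")"; }
+        };
+    }
+
+    static CreatorRegistry sampleRegistry() {
+        return new CreatorRegistry(List.of(
+                creator("filter", "FilterOperator"),
+                creator("map", "MapOperator")));
+    }
+
+    public static void main(String[] args) {
+        CreatorRegistry registry = sampleRegistry();
+        System.out.println(registry.create("FILTER", "x > 1").orElse("unsupported"));
+        System.out.println(registry.create("join", "").orElse("unsupported"));
+    }
+}
+```
+
+Because the registry only depends on a `List` of creators, a unit test can pass in exactly the strategies it wants to exercise, which is the same seam Spring uses when it injects the list.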
+
+---
+
+### 2. Factory Pattern + Spring IoC
+
+**Problem**: How can component creation be managed in one place?
+
+**Solution**: Factory pattern + Spring dependency injection
+
+```java
+@Component
+public class SpringSourceFactory {
+
+    private final Map<String, SourceCreator> creatorMap;
+
+    // Spring injects every SourceCreator
+    public SpringSourceFactory(List<SourceCreator> creators) {
+        this.creatorMap = new ConcurrentHashMap<>();
+        for (SourceCreator creator : creators) {
+            creatorMap.put(creator.getType().toLowerCase(), creator);
+        }
+    }
+
+    public Mono<DataSource<?>> createSource(SourceConfig config) {
+        String type = config.getType().name().toLowerCase();
+        SourceCreator creator = creatorMap.get(type);
+        // Fail with a clear error instead of a NullPointerException for unknown types
+        if (creator == null) {
+            return Mono.error(new IllegalArgumentException("Unsupported type: " + type));
+        }
+        return creator.create(config);
+    }
+}
+```
+
+**Usage example**:
+
+```java
+@Component
+public class SpringGraphBasedPipelineBuilder {
+
+    private final SpringSourceFactory sourceFactory;
+    private final SpringSinkFactory sinkFactory;
+    private final SpringOperatorFactory operatorFactory;
+
+    // Spring injects the three factories
+    public SpringGraphBasedPipelineBuilder(
+            SpringSourceFactory sourceFactory,
+            SpringSinkFactory sinkFactory,
+            SpringOperatorFactory operatorFactory) {
+        this.sourceFactory = sourceFactory;
+        this.sinkFactory = sinkFactory;
+        this.operatorFactory = operatorFactory;
+    }
+
+    private Mono<DataSource<?>> createSource(StreamNode node) {
+        SourceConfig config = parseSourceConfig(node);
+        return sourceFactory.createSource(config); // No switch needed
+    }
+}
+```
+
+---
+
+### 3. Builder Pattern
+
+**Problem**: How can a complex Pipeline be assembled cleanly?
+
+**Solution**: Builder pattern
+
+```java
+@Component
+public class SpringGraphBasedPipelineBuilder {
+
+    public Mono<Pipeline<?, ?>> buildFromGraph(StreamGraph graph) {
+        return Mono.defer(() -> {
+            // 1. Validate
+            if (!graph.validate()) {
+                return Mono.error(new IllegalArgumentException("Invalid graph"));
+            }
+
+            // 2. Classify the nodes
+            StreamNode sourceNode = findSourceNode(graph);
+            List<StreamNode> operatorNodes = findOperatorNodes(graph);
+            StreamNode sinkNode = findSinkNode(graph);
+
+            // 3. Create the components and assemble the pipeline
+            return createSource(sourceNode)
+                .flatMap(source -> createOperators(operatorNodes)
+                    .flatMap(operators -> createSink(sinkNode)
+                        .map(sink -> assemblePipeline(graph, source, operators, sink))));
+        });
+    }
+}
+```
+
+---
+
+### 4. Template Method Pattern
+
+**Problem**: The Pipeline execution flow is fixed, but the concrete steps differ between implementations.
+
+**Solution**: Template method pattern
+
+```java
+public abstract class AbstractPipeline<IN, OUT> implements Pipeline<IN, OUT> {
+
+    // Template method: defines the execution flow
+    @Override
+    public final Mono<PipelineResult> execute() {
+        return Mono.defer(() -> {
+            // 1. Pre-execution hook
+            return beforeExecute()
+                // 2. Build the data flow (wrapped lazily, since it returns a Flux)
+                .then(Mono.fromSupplier(this::buildDataFlow))
+                // 3. Execute the data flow
+                .flatMap(this::executeDataFlow)
+                // 4. Post-execution hook
+                .flatMap(this::afterExecute);
+        });
+    }
+
+    // Subclasses implement the concrete logic
+    protected abstract Mono<Void> beforeExecute();
+    protected abstract Flux<OUT> buildDataFlow();
+    protected abstract Mono<PipelineResult> executeDataFlow(Flux<OUT> flow);
+    protected abstract Mono<PipelineResult> afterExecute(PipelineResult result);
+}
+```
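+
+The same idea can be tried out without Reactor. The following is a minimal synchronous sketch, assuming a hypothetical `AbstractJob` base class and `UppercaseJob` subclass that are not part of the framework; it only shows how a `final` template method fixes the step order while subclasses supply the steps.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+public class TemplateMethodDemo {
+
+    /** Hypothetical synchronous stand-in for AbstractPipeline. */
+    abstract static class AbstractJob {
+        protected final List<String> trace = new ArrayList<>();
+
+        // Template method: final, so subclasses cannot reorder the steps.
+        public final List<String> run() {
+            before();
+            List<String> data = load();
+            process(data);
+            after();
+            return trace;
+        }
+
+        // Hooks with default behavior; subclasses may override them.
+        protected void before() { trace.add("before"); }
+        protected void after() { trace.add("after"); }
+
+        // Steps every subclass must provide.
+        protected abstract List<String> load();
+        protected abstract void process(List<String> data);
+    }
+
+    static final class UppercaseJob extends AbstractJob {
+        @Override
+        protected List<String> load() {
+            trace.add("load");
+            return List.of("a", "b");
+        }
+
+        @Override
+        protected void process(List<String> data) {
+            for (String item : data) {
+                trace.add("process:" + item.toUpperCase());
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        // prints [before, load, process:A, process:B, after]
+        System.out.println(new UppercaseJob().run());
+    }
+}
+```
+
+The recorded trace makes the ordering guarantee visible: whatever a subclass does inside `load()` or `process()`, the hooks always run first and last.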
+
+---
+
+### 5. Observer Pattern
+
+**Problem**: How can the execution state of a Pipeline be monitored?
+
+**Solution**: Use Reactor's `doOnXxx` operators (a built-in form of the observer pattern)
+
+```java
+public Mono execute() {
+ return Mono.defer(() -> {
+ Flux dataFlow = buildDataFlow();
+
+ return sink.write(dataFlow)
+ .doOnSubscribe(s -> notifyListeners(PipelineEvent.STARTED))
+ .doOnNext(data -> notifyListeners(PipelineEvent.PROCESSING, data))
+ .doOnComplete(() -> notifyListeners(PipelineEvent.COMPLETED))
+ .doOnError(e -> notifyListeners(PipelineEvent.FAILED, e));
+ });
+}
+```
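+
+The snippet above calls `notifyListeners` without showing it. One possible shape for that helper is sketched below; `PipelineListener` and `ListenerSupport` are hypothetical names introduced here, and the `PipelineEvent` constants are the four used above. The real framework may implement this differently.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class PipelineListenerDemo {
+
+    enum PipelineEvent { STARTED, PROCESSING, COMPLETED, FAILED }
+
+    /** Hypothetical listener contract; payload is a data item, an error, or null. */
+    interface PipelineListener {
+        void onEvent(PipelineEvent event, Object payload);
+    }
+
+    static final class ListenerSupport {
+        // CopyOnWriteArrayList lets listeners be added while events are being dispatched.
+        private final List<PipelineListener> listeners = new CopyOnWriteArrayList<>();
+
+        void addListener(PipelineListener listener) {
+            listeners.add(listener);
+        }
+
+        void notifyListeners(PipelineEvent event) {
+            notifyListeners(event, null);
+        }
+
+        void notifyListeners(PipelineEvent event, Object payload) {
+            for (PipelineListener listener : listeners) {
+                try {
+                    listener.onEvent(event, payload);
+                } catch (RuntimeException e) {
+                    // A failing listener should not break the pipeline or starve other listeners.
+                    System.err.println("Listener failed: " + e);
+                }
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        ListenerSupport support = new ListenerSupport();
+        List<String> seen = new ArrayList<>();
+        support.addListener((event, payload) -> seen.add(event + ":" + payload));
+
+        support.notifyListeners(PipelineEvent.STARTED);
+        support.notifyListeners(PipelineEvent.PROCESSING, "row-1");
+        support.notifyListeners(PipelineEvent.COMPLETED);
+        System.out.println(seen);
+    }
+}
+```
+
+Catching exceptions per listener is a deliberate choice in this sketch: side-effect callbacks such as `doOnNext` propagate any exception they throw as an error signal, so an unguarded listener could fail the whole pipeline.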
+
+---
+
+## 🔧 Spring annotations in use
+
+### 1. Component scanning
+
+```java
+// Source Creator
+@Component
+public class KafkaSourceCreator implements SourceCreator {
+    // Scanned and registered by Spring automatically
+}
+
+// Sink Creator
+@Component
+public class ConsoleSinkCreator implements SinkCreator {
+    // Scanned and registered by Spring automatically
+}
+
+// Operator Creator
+@Component
+public class FilterOperatorCreator implements OperatorCreator {
+    // Scanned and registered by Spring automatically
+}
+```
+
+### 2. Dependency injection
+
+```java
+@Component
+public class ConsoleSourceCreator implements SourceCreator {
+
+    private final Scheduler ioScheduler;
+
+    // Constructor injection
+    public ConsoleSourceCreator(@Qualifier("ioScheduler") Scheduler ioScheduler) {
+        this.ioScheduler = ioScheduler;
+    }
+}
+```
+
+### 3. Configuration management
+
+```java
+@Component
+@ConfigurationProperties(prefix = "reactor.scheduler")
+public class ReactorSchedulerProperties {
+    private SchedulerConfig io;
+    private SchedulerConfig compute;
+    // Bound from configuration by Spring automatically
+}
+```
+
+### 4. Bean management
+
+```java
+@Configuration
+public class ReactorSchedulerConfig {
+
+ @Bean(name = "ioScheduler", destroyMethod = "dispose")
+ public Scheduler ioScheduler(ReactorSchedulerProperties properties) {
+ return Schedulers.newBoundedElastic(...);
+ }
+
+ @Bean(name = "computeScheduler", destroyMethod = "dispose")
+ public Scheduler computeScheduler(ReactorSchedulerProperties properties) {
+ return Schedulers.newParallel(...);
+ }
+}
+```
+
+### 5. Service layer
+
+```java
+@Service
+public class PipelineExecutionService {
+
+ private final SpringGraphBasedPipelineBuilder pipelineBuilder;
+ private final Scheduler pipelineScheduler;
+
+ public PipelineExecutionService(
+ SpringGraphBasedPipelineBuilder pipelineBuilder,
+ @Qualifier("pipelineScheduler") Scheduler pipelineScheduler) {
+ this.pipelineBuilder = pipelineBuilder;
+ this.pipelineScheduler = pipelineScheduler;
+ }
+
+ public Mono execute(StreamGraph graph) {
+ return pipelineBuilder.buildFromGraph(graph)
+ .flatMap(Pipeline::execute)
+ .subscribeOn(pipelineScheduler);
+ }
+}
+```
+
+---
+
+## 🎯 Reactor thread pool configuration
+
+### 1. Configuration file
+
+```yaml
+reactor:
+  scheduler:
+    # IO-bound operations
+    io:
+      pool-size: 100
+      queue-size: 1000
+      thread-name-prefix: reactor-io-
+
+    # CPU-bound operations
+    compute:
+      pool-size: 0  # 0 = number of CPU cores
+      thread-name-prefix: reactor-compute-
+
+    # Wrapping blocking calls
+    bounded-elastic:
+      pool-size: 200
+      queue-size: 10000
+      ttl-seconds: 60
+      thread-name-prefix: reactor-bounded-
+
+    # Dedicated to pipeline execution
+    pipeline:
+      pool-size: 50
+      queue-size: 500
+      thread-name-prefix: pipeline-exec-
+```
+
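+### 2. When to use each Scheduler
+
+| Scheduler | Use case | Examples |
+|-----------|----------|----------|
+| `ioScheduler` | IO-bound operations | Database queries, HTTP requests, message queues |
+| `computeScheduler` | CPU-bound operations | Data transformation, computation, aggregation |
+| `boundedElasticScheduler` | Wrapping blocking calls | JDBC calls, synchronous third-party libraries |
+| `pipelineScheduler` | Pipeline execution | Graph construction, pipeline execution |
+
+### 3. Usage example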
+
+```java
+@Component
+public class ConsoleSourceCreator implements SourceCreator {
+
+    private final Scheduler ioScheduler;
+
+    public ConsoleSourceCreator(@Qualifier("ioScheduler") Scheduler ioScheduler) {
+        this.ioScheduler = ioScheduler;
+    }
+
+    @Override
+    public Mono<DataSource<?>> create(SourceConfig config) {
+        // Explicit type argument keeps the chained call assignable to Mono<DataSource<?>>
+        return Mono.<DataSource<?>>fromCallable(() -> {
+            // Creation logic
+            return new ConsoleSource(config);
+        })
+        .subscribeOn(ioScheduler); // Runs on the IO scheduler
+    }
+}
+```
+
+---
+
+## 📊 Architecture comparison
+
+### Before (using switch case)
+
+```
+GraphBuilder
+ ↓
+switch (type) {
+ case SOURCE_A: return new SourceA();
+ case SOURCE_B: return new SourceB();
+ ...
+}
+```
+
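+**Problems**:
+- ❌ Violates the open/closed principle
+- ❌ Tight coupling
+- ❌ Hard to extend
+- ❌ Hard to test
+
+### Now (design patterns + Spring)
+
+```
+Spring container starts
+ ↓
+Scans all @Component classes automatically
+ ↓
+Injects them into the Factory
+ ↓
+Factory.create(config)
+ ↓
+Looks up the Creator by type
+ ↓
+Creator.create(config)
+```
+
+**Benefits**:
+- ✅ Follows the open/closed principle
+- ✅ Low coupling, high cohesion
+- ✅ Easy to extend
+- ✅ Easy to test
+- ✅ Managed by Spring automatically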
+
+---
+
+## 🚀 How to add a new component
+
+### Example: adding a new Source
+
+**Step 1**: Implement the `DataSource` interface
+
+```java
+public class MyCustomSource implements DataSource<MyData> {
+    @Override
+    public Flux<MyData> read() {
+        return Flux.just(new MyData());
+    }
+}
+```
+
+**Step 2**: Create the Creator (annotated with `@Component`)
+
+```java
+@Component // That is all it takes; Spring discovers it automatically
+public class MyCustomSourceCreator implements SourceCreator {
+
+    @Override
+    public Mono<DataSource<?>> create(SourceConfig config) {
+        return Mono.just(new MyCustomSource());
+    }
+
+    @Override
+    public String getType() {
+        return "mycustom"; // Defines the type identifier
+    }
+}
+```
+
+**Step 3**: Done!
+
+No other code needs to change. Spring automatically:
+1. Scans and finds `MyCustomSourceCreator`
+2. Injects it into `SpringSourceFactory`
+3. Registers it in `creatorMap`
+
+---
+
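+## 📝 Summary
+
+### Key improvements
+
+1. **Strategy pattern replaces switch case**: one strategy class per type
+2. **Spring dependency injection**: all components are discovered and managed automatically
+3. **Reactor thread pool configuration**: a different Scheduler for each kind of workload
+4. **Open/closed principle**: extending the system requires no changes to existing code
+5. **Testability**: each component is independent and easy to unit test
+
+### Design principles
+
+- ✅ Single Responsibility Principle (SRP)
+- ✅ Open/Closed Principle (OCP)
+- ✅ Dependency Inversion Principle (DIP)
+- ✅ Interface Segregation Principle (ISP)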
diff --git a/pipeline-framework/Dockerfile b/pipeline-framework/Dockerfile
new file mode 100644
index 000000000..10d315475
--- /dev/null
+++ b/pipeline-framework/Dockerfile
@@ -0,0 +1,69 @@
+# Multi-stage build for ETL Framework
+
+# Stage 1: Build
+FROM maven:3.9-eclipse-temurin-17 AS build
+
+WORKDIR /app
+
+# Copy pom files
+COPY pom.xml .
+COPY etl-api/pom.xml etl-api/
+COPY etl-core/pom.xml etl-core/
+COPY etl-connectors/pom.xml etl-connectors/
+COPY etl-operators/pom.xml etl-operators/
+COPY etl-scheduler/pom.xml etl-scheduler/
+COPY etl-executor/pom.xml etl-executor/
+COPY etl-state/pom.xml etl-state/
+COPY etl-checkpoint/pom.xml etl-checkpoint/
+COPY etl-metrics/pom.xml etl-metrics/
+COPY etl-web/pom.xml etl-web/
+COPY etl-starter/pom.xml etl-starter/
+
+# Download dependencies
+RUN mvn dependency:go-offline -B
+
+# Copy source code
+COPY etl-api/src etl-api/src
+COPY etl-core/src etl-core/src
+COPY etl-connectors/src etl-connectors/src
+COPY etl-operators/src etl-operators/src
+COPY etl-scheduler/src etl-scheduler/src
+COPY etl-executor/src etl-executor/src
+COPY etl-state/src etl-state/src
+COPY etl-checkpoint/src etl-checkpoint/src
+COPY etl-metrics/src etl-metrics/src
+COPY etl-web/src etl-web/src
+COPY etl-starter/src etl-starter/src
+
+# Build application
+RUN mvn clean package -DskipTests -B
+
+# Stage 2: Runtime
+FROM eclipse-temurin:17-jre-alpine
+
+LABEL maintainer="etl-framework-team"
+LABEL description="Reactive ETL Framework"
+LABEL version="1.0.0-SNAPSHOT"
+
+# Set working directory
+WORKDIR /app
+
+# Create data directories
+RUN mkdir -p /data/checkpoints /var/log/etl-framework
+
+# Copy JAR from build stage
+COPY --from=build /app/etl-starter/target/etl-starter-*.jar /app/etl-framework.jar
+
+# Set environment variables
+ENV JAVA_OPTS="-Xms512m -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
+ENV SPRING_PROFILES_ACTIVE=prod
+
+# Expose port
+EXPOSE 8080
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
+ CMD wget --quiet --tries=1 --spider http://localhost:8080/actuator/health || exit 1
+
+# Run application
+ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -jar /app/etl-framework.jar"]
diff --git a/pipeline-framework/FINAL_REFACTORING_SUMMARY.md b/pipeline-framework/FINAL_REFACTORING_SUMMARY.md
new file mode 100644
index 000000000..675cb654c
--- /dev/null
+++ b/pipeline-framework/FINAL_REFACTORING_SUMMARY.md
@@ -0,0 +1,521 @@
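+# Pipeline Framework Final Refactoring Summary
+
+## 🎉 Refactoring complete
+
+This refactoring reworked the project's overall architecture, removed every switch case, and substantially improved abstraction and extensibility.
+
+---
+
+## 📊 Results at a glance
+
+### Code cleanup
+
+| Category | Count |
+|----------|-------|
+| Unused classes removed | 6 |
+| New interfaces | 11 |
+| New implementation classes | 7 |
+| Switch cases eliminated | 3+ |
+
+### Removed unused classes
+
+1. ❌ `DefaultPipeline` → ✅ use `SimplePipeline`
+2. ❌ `GraphBasedPipelineBuilder` → ✅ use `SpringGraphBasedPipelineBuilder`
+3. ❌ `PipelineBuilder` → ✅ removed; it had no real use
+4. ❌ `GraphExecutor` → ✅ use `EnhancedGraphExecutor`
+5. ❌ `OperatorChain` → ✅ implemented directly in the Pipeline
+6. ❌ `DefaultOperatorChain` → ✅ implemented directly in the Pipeline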
+
+---
+
+## 🏗️ New architecture layers
+
+### 1. API layer - interface abstractions (5 inheritance levels)
+
+```
+Level 1: Component
+  ├── ComponentType
+  ├── ComponentMetadata
+  └── getName(), getConfig()
+
+Level 2: LifecycleAware
+  └── start(), stop(), isRunning()
+
+Level 2: StreamingComponent extends Component
+  └── process(), getInputType(), getOutputType()
+
+Level 3: DataSource extends Component + LifecycleAware
+  └── read(), getType()
+
+Level 3: Operator extends StreamingComponent
+  └── apply(), getType()
+
+Level 3: DataSink extends Component + LifecycleAware
+  └── write(), writeBatch(), flush()
+```
+
+### 2. Core layer - strategy pattern implementation
+
+```
+NodeExecutor (strategy interface)
+├── AbstractNodeExecutor (template method)
+    ├── SourceNodeExecutor (@Component)
+    ├── OperatorNodeExecutor (@Component)
+    └── SinkNodeExecutor (@Component)
+
+NodeExecutorRegistry (@Component)
+└── receives all NodeExecutor beans by injection
+
+EnhancedGraphExecutor (@Component)
+└── uses the Registry, no switch case
+```
+
+---
+
+## 🚀 Key improvements in detail
+
+### 1. Eliminating switch case with the strategy pattern
+
+#### ❌ Before (hard-coded)
+
+```java
+switch (node.getNodeType()) {
+ case SOURCE:
+ flux = buildSourceFlux(node);
+ break;
+ case OPERATOR:
+ flux = buildOperatorFlux(node);
+ break;
+ case SINK:
+ flux = buildOperatorFlux(node);
+ break;
+ default:
+ throw new IllegalStateException("Unknown node type");
+}
+```
+
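+**Problems**:
+- Violates the open/closed principle
+- Adding a type requires modifying existing code
+- Tight coupling
+- Hard to test
+
+#### ✅ After (strategy pattern)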
+
+```java
+// 1. Define the strategy interface
+public interface NodeExecutor {
+ Flux buildFlux(StreamNode node, NodeExecutionContext context);
+ NodeType getSupportedNodeType();
+}
+
+// 2. Implement the concrete strategies
+@Component
+public class SourceNodeExecutor extends AbstractNodeExecutor