杰瑞科技汇

SpringBatch教程怎么学?核心步骤有哪些?

Spring Batch 全面教程

目录

  1. 什么是 Spring Batch?
  2. 核心概念与架构
    • 1. Job
    • 2. Step
    • 3. JobInstance, JobExecution, StepExecution
    • 4. ItemReader
    • 5. ItemProcessor
    • 6. ItemWriter
    • 7. JobRepository
    • 8. JobLauncher
  3. 第一个 Spring Batch 程序
    • 1. 环境准备
    • 2. Maven 依赖
    • 3. 主配置类
    • 4. 创建一个简单的 Job
    • 5. 运行与查看结果
  4. 核心组件详解
    • 1. ItemReader 常见实现
    • 2. ItemProcessor 实战
    • 3. ItemWriter 常见实现
    • 4. ItemStreamReaderItemStreamWriter (处理大数据)
  5. 进阶特性
    • 1. 作业流: 控制步骤的执行顺序
    • 2. 决策: 动态决定下一步执行哪个步骤
    • 3. 分区: 并行处理数据,大幅提升性能
    • 4. 重试与跳过: 处理异常数据
    • 5. Spring Boot 集成: 使用 @EnableBatchProcessing
  6. 实战项目:从 CSV 读取数据,处理后写入数据库
  7. 最佳实践与总结

什么是 Spring Batch?

Spring Batch 是一个开源的、全面的批处理框架,它是 Spring 生态系统的一部分,它旨在帮助企业开发健壮、可维护的批处理应用程序。

SpringBatch教程怎么学?核心步骤有哪些?-图1
(图片来源网络,侵删)

核心特点:

  • 企业级: 提供了事务管理、资源管理、错误处理、作业监控等企业级功能。
  • 可扩展: 易于与 Spring 框架集成,也支持与其他技术栈(如 JDBC, JMS, NoSQL 等)结合。
  • 健壮性: 内置了作业重启、失败处理、日志记录等机制,确保作业的稳定运行。
  • 高性能: 支持并行处理、分区、步进等特性,可以处理大规模数据集。

适用场景:

  • 定期处理大量数据(如 nightly ETL jobs)。
  • 从数据库、文件(CSV, XML, JSON)等源数据转换并加载到目标系统。
  • 执行大规模操作,如更新数据库中的所有记录。
  • 生成月度财务报告、客户账单等。

重要提醒: Spring Batch 不是一个调度框架,它本身不负责“何时”运行作业,而是负责“如何”运行作业,它通常与调度工具(如 Quartz, Spring Scheduler, Airflow, Kubernetes CronJob)结合使用。


核心概念与架构

理解 Spring Batch 的核心概念是掌握它的关键。

SpringBatch教程怎么学?核心步骤有哪些?-图2
(图片来源网络,侵删)

1. Job

Job 是一个完整的批处理过程,它由一个或多个 Step 组成,它是你启动和执行的最高级别的抽象,一个 Job 有一个逻辑上的名称,用于唯一标识它。

2. Step

StepJob 的组成单元,代表了一个独立的、连续的处理阶段,一个批处理作业至少包含一个 Step,一个 Job 可以包含 "读取数据 -> 处理数据 -> 写入数据" 三个 Step

3. JobInstance, JobExecution, StepExecution

这是理解 Spring Batch 作业生命周期的三个核心对象。

  • JobInstance: 作业的逻辑运行实例,它由 Job 名称和运行参数(JobParameters)共同决定。同一个 Job 使用不同的参数运行,会创建不同的 JobInstance,如果使用相同的参数再次运行,Spring Batch 会尝试从上一次失败的地方重启。
  • JobExecution: 作业的一次物理运行,它包含了作业运行时的状态(如 COMPLETED, FAILED, RUNNING)、开始时间、结束时间、退出码等信息,一个 JobInstance 可以对应多个 JobExecution(第一次失败后重启)。
  • StepExecution: 类似于 JobExecution,它代表 Step 的一次物理运行,包含了该步骤的运行状态、统计信息(读取/处理/写入的条数)等。

4. ItemReader

ItemReader 的职责是从数据源中读取一条数据,并将其传递给 ItemProcessor,它是一个“读取”策略的抽象,读取完成后,它会返回 null 来表示数据源已耗尽。

常见实现:

  • FlatFileItemReader: 从平面文件(如 CSV)中读取。
  • JdbcPagingItemReader: 通过分页查询从数据库中读取。
  • JpaPagingItemReader: 使用 JPA 分页查询。
  • MongoItemReader: 从 MongoDB 中读取。

5. ItemProcessor

ItemProcessor 是可选的,它负责对 ItemReader 读取到的数据进行业务处理,它接收一个输入项,经过处理后返回一个输出项,如果处理失败或需要跳过,可以返回 null

核心接口:

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

6. ItemWriter

ItemWriter 的职责接收 ItemProcessor 处理后的数据(或直接从 ItemReader 接收,如果没有 Processor),并将其写入到目标系统,它通常以“批量”的形式写入,以提高性能。

常见实现:

  • FlatFileItemWriter: 写入平面文件。
  • JdbcBatchItemWriter: 批量写入数据库。
  • JpaItemWriter: 使用 JPA 批量写入。
  • MongoItemWriter: 写入 MongoDB。

7. JobRepository

JobRepository 是 Spring Batch 的“大脑”或“持久化层”,它负责存储所有与作业执行相关的元数据,如 JobInstance, JobExecution, StepExecution 等,它将这些信息保存在数据库中,使得作业可以被监控、重启和管理。

8. JobLauncher

JobLauncher 负责启动一个 Job,它接收一个 Job 和一组 JobParameters,然后创建一个新的 JobExecution,并委托给 JobRepository 来执行这个 Job


第一个 Spring Batch 程序

让我们创建一个最简单的 "Hello World" 程序,它只打印 "Hello, World!"。

1. 环境准备

  • JDK 8+
  • Maven
  • IDE (如 IntelliJ IDEA)

2. Maven 依赖

创建一个普通的 Maven 项目,并添加以下依赖:

<dependencies>
    <!-- Spring Batch Core -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>4.3.6</version> <!-- 使用较新的稳定版本 -->
    </dependency>
    <!-- 为了简化配置,我们使用 Spring 的 XML 配置方式 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.3.21</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>5.3.21</version>
    </dependency>
    <!-- 内存数据库,用于存储 JobRepository 的元数据 -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <version>2.1.214</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

3. 主配置类

创建一个主应用类来启动 Spring 上下文。

// src/main/java/com/example/HelloWorldJobApplication.java
package com.example;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HelloWorldJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    // 1. 定义一个 Step
    @Bean
    public Step helloWorldStep() {
        return stepBuilderFactory.get("helloWorldStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("Hello, World!");
                    return RepeatStatus.FINISHED; // 告诉框架这个任务已经完成
                }).build();
    }
    // 2. 定义一个 Job,并将 Step 包含进去
    @Bean
    public Job helloWorldJob() {
        return jobBuilderFactory.get("helloWorldJob")
                .incrementer(new RunIdIncrementer()) // 每次运行生成一个新的 ID,防止重复执行
                .start(helloWorldStep())
                .build();
    }
}

4. 创建主程序

创建一个主程序来运行这个 Job。

// src/main/java/com/example/BatchApplication.java
package com.example;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class BatchApplication {
    public static void main(String[] args) {
        // 1. 加载 Spring 配置
        ApplicationContext context = new AnnotationConfigApplicationContext(HelloWorldJobConfiguration.class);
        // 2. 获取 JobLauncher 和 Job
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job helloWorldJob = context.getBean("helloWorldJob", Job.class);
        try {
            // 3. 启动 Job
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(helloWorldJob, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5. 运行与查看结果

运行 BatchApplicationmain 方法,你会在控制台看到:

Hello, World!

Spring Batch 会在内存数据库中创建元数据表,并记录这次执行的日志,如果你使用的是 H2 内存数据库,程序结束后数据会丢失,在实际项目中,你会使用 MySQL 或 PostgreSQL 等持久化数据库。


核心组件详解

1. ItemReader 常见实现:FlatFileItemReader

让我们读取一个 input.csv 文件。

input.csv:

id,name,age
1,Alice,30
2,Bob,25
3,Charlie,35

配置 Reader:

@Bean
public FlatFileItemReader<Person> csvReader() {
    return new FlatFileItemReaderBuilder<Person>()
            .name("personItemReader")
            .resource(new ClassPathResource("input.csv"))
            .delimited() // 使用分隔符格式
            .names(new String[]{"id", "name", "age"}) // 对应 CSV 的列名
            .fieldSetMapper(fieldSet -> {
                Person person = new Person();
                person.setId(fieldSet.readInt("id"));
                person.setName(fieldSet.readString("name"));
                person.setAge(fieldSet.readInt("age"));
                return person;
            })
            .linesToSkip(1) // 跳过标题行
            .build();
}

你需要一个 Person 类来映射数据:

public class Person {
    private int id;
    private String name;
    private int age;
    // Getters and Setters...
}

2. ItemProcessor 实战

假设我们要给每个 Person 对象添加一个 processed 标记。

@Bean
public ItemProcessor<Person, Person> personItemProcessor() {
    return person -> {
        System.out.println("Processing person: " + person.getName());
        person.setName(person.getName().toUpperCase()); // 将名字转为大写
        return person;
    };
}

3. ItemWriter 常见实现:JdbcBatchItemWriter

我们将处理后的 Person 对象批量写入数据库。

确保数据库有对应的表:

CREATE TABLE people (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    age INT
);

配置 Writer:

@Bean
public JdbcBatchItemWriter<Person> databaseWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Person>()
            .dataSource(dataSource)
            .sql("INSERT INTO people (id, name, age) VALUES (:id, :name, :age)")
            .beanMapped() // 使用 Java Bean 的属性名作为 SQL 参数名
            .build();
}

4. ItemStreamReaderItemStreamWriter (处理大数据)

当处理的数据量非常大时,不能一次性全部加载到内存中,Spring Batch 提供了 Stream 版本的读写器,它们可以逐条或分块地处理数据,并且支持“重启”功能,记录已处理的位置。

  • FileItemReader: 继承自 ItemStreamReader,记录文件中的行号。
  • JdbcCursorItemReader: 使用数据库游标,避免 LIMIT/OFFSET 的性能问题。
  • JpaPagingItemReader: 支持从上次失败的位置重启。

进阶特性

1. 作业流: 控制步骤的执行顺序

使用 Jobflow() 方法来定义步骤之间的复杂流程。

@Bean
public Job jobWithFlow() {
    return jobBuilderFactory.get("jobWithFlow")
            .start(step1())
            .on("COMPLETED").to(step2()) // step1 完成,则执行 step2
            .from(step1())
            .on("FAILED").to(step3())    // step1 失败,则执行 step3
            .end()
            .build();
}

2. 决策: 动态决定下一步

创建一个实现 JobExecutionDecider 接口的类,来根据业务逻辑决定下一步执行哪个步骤。

@Bean
public JobExecutionDecider decider() {
    return new MyDecider();
}
@Bean
public Job conditionalJob() {
    return jobBuilderFactory.get("conditionalJob")
            .start(step1())
            .next(decider())
            .from(decider()).on("ODD").to(step2())
            .from(decider()).on("EVEN").to(step3())
            .end()
            .build();
}

3. 分区: 并行处理数据

分区是 Spring Batch 提供的高性能特性,它可以将一个大的 Step 分解成多个小的 Step(分区),在不同的线程中并行执行。

  • Partitioner: 负责将数据源(如数据范围)分割成多个“分区上下文”。
  • StepExecutionSplitter: 根据分区上下文,为每个分区创建一个 Step 实例。
  • TaskExecutor: 用于并行执行这些分区的 Step

4. 重试与跳过

处理不完美数据集的利器。

  • 重试: 当某个 Item 处理时抛出特定异常,可以自动重试。
    @Bean
    public Step retryStep() {
        return stepBuilderFactory.get("retryStep")
                .<String, String>chunk(10)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .faultTolerant() // 开启容错模式
                .retry(RetryableException.class) // 对 RetryableException 进行重试
                .retryLimit(3) // 最多重试 3 次
                .build();
    }
  • 跳过: 当某个 Item 处理失败时,可以跳过它并继续处理下一个。
    .faultTolerant()
    .skip(SkippableException.class) // 跳过 SkippableException
    .skipLimit(10) // 最多跳过 10 个

5. Spring Boot 集成

在 Spring Boot 中使用 Spring Batch 极其简单。

  1. 添加依赖:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
  2. 启用批处理: 在主启动类上添加 @EnableBatchProcessing
    @SpringBootApplication
    @EnableBatchProcessing // 自动配置了 JobRepository, JobLauncher 等
    public class MyBatchApplication {
        public static void main(String[] args) {
            SpringApplication.run(MyBatchApplication.class, args);
        }
    }
  3. 编写配置: Spring Boot 会自动配置一个 JobRepositoryJobLauncher,你只需要专注于定义 JobStep,通常使用 @Configuration 类来组织你的批处理作业。

实战项目:从 CSV 读取数据,处理后写入数据库

目标: 将 people.csv 文件中的数据读取出来,将姓名转为大写,然后批量写入到 MySQL 数据库的 processed_people 表中。

步骤:

  1. 创建 Spring Boot 项目 (使用 Spring Initializr)。

  2. 添加依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
  3. 配置 application.properties:

    # DataSource
    spring.datasource.url=jdbc:mysql://localhost:3306/batch_db?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
    spring.datasource.username=root
    spring.datasource.password=password
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    # Batch (Spring Boot 会自动创建这些表)
    spring.batch.jdbc.initialize-schema=always
  4. 创建 Person 模型类

  5. 创建 BatchConfig 配置类:

    @Configuration
    @EnableBatchProcessing
    public class BatchConfig {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
        @Autowired
        private DataSource dataSource;
        @Bean
        public FlatFileItemReader<Person> reader() {
            return new FlatFileItemReaderBuilder<Person>()
                    .name("personItemReader")
                    .resource(new ClassPathResource("people.csv"))
                    .delimited()
                    .names(new String[]{"id", "name", "age"})
                    .fieldSetMapper(fieldSet -> {
                        Person person = new Person();
                        person.setId(fieldSet.readInt("id"));
                        person.setName(fieldSet.readString("name"));
                        person.setAge(fieldSet.readInt("age"));
                        return person;
                    })
                    .linesToSkip(1)
                    .build();
        }
        @Bean
        public ItemProcessor<Person, Person> processor() {
            return person -> {
                person.setName(person.getName().toUpperCase());
                return person;
            };
        }
        @Bean
        public JdbcBatchItemWriter<Person> writer() {
            return new JdbcBatchItemWriterBuilder<Person>()
                    .dataSource(dataSource)
                    .sql("INSERT INTO processed_people (id, name, age) VALUES (:id, :name, :age)")
                    .beanMapped()
                    .build();
        }
        @Bean
        public Step importPeopleStep() {
            return stepBuilderFactory.get("importPeopleStep")
                    .<Person, Person>chunk(10) // 每处理10条提交一次
                    .reader(reader())
                    .processor(processor())
                    .writer(writer())
                    .build();
        }
        @Bean
        public Job importPeopleJob() {
            return jobBuilderFactory.get("importPeopleJob")
                    .incrementer(new RunIdIncrementer())
                    .flow(importPeopleStep())
                    .end()
                    .build();
        }
    }
  6. src/main/resources 下创建 people.csv 文件

  7. 创建数据库表:

    CREATE DATABASE batch_db;
    USE batch_db;
    CREATE TABLE processed_people (
        id INT PRIMARY KEY,
        name VARCHAR(100),
        age INT
    );
  8. 运行: 直接运行主启动类,Spring Boot 会自动执行所有标记为 @Bean 且类型为 Job 的作业。


最佳实践与总结

  • 配置分离: 将批处理作业的配置与主应用配置分开,使用 @Profile 来区分开发、测试和生产环境。
  • 合理设置 chunk 大小: chunk 大小是性能的关键,太小会导致频繁的数据库 I/O,太大会占用过多内存,通常从 100 到 1000 之间开始测试和调整。
  • 使用 JobParameters 而非硬编码参数: 通过 JobParameters 传递文件路径、日期等参数,使作业更加灵活和可重用。
  • 充分利用 Spring Boot: 在 Spring Boot 项目中,@EnableBatchProcessing 能为你省去大量基础配置工作。
  • 监控与日志: 利用 Spring Boot Actuator 或自定义的 JobListener 来监控作业状态和性能,详细的日志对于排查问题至关重要。
  • 测试: 编写单元测试来测试 ItemProcessor,编写集成测试来测试整个 Step 的流程。

Spring Batch 是一个功能强大且设计精良的批处理框架,通过掌握其核心的 Job/Step/Reader/Processor/Writer 模式,并熟悉其高级特性(如流、分区、重试),你可以构建出能够处理海量数据、稳定可靠的批处理应用,希望这份教程能为你提供一个良好的起点!

分享:
扫描分享到社交APP
上一篇
下一篇