
Getting Started with Spring Batch

2022-07-19, mayingbao

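Spring Batch is a framework for executing batch jobs robustly. This article introduces Spring Batch in a hands-on way, centered on a worked code example.

At the time of writing, the current version is 4.1, which supports Spring 5 and Java 8 or later. It also conforms to the JSR-352 batch processing specification. Its use cases are described in detail in the Spring Batch documentation.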

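Core Concepts and Workflow

Spring Batch core concepts

Spring Batch follows the traditional batch architecture, in which a job repository handles scheduling and the interaction with batch jobs. A job can have several steps, and each step usually reads data, processes it, and then writes it, in that order. The framework takes care of much of the heavy lifting for us, in particular the low-level work of persisting job state.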
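
To make the read, process, write sequence concrete, here is a minimal plain-Java sketch of chunk-oriented processing. It is not Spring Batch's implementation: the class ChunkSketch, the method runStep, and the chunk size of 2 are names and values assumed purely for illustration. Items are read one at a time, passed through a processor, collected until the chunk size is reached, and then handed to the writer as a group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration of the read -> process -> write cycle of a step.
// This is NOT Spring Batch code; it only mimics the idea of chunking.
public class ChunkSketch {

    // Reads items one by one, processes each, and hands them to the writer
    // in groups of at most chunkSize. Returns the number of writer calls.
    public static <I, O> int runStep(Iterator<I> reader,
                                     Function<I, O> processor,
                                     Consumer<List<O>> writer,
                                     int chunkSize) {
        int writes = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // write a full chunk
                chunk.clear();
                writes++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk)); // write the final partial chunk
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("devendra", "john", "robin");
        int writes = runStep(input.iterator(),
                String::toUpperCase,
                chunk -> System.out.println("writing " + chunk),
                2);
        System.out.println("writer calls: " + writes);
    }
}
```

With three input items and a chunk size of 2, the writer is called twice: once with a full chunk and once with the remaining item. In a real step, the framework additionally wraps each chunk in a transaction and records progress in the job repository.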

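Example Requirements

We work through a simple example here: migrating some financial data from a CSV file to an XML file. The input file has a very simple structure. Each line holds one transaction, consisting of the username, the user ID, the transaction date, and the amount: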

username, userid, transaction_date, transaction_amount 
devendra, 1234, 31/10/2015, 10000 
john, 2134, 3/12/2015, 12321 
robin, 2134, 2/02/2015, 23411 
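
As a rough illustration of how one line of this file breaks down into four fields, the following JDK-only sketch splits a line on commas and trims the surrounding whitespace. The class CsvLineSketch and the method splitLine are hypothetical names. In the actual job this work is done by Spring Batch's tokenizer; a naive split like this one does not handle quoted fields.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: shows the field layout of one input line.
public class CsvLineSketch {

    // Splits on commas and trims each token (no support for quoted fields).
    public static List<String> splitLine(String line) {
        return Arrays.stream(line.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Second line of the sample file, including its trailing space
        System.out.println(splitLine("devendra, 1234, 31/10/2015, 10000 "));
    }
}
```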

Gradle Dependencies

dependencies {
    // 'compile' is deprecated (and removed in Gradle 7); use 'implementation'
    implementation group: 'org.apache.poi', name: 'poi', version: '4.0.1'
    implementation group: 'org.apache.poi', name: 'poi-ooxml', version: '4.0.1'
    implementation group: 'com.google.guava', name: 'guava', version: '23.0'

    // Object/XML mapping support (Marshaller, Jaxb2Marshaller)
    implementation 'org.springframework:spring-oxm'

    implementation 'org.springframework.boot:spring-boot-starter-batch'

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    // JDBC access to the MySQL database that backs the job repository
    runtimeOnly 'mysql:mysql-connector-java'
    implementation 'org.springframework:spring-jdbc'

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
}
 

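Schema Initialization

Spring Batch persists the execution state of jobs and steps in a database, so the first step is to run the Spring Batch schema script. The scripts for all supported databases are located under org/springframework/batch/core inside spring-batch-core.jar. Copy schema-mysql.sql into the resources directory, then configure the data source in application.properties and enable script initialization: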

spring.datasource.username= 
spring.datasource.password= 
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver 
spring.datasource.url=jdbc:mysql://192.168.0.12:3306/batch-db?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC 
 
spring.datasource.initialization-mode=always 
spring.datasource.platform=mysql 

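spring.datasource.initialization-mode tells Spring Boot to run schema.sql and data.sql at startup. Because the example uses MySQL, spring.datasource.platform must also be set, so that the platform-specific script schema-mysql.sql is picked up.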

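Reading Data from the CSV File

First we configure the CSV file item reader, which reads data from the CSV file and converts each line into a Transaction object: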

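import java.util.Date;
import javax.xml.bind.annotation.XmlRootElement;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Setter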
@Getter 
@ToString 
@XmlRootElement(name = "transactionRecord") 
public class Transaction { 
    private String username; 
    private int userId; 
    private Date transactionDate; 
    private double amount; 
} 

To perform the conversion, we define a RecordFieldSetMapper class:

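import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
    // One- or two-digit day, two-digit month, four-digit year (e.g. 3/12/2015)
    private static final String dateFormat = "d/MM/yyyy";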
 
    @Override 
    public Transaction mapFieldSet(FieldSet fieldSet)  { 
 
        Transaction transaction = new Transaction(); 
 
        transaction.setUsername(fieldSet.readString("username")); 
        transaction.setUserId(fieldSet.readInt(1)); 
        transaction.setAmount(fieldSet.readDouble(3)); 
        String dateString = fieldSet.readString(2); 
        transaction.setTransactionDate( Date.from( 
                LocalDate.parse(dateString, 
                DateTimeFormatter.ofPattern(dateFormat)).atStartOfDay(ZoneId.systemDefault()).toInstant() 
        )); 
        return transaction; 
    } 
} 
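
The date handling in the mapper can be checked in isolation. The sketch below uses only the JDK and parses the sample dates from the input file with the same d/MM/yyyy pattern; the class DateParsingSketch and the method parse are illustrative names. The pattern accepts a one- or two-digit day but requires a two-digit month, so a value such as 3/2/2015 is rejected.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical stand-alone check of the date pattern used by the mapper.
public class DateParsingSketch {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("d/MM/yyyy");

    // Trims surrounding whitespace, then parses day/month/year.
    public static LocalDate parse(String raw) {
        return LocalDate.parse(raw.trim(), FORMAT);
    }

    public static void main(String[] args) {
        for (String raw : new String[] {"31/10/2015", " 3/12/2015", "2/02/2015"}) {
            System.out.println(raw.trim() + " -> " + parse(raw));
        }
        try {
            parse("3/2/2015"); // single-digit month does not match "MM"
        } catch (DateTimeParseException e) {
            System.out.println("rejected: 3/2/2015");
        }
    }
}
```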

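Processing Data with ItemProcessor

Next we create the processor, which is normally used to process or transform the data that was read. For demonstration purposes, it does no real processing here.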

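import org.springframework.batch.item.ItemProcessor;

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    // Pass-through: returns each item unchanged
    @Override
    public Transaction process(Transaction item) {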
        return item; 
    } 
} 
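
As a rough sketch of what a non-trivial processor could do, the example below normalizes the username and drops records whose amount is not positive. In Spring Batch, returning null from ItemProcessor.process signals that the item should be filtered out and not passed to the writer. To stay self-contained, the code declares its own Processor interface and a trimmed-down Tx class as stand-ins for Spring Batch's ItemProcessor and the Transaction class; these names and the validation rule are assumptions made for illustration.

```java
import java.util.Locale;

// Hypothetical example of a processor that transforms and filters items.
public class ProcessorSketch {

    // Stand-in for org.springframework.batch.item.ItemProcessor, declared
    // locally so the example compiles without Spring Batch on the classpath.
    public interface Processor<I, O> {
        O process(I item) throws Exception;
    }

    // Simplified stand-in for the Transaction class.
    public static class Tx {
        public final String username;
        public final double amount;

        public Tx(String username, double amount) {
            this.username = username;
            this.amount = amount;
        }
    }

    public static class NormalizingProcessor implements Processor<Tx, Tx> {
        @Override
        public Tx process(Tx item) {
            if (item.amount <= 0) {
                return null; // null means "filter this item out"
            }
            // Assumed rule: usernames are stored trimmed and lower-cased
            return new Tx(item.username.trim().toLowerCase(Locale.ROOT), item.amount);
        }
    }

    public static void main(String[] args) {
        NormalizingProcessor p = new NormalizingProcessor();
        System.out.println(p.process(new Tx(" Devendra ", 10000)).username);
        System.out.println(p.process(new Tx("john", -5)));
    }
}
```

The first call prints the normalized username; the second prints null because the record is filtered.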

Spring Batch Configuration

The following configuration wires the step and the job together:

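import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

@Configuration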
@EnableBatchProcessing 
public class FinancialBatchConfiguration { 
 
    private final JobBuilderFactory jobs; 
    private final StepBuilderFactory steps; 
 
    @Value("input/record.csv") 
    private Resource inputCsv; 
 
    @Value("file:xml/output.xml") 
    private Resource outputXml; 
 
    @Autowired 
    public FinancialBatchConfiguration(JobBuilderFactory jobs, StepBuilderFactory steps) { 
        this.jobs = jobs; 
        this.steps = steps; 
    } 
 
    @Bean 
    protected Step step1(ItemReader<Transaction> reader, 
                         ItemProcessor<Transaction, Transaction> processor, 
                         ItemWriter<Transaction> writer) { 
        return steps.get("step1").<Transaction, Transaction> chunk(10) 
            .reader(reader).processor(processor).writer(writer).build(); 
    } 
 
    @Bean(name = "firstBatchJob") 
    public Job job(@Qualifier("step1") Step step1) { 
        return jobs.get("firstBatchJob").start(step1).build(); 
    } 
 
    @Bean 
    public ItemReader<Transaction> itemReader() throws UnexpectedInputException, ParseException { 
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>(); 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); 
        String[] tokens = { "username", "userid", "transactiondate", "amount" }; 
        tokenizer.setNames(tokens); 
        reader.setResource(inputCsv); 
        DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>(); 
        lineMapper.setLineTokenizer(tokenizer); 
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); 
        reader.setLineMapper(lineMapper); 
        reader.setLinesToSkip(1); 
        return reader; 
    } 
 
    @Bean 
    public ItemProcessor<Transaction, Transaction> itemProcessor() { 
        return new CustomItemProcessor(); 
    } 
 
    @Bean 
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller){ 
        StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>(); 
        itemWriter.setMarshaller(marshaller); 
        itemWriter.setRootTagName("transactionRecord"); 
        itemWriter.setResource(outputXml); 
        return itemWriter; 
    } 
 
    @Bean 
    public Marshaller marshaller() { 
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
        marshaller.setClassesToBeBound(Transaction.class); 
        return marshaller; 
    } 
} 

Here itemReader() defines the ItemReader, itemWriter() defines the ItemWriter, and itemProcessor() defines the processor bean. The step is created through StepBuilderFactory and the job through JobBuilderFactory.

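Running the Job with Spring Batch

By default, Spring Boot runs jobs automatically at startup. Because the parameters of every job run are persisted in the database, a job launched without parameters can run only once. We therefore disable automatic job execution and launch the job manually.

Disabling Automatic Job Execution

Add the following property to application.properties and set it to false.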

spring.batch.job.enabled=false 

Running the Job

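import java.time.LocalDateTime;
import java.time.temporal.ChronoField;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication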
public class JpaToExcelApplication implements CommandLineRunner { 
 
    @Autowired 
    private JobLauncher jobLauncher; 
 
    @Autowired 
    @Qualifier("firstBatchJob") 
    private Job job; 
 
    public static void main(String[] args) { 
        SpringApplication.run(JpaToExcelApplication.class, args); 
    } 
 
    @Override 
    public void run(String... args) throws Exception { 
        callFinacialBatch(); 
    } 
 
    private void callFinacialBatch() throws Exception{ 
        jobLauncher.run(job, new JobParametersBuilder() 
            .addLong("date", LocalDateTime.now().getLong(ChronoField.MICRO_OF_DAY)) 
            .toJobParameters()); 
    } 
} 
 

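Spring Batch supports several ways of running a job: from the command line, from a scheduled task, or from a web request. For ease of demonstration, the command-line approach is used here. The callFinacialBatch method passes a parameter to the job so that its value differs on each run, which allows the job to be executed multiple times. After a successful run, you can inspect the records in the database tables and the generated XML file.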
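
Spring Batch identifies a job instance by the job name together with its identifying parameters, so the launch parameter has to differ between runs. ChronoField.MICRO_OF_DAY counts microseconds since midnight, which means the same value recurs on every calendar day, whereas an epoch-based timestamp does not repeat. The JDK-only sketch below compares the two; the class JobParameterSketch and its method names are illustrative, and the fixed dates are chosen only to make the comparison reproducible.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;

// Hypothetical comparison of two candidate values for a "date" job parameter.
public class JobParameterSketch {

    // Value used in the launcher: microseconds elapsed since midnight.
    public static long microOfDay(LocalDateTime t) {
        return t.getLong(ChronoField.MICRO_OF_DAY);
    }

    // Alternative: milliseconds since the epoch (interpreting t as UTC).
    public static long epochMillis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime day1 = LocalDateTime.of(2022, 7, 19, 9, 30, 0);
        LocalDateTime day2 = day1.plusDays(1); // same wall-clock time, next day

        System.out.println(microOfDay(day1) == microOfDay(day2));   // true
        System.out.println(epochMillis(day1) == epochMillis(day2)); // false
    }
}
```

In practice a collision on MICRO_OF_DAY is unlikely, but if strict uniqueness matters, passing System.currentTimeMillis() to addLong would be one way to avoid it.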

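Summary

This article gave a basic introduction to Spring Batch through a simple example, covering how to initialize the Spring Batch schema and how to customize reading, processing, and writing data.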


Reference: https://blog.csdn.net/neweastsun/article/details/88747455