Spring Batch 入门教程
Spring Batch用于健壮地执行批处理工作的框架。本文通过实战方式介绍Spring Batch,聚焦案例代码方式进行阐述。
当前版本4.1,支持Spring 5 和 java 8 或以上版本。也遵循JSR-352批处理规范。这里详细说明了其应用场景。
核心概念及流程
Spring batch 遵循作业仓库(job repository)执行调度工作并负责与批处理作业进行交互的传统批处理架构。job可以有多个步骤,每个步骤一般遵循读数据、处理数据以及写数据顺序。spring batch框架帮助我们处理大量的繁重工作————特别是底层对job的持久化工作等。
示例需求
这里我们处理一个简单示例,从csv文件中迁移一些财务数据至xml文件。输入文件结构很简单,每行包括一个事物,包括姓名、用户id、发生日期以及金额:
username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411
gradle 依赖
dependencies {
compile group: 'org.apache.poi', name: 'poi', version: '4.0.1'
compile group: 'org.apache.poi', name: 'poi-ooxml', version: '4.0.1'
compile group: 'com.google.guava', name: 'guava', version: '23.0'
compile 'org.springframework:spring-oxm'
implementation 'org.springframework.boot:spring-boot-starter-batch'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
runtimeOnly 'mysql:mysql-connector-java'
compile "org.springframework:spring-jdbc"
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'
}
schema 初始化
spring batch 通过数据库持久化job及step执行状态。因此首先需要初始化spring batch的数据库脚本。spring-batch-core.jar的\org\springframework\batch\core路径下有所有支持数据的脚本。我们拷贝schema-mysql.sql至resouces目录下,然后在application.properties中配置数据源并启动初始化脚本属性:
spring.datasource.username=
spring.datasource.password=
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.0.12:3306/batch-db?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
spring.datasource.initialization-mode=always
spring.datasource.platform=mysql
spring.datasource.initialization-mode 是spring boot中启用执行schema.sql 和 data.sql;示例中使用mysql,所以也需要指定spring.datasource.platform。
从csv文件读数据
首先配置cvsFileItemReader,从csv文件读数据并转换为Transation对象:
@Setter
@Getter
@ToString
@XmlRootElement(name = "transactionRecord")
public class Transaction {
private String username;
private int userId;
private Date transactionDate;
private double amount;
}
为了实现转换,需要定义RecordFieldSetMapper类:
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
private static String dateFormat = "d/MM/yyyy";
@Override
public Transaction mapFieldSet(FieldSet fieldSet) {
Transaction transaction = new Transaction();
transaction.setUsername(fieldSet.readString("username"));
transaction.setUserId(fieldSet.readInt(1));
transaction.setAmount(fieldSet.readDouble(3));
String dateString = fieldSet.readString(2);
transaction.setTransactionDate( Date.from(
LocalDate.parse(dateString,
DateTimeFormatter.ofPattern(dateFormat)).atStartOfDay(ZoneId.systemDefault()).toInstant()
));
return transaction;
}
}
使用ItemProcessor处理数据
现在我们创建处理器,一般用于处理或转换读取的数据。为了演示,这里没有做任何实际处理。
public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {
public Transaction process(Transaction item) {
return item;
}
}
spring batch 配置
下面配置spring batch,整合step和job:
@Configuration
@EnableBatchProcessing
public class FinancialBatchConfiguration {
private final JobBuilderFactory jobs;
private final StepBuilderFactory steps;
@Value("input/record.csv")
private Resource inputCsv;
@Value("file:xml/output.xml")
private Resource outputXml;
@Autowired
public FinancialBatchConfiguration(JobBuilderFactory jobs, StepBuilderFactory steps) {
this.jobs = jobs;
this.steps = steps;
}
@Bean
protected Step step1(ItemReader<Transaction> reader,
ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) {
return steps.get("step1").<Transaction, Transaction> chunk(10)
.reader(reader).processor(processor).writer(writer).build();
}
@Bean(name = "firstBatchJob")
public Job job(@Qualifier("step1") Step step1) {
return jobs.get("firstBatchJob").start(step1).build();
}
@Bean
public ItemReader<Transaction> itemReader() throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = { "username", "userid", "transactiondate", "amount" };
tokenizer.setNames(tokens);
reader.setResource(inputCsv);
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLineMapper(lineMapper);
reader.setLinesToSkip(1);
return reader;
}
@Bean
public ItemProcessor<Transaction, Transaction> itemProcessor() {
return new CustomItemProcessor();
}
@Bean
public ItemWriter<Transaction> itemWriter(Marshaller marshaller){
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(outputXml);
return itemWriter;
}
@Bean
public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(Transaction.class);
return marshaller;
}
}
我们看到itemReader()定义 ItemReader,itemWriter定义ItemWriter,ItemProcessor定义转换器bean。通过StepBuilderFactory创建step,JobBuilderFactory创建job。
spring batch执行job
spring boot 默认对自动运行job,运维每次job运行都在数据库持久化运行参数,如果不设置参数,则job只能运行一次。因此我们需要禁用自动运行job功能,然后手动启动并执行job。
禁用自动执行job
在application.properties 中增加属性并设置为false。
spring.batch.job.enabled=false
运行job
@SpringBootApplication
public class JpaToExcelApplication implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("firstBatchJob")
private Job job;
public static void main(String[] args) {
SpringApplication.run(JpaToExcelApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
callFinacialBatch();
}
private void callFinacialBatch() throws Exception{
jobLauncher.run(job, new JobParametersBuilder()
.addLong("date", LocalDateTime.now().getLong(ChronoField.MICRO_OF_DAY))
.toJobParameters());
}
}
spring batch 支持多种方式执行job,命令行方式、定时任务方式、web方式。为了演示方便,这里采用命令行方式执行。callFinacialBatch方法给job指定了参数,为了每次运行时参数值不同,可以让job执行多次。执行成功后读者可以查看数据库表记录,也可以看到生成的xml文件。
总结
本文通过一个简单示例给你一个基本的spring batch入门。包括如何初始化spring batch schema脚本,自定义读数据、处理数据以及写数据等。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/88747455