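Implementing a Spring Batch Application with H2
The previous article was an introductory Spring Batch tutorial that used no database: Spring Batch metadata and execution information were kept only in memory. This article shows how to run Spring Batch against a database; to keep things simple, it uses H2.
Requirements
Storing the metadata in a database makes it possible to track execution progress at any time and to re-run failed records. H2 is used as the store here.
Records are read from a CSV file. For each record, the job checks whether it already exists in the database: if it does, the balance is updated; otherwise a new record is inserted.
Assume the raw data looks like this: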
1,John Wick,101,134
2,Neo,102,445
3,Jack Bauer,103,344
4,Pavan Solapure,101,-34
The columns are user ID, user name, department ID, and amount.
Adding dependencies
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
runtimeOnly 'com.h2database:h2'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'
Configuring application.properties
# Enabling H2 Console
spring.h2.console.enabled=true
# Datasource
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.username=sa
spring.datasource.password=
spring.datasource.driver-class-name=org.h2.Driver
# disable spring batch auto run job
spring.batch.job.enabled=false
input.file=classpath:person.csv
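spring.h2.console.enabled=true enables the H2 console so that the H2 data can be viewed in a browser. The datasource settings below it configure the H2 datasource.
spring.batch.job.enabled=false prevents Spring Boot from running the job automatically at startup. input.file sets the name of the input file.
Defining the entity class and repository
Define the entity class for the input: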
@Data
@Entity
public class Person {
    @Id
    private Long userId;
    private String name;
    private String dept;
    private BigDecimal account;
}
Define PersonRepository:
public interface PersonRepository extends CrudRepository<Person, Long> {
}
Implementing the Reader
The CSV file is read with FlatFileItemReader, which converts each line into a Person.
public class Reader extends FlatFileItemReader<Person> {
    public Reader(Resource resource) {
        super();
        setResource(resource);

        // Split each line on commas. The column names must match the Person
        // property names so that BeanWrapperFieldSetMapper can bind them,
        // hence "account" rather than "amount" for the fourth column.
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames("userId", "name", "dept", "account");
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);

        // Map the named fields onto a new Person instance.
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(lineTokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSetMapper);
        setLineMapper(defaultLineMapper);
    }
}
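To make the tokenize-then-map step concrete, here is a minimal plain-Java sketch of what happens to a single line. It is not the Spring Batch implementation: PersonRow and CsvLineSketch are hypothetical names used only for illustration, the sketch ignores quoted fields, and unlike the reader above (which calls setStrict(false)) it rejects lines that do not have exactly four columns.

```java
import java.math.BigDecimal;

// Hypothetical stand-in for the article's Person entity (no JPA, no Lombok).
class PersonRow {
    final long userId;
    final String name;
    final String dept;
    final BigDecimal account;

    PersonRow(long userId, String name, String dept, BigDecimal account) {
        this.userId = userId;
        this.name = name;
        this.dept = dept;
        this.account = account;
    }
}

class CsvLineSketch {
    // Splits one comma-delimited line and binds the four columns by position.
    static PersonRow parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 columns: " + line);
        }
        return new PersonRow(
                Long.parseLong(fields[0].trim()),
                fields[1].trim(),
                fields[2].trim(),
                new BigDecimal(fields[3].trim()));
    }

    public static void main(String[] args) {
        PersonRow p = parse("4,Pavan Solapure,101,-34");
        System.out.println(p.userId + " | " + p.name + " | " + p.dept + " | " + p.account);
    }
}
```

In the real reader the binding is done by property name rather than by position, which is why the tokenizer's column names have to line up with the fields of Person.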
Implementing the Writer
Write all Person objects to the database:
@Component
public class Writer implements ItemWriter<Person> {
    @Autowired
    private PersonRepository repo;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void write(List<? extends Person> users) {
        // saveAll inserts new rows and updates rows whose ID already exists.
        repo.saveAll(users);
    }
}
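Implementing the Processor
The processor first looks up the user in the database. If a record exists, the stored account balance is added to the incoming amount; otherwise the item is passed through unchanged, and the writer inserts it as a new record.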
@Component
public class Processor implements ItemProcessor<Person, Person> {
    private final PersonRepository userRepo;

    public Processor(PersonRepository userRepo) {
        this.userRepo = userRepo;
    }

    @Override
    public Person process(Person user) {
        Optional<Person> userFromDb = userRepo.findById(user.getUserId());
        // If the user already exists, add the stored balance to the incoming amount.
        userFromDb.ifPresent(person -> user.setAccount(user.getAccount().add(person.getAccount())));
        return user;
    }
}
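The combined effect of the processor and writer can be sketched without Spring. In this illustration a HashMap stands in for the person table, and AccountMergeSketch with its methods is a hypothetical name, not part of the article's code. It shows why running the same input twice doubles a balance: the second run finds the stored value and adds it to the incoming amount.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

class AccountMergeSketch {
    // Hypothetical in-memory stand-in for the person table: userId -> stored account.
    private final Map<Long, BigDecimal> table = new HashMap<>();

    // Mirrors the processor: add the stored balance, if any, to the incoming amount.
    BigDecimal process(long userId, BigDecimal incoming) {
        BigDecimal stored = table.get(userId);
        return stored == null ? incoming : incoming.add(stored);
    }

    // Mirrors the writer: inserts a new row or overwrites the existing one.
    void write(long userId, BigDecimal account) {
        table.put(userId, account);
    }

    BigDecimal balanceOf(long userId) {
        return table.get(userId);
    }

    public static void main(String[] args) {
        AccountMergeSketch sketch = new AccountMergeSketch();
        sketch.write(1L, sketch.process(1L, new BigDecimal("134"))); // first run: insert 134
        sketch.write(1L, sketch.process(1L, new BigDecimal("134"))); // second run: 134 + 134
        System.out.println(sketch.balanceOf(1L)); // prints 268
    }
}
```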
Configuring the job
@Component
@Slf4j
public class AccountKeeperJob extends JobExecutionListenerSupport {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final Processor processor;
    private final Writer writer;

    @Value("${input.file}")
    Resource resource;

    @Autowired
    public AccountKeeperJob(JobBuilderFactory jobBuilderFactory,
                            StepBuilderFactory stepBuilderFactory,
                            Processor processor,
                            Writer writer) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.processor = processor;
        this.writer = writer;
    }

    @Bean(name = "accountJob")
    public Job accountKeeperJob() {
        // A chunk size of 1 commits after every item.
        Step step = stepBuilderFactory.get("step-1")
                .<Person, Person>chunk(1)
                .reader(new Reader(resource))
                .processor(processor)
                .writer(writer)
                .build();

        return jobBuilderFactory.get("accounting-job")
                .incrementer(new RunIdIncrementer())
                .listener(this)
                .start(step)
                .build();
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("BATCH JOB COMPLETED SUCCESSFULLY");
        }
    }
}
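The step is built from the reader, processor, and writer, and the job is built from the step. A listener is also registered to print a log message when the job finishes.
Running the job through a web request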
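The chunk(1) setting in the step controls how many items are collected before the writer is called. The following plain-Java sketch shows that loop in simplified form. ChunkLoopSketch is a hypothetical name, an Iterator and a Function stand in for the reader and processor, and the returned list records what each writer call would receive. Transactions, skipping, and restart handling are left out, and the sketch interleaves reading and processing, whereas Spring Batch reads a whole chunk before processing it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class ChunkLoopSketch {
    // Collects up to chunkSize processed items at a time; each inner list
    // represents the argument of one writer call.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> writes = new ArrayList<>();
        while (reader.hasNext()) {
            List<O> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(processor.apply(reader.next()));
            }
            writes.add(chunk);
        }
        return writes;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c");
        Function<String, String> upper = s -> s.toUpperCase();

        List<List<String>> sizeOne = run(lines.iterator(), upper, 1);
        System.out.println(sizeOne); // prints [[A], [B], [C]]

        List<List<String>> sizeTwo = run(lines.iterator(), upper, 2);
        System.out.println(sizeTwo); // prints [[A, B], [C]]
    }
}
```

With a chunk size of 1, as in the job above, the writer is called once per record. A larger chunk size generally means fewer commits, at the cost of redoing more work when a chunk fails.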
@RestController
public class JobInvokerController {
    private final JobLauncher jobLauncher;
    private final Job accountKeeperJob;

    public JobInvokerController(JobLauncher jobLauncher, @Qualifier("accountJob") Job accountKeeperJob) {
        this.jobLauncher = jobLauncher;
        this.accountKeeperJob = accountKeeperJob;
    }

    @RequestMapping("/run-batch-job")
    public String handle() throws Exception {
        // Added parameter (not in the original post): a timestamp makes every
        // request a new job instance. With identical parameters, a second run of
        // an already completed job fails with JobInstanceAlreadyCompleteException.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("source", "Spring Boot")
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(accountKeeperJob, jobParameters);
        return "Batch job has been invoked";
    }
}
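The setting spring.batch.job.enabled=false above disabled automatic job execution. This controller provides a way to run the job on request.
Enabling Spring Batch
Finally, add @EnableBatchProcessing to the startup class to create the required infrastructure beans; see the introductory article for details.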
@SpringBootApplication
@EnableBatchProcessing
public class BatchWebApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchWebApplication.class, args);
    }
}
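After the application starts, the Spring Batch database tables are created automatically. Once the job has run successfully, the execution records can be inspected.
Viewing the results in H2
After the application has started for the first time, open http://localhost:8080/h2-console to view the H2 database, as shown in the figure below:
Once the job has been executed through the REST request, check H2 again; the result is as follows:
Summary
Building on the introductory article, this article used an H2 database to persist Spring Batch metadata and execution information, and ran the job through a web request.
Reference link for this article: https://blog.csdn.net/neweastsun/article/details/89003082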
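The automatic table creation relies on Spring Boot's schema initialization setting, which by default initializes only embedded databases such as in-memory H2. As an assumption that goes beyond the setup in this article, switching to a non-embedded database would probably require making the setting explicit. The property name depends on the Spring Boot version, so treat this fragment as a sketch to check against the version in use:

```properties
# Spring Boot 2.5 and later
spring.batch.jdbc.initialize-schema=always
# Spring Boot before 2.5 (use this instead of the line above)
# spring.batch.initialize-schema=always
```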