Skip to main content
 首页 » 编程设计

基于H2实现 Spring Batch应用

2022年07月19日133kerrycode

基于H2实现 Spring Batch应用

上文我们已经学习了Spring batch的入门教程,但没有使用数据库,仅使用内存存储spring batch元信息及执行信息。本文我们学习如何配置数据库运行,为了简化使用h2数据库。

需求说明

使用数据库保存元信息,可以随时跟踪执行进度,重新执行失败记录。这里使用H2数据库存储。
从csv文件中读取信息,每条信息判断是数据库中是否存在,如果存在更新余额,否则插入新的记录。

假设原始数据如下:

1,John Wick,101,134 
2,Neo,102,445 
3,Jack Bauer,103,344 
4,Pavan Solapure,101,-34 

每列数据分别表示用户编号、用户名称、部门编号和金额。

引入依赖

implementation 'org.springframework.boot:spring-boot-starter-batch' 
implementation 'org.springframework.boot:spring-boot-starter-data-jpa' 
implementation 'org.springframework.boot:spring-boot-starter-web' 
runtimeOnly 'com.h2database:h2' 
 
compileOnly 'org.projectlombok:lombok' 
annotationProcessor 'org.projectlombok:lombok' 
 
testImplementation 'org.springframework.boot:spring-boot-starter-test' 
testImplementation 'org.springframework.batch:spring-batch-test' 

配置application.properties

# Enabling H2 Console 
spring.h2.console.enabled=true 
 
# Datasource 
spring.datasource.url=jdbc:h2:mem:testdb 
spring.datasource.username=sa 
spring.datasource.password= 
spring.datasource.driver-class-name=org.h2.Driver 
 
# disable spring batch auto run job 
spring.batch.job.enabled=false 
 
input.file=classpath:person.csv 

配置spring.h2.console.enabled=true为了在浏览器中查看h2数据。下面配置h2数据源。

spring.batch.job.enabled=false 禁止spring boot 自动运行job。input.file配置输入文件名称。

定义实体类及repository

定义输入的实体类:

@Data 
@Entity 
public class Person { 
 
    @Id 
    private Long userId; 
 
    private String name; 
 
    private String dept; 
 
    private BigDecimal account; 
} 

定义PersonRepository:

public interface PersonRepository extends CrudRepository<Person, Long> { 
 
} 
 

实现Reader

读取csv文件,这里使用FlatFileItemReader,并转换为Person输出。

public class Reader extends FlatFileItemReader<Person> { 
 
    public Reader(Resource resource) { 
        super(); 
        setResource(resource); 
 
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); 
        lineTokenizer.setNames("userId", "name", "dept", "amount"); 
        lineTokenizer.setDelimiter(","); 
        lineTokenizer.setStrict(false); 
 
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>(); 
        fieldSetMapper.setTargetType(Person.class); 
 
        DefaultLineMapper<Person> defaultLineMapper = new DefaultLineMapper<>(); 
        defaultLineMapper.setLineTokenizer(lineTokenizer); 
        defaultLineMapper.setFieldSetMapper(fieldSetMapper); 
        setLineMapper(defaultLineMapper); 
    } 
} 

实现Writer

写所有person对象至数据库:

@Component 
public class Writer implements ItemWriter<Person> { 
 
    @Autowired 
    private PersonRepository repo; 
 
    @Override 
    @Transactional(rollbackFor = Exception.class) 
    public void write(List<? extends Person> users) { 
        repo.saveAll(users); 
    } 
} 

实现Processor

处理过程为,首先查询是否存在对应用户信息,存在则更新acount,否则不处理。

@Component 
public class Processor implements ItemProcessor<Person, Person> { 
 
    private final PersonRepository userRepo; 
    public Processor(PersonRepository userRepo) {this.userRepo = userRepo;} 
 
    @Override 
    public Person process(Person user) { 
        Optional<Person> userFromDb = userRepo.findById(user.getUserId()); 
        userFromDb.ifPresent(person -> user.setAccount(user.getAccount().add(person.getAccount()))); 
        return user; 
    } 
} 

配置job

@Component 
@Slf4j 
public class AccountKeeperJob extends JobExecutionListenerSupport { 
 
    private final JobBuilderFactory jobBuilderFactory; 
    private final StepBuilderFactory stepBuilderFactory; 
    private final Processor processor; 
    private final Writer writer; 
 
    @Value("${input.file}") 
    Resource resource; 
 
    @Autowired 
    public AccountKeeperJob(JobBuilderFactory jobBuilderFactory, 
                            StepBuilderFactory stepBuilderFactory, Processor processor, Writer writer) { 
        this.jobBuilderFactory = jobBuilderFactory; 
        this.stepBuilderFactory = stepBuilderFactory; 
        this.processor = processor; 
        this.writer = writer; 
    } 
 
    @Bean(name = "accountJob") 
    public Job accountKeeperJob() { 
 
        Step step = stepBuilderFactory.get("step-1") 
            .<Person, Person> chunk(1) 
            .reader(new Reader(resource)) 
            .processor(processor) 
            .writer(writer) 
            .build(); 
 
        return jobBuilderFactory.get("accounting-job") 
                                .incrementer(new RunIdIncrementer()) 
                                .listener(this) 
                                .start(step) 
                                .build(); 
    } 
 
    @Override 
    public void afterJob(JobExecution jobExecution) { 
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) { 
            log.info("BATCH JOB COMPLETED SUCCESSFULLY"); 
        } 
    } 
} 

我们通过reader、processor、writer创建step,有步骤创建job。并增加监听器,执行完毕打印日志。

通过web请求执行job

@RestController 
public class JobInvokerController { 
 
    private final JobLauncher jobLauncher; 
    private final Job accountKeeperJob; 
 
    public JobInvokerController(JobLauncher jobLauncher, @Qualifier("accountJob") Job accountKeeperJob) { 
        this.jobLauncher = jobLauncher; 
        this.accountKeeperJob = accountKeeperJob; 
    } 
 
    @RequestMapping("/run-batch-job") 
    public String handle() throws Exception { 
 
        JobParameters jobParameters = new JobParametersBuilder() 
            .addString("source", "Spring Boot") 
            .toJobParameters(); 
        jobLauncher.run(accountKeeperJob, jobParameters); 
 
        return "Batch job has been invoked"; 
    } 
} 

前面通过spring.batch.job.enabled=false配置禁用自动运行job。这里提供请求方式执行job。

配置启用SpringBatch

最后在启动类中增加@EnableBatchProcessing,生成必要基础bean,详细内容见入门篇

@SpringBootApplication 
@EnableBatchProcessing 
public class BatchWebApplication { 
 
    public static void main(String[] args) { 
        SpringApplication.run(BatchWebApplication.class, args); 
    } 
 
} 

系统启动后,会自动创建spring batch数据库表。执行成功后可以查看执行记录。

通过h2查看运行结果

当程序首次运行后,通过该链接 http://localhost:8080/h2-console 查看h2数据库。如下图:
在这里插入图片描述
一旦通过rest请求方式执行job之后,再次查看H2,结果如下:
在这里插入图片描述

总结

本文在入门篇的基础上,使用h2数据库序列化spring batch元信息及执行信息,通过web方式执行job。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/89003082