Spring Boot - Batch Service



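A batch service is a process that executes more than one command in a single task. In this chapter, you will learn how to create a batch service in a Spring Boot application.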

Let us consider an example that saves the contents of a CSV file into MySQL.

To create a batch service program, we need to add the Spring Boot Starter Batch, MySQL Connector/J, and Spring Boot Starter Data JPA dependencies to the build configuration file.

Maven users can add the following dependencies to the pom.xml file.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
   <groupId>com.mysql</groupId>
   <artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

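Gradle users can add the following dependencies to the build.gradle file. (The old compile configuration was removed in Gradle 7, so implementation and runtimeOnly are used instead.)

implementation("org.springframework.boot:spring-boot-starter-batch")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
runtimeOnly("com.mysql:mysql-connector-j")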

Now, add a simple CSV data file named report.csv under the classpath resources directory, src/main/resources, as shown below:

report.csv

William,John
Mike, Sebastian
Lawarance, Lime
Kumar, Mahesh
Goswami, Saikat
Khan, Jaid
Kaur, Manpreet
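The reader defined later in BatchConfiguration.java splits each of these lines on the comma and assigns the columns, in order, to lastName and firstName. As a rough illustration, the plain-Java sketch below (no Spring Batch involved) performs the same mapping by hand. The Person record and the map() method are hypothetical stand-ins, and trimming the spaces around each value (as in "Mike, Sebastian") is a choice of this sketch, not a statement about how Spring Batch tokenizes lines.

```java
import java.util.List;

// Plain-Java sketch (no Spring Batch) of the line-to-object mapping the reader performs.
public class CsvLineMappingSketch {

    // Hypothetical value type standing in for the User entity.
    public record Person(String lastName, String firstName) { }

    public static Person map(String line) {
        // Split on the comma delimiter; the -1 limit keeps trailing empty columns.
        String[] tokens = line.split(",", -1);
        if (tokens.length != 2) {
            throw new IllegalArgumentException("Expected 2 columns but found " + tokens.length + ": " + line);
        }
        // Column order matches setNames("lastName", "firstName"); trimming is an assumption of this sketch.
        return new Person(tokens[0].trim(), tokens[1].trim());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("William,John", "Mike, Sebastian", "Kaur, Manpreet");
        for (String line : lines) {
            System.out.println(map(line));
        }
    }
}
```

Rejecting lines with the wrong number of columns keeps a malformed record from silently producing a half-filled object.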

Create a User entity class (a POJO annotated with @Entity), as shown below:

User.java

package com.tutorialspoint.batchservice.model;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;

// JPA entity; Hibernate maps it to a table named "user"
@Entity
public class User {

   // Primary key; the value is generated by Hibernate
   @Id
   @GeneratedValue
   private Long id;
   private String lastName;
   private String firstName;

   // No-argument constructor required by JPA and used by BeanWrapperFieldSetMapper
   public User() {
   }

   public User(String firstName, String lastName) {
      this.firstName = firstName;
      this.lastName = lastName;
   }

   // Setters are used by BeanWrapperFieldSetMapper to populate fields from the CSV line
   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }

   public String getFirstName() {
      return firstName;
   }

   public String getLastName() {
      return lastName;
   }

   public void setLastName(String lastName) {
      this.lastName = lastName;
   }

   @Override
   public String toString() {
      return "firstName: " + firstName + ", lastName: " + lastName;
   }
}

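Now, create an intermediate processor that transforms each record after it is read from the CSV file and before it is written to the database. This processor converts both names to upper case and logs each conversion.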

UserItemProcessor.java

package com.tutorialspoint.batchservice.processor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

import com.tutorialspoint.batchservice.model.User;

// Transforms each User read from the CSV file before it is passed to the writer
public class UserItemProcessor implements ItemProcessor<User, User> {

   private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);

   @Override
   public User process(final User user) throws Exception {
      // Convert both names to upper case
      final String firstName = user.getFirstName().toUpperCase();
      final String lastName = user.getLastName().toUpperCase();

      final User transformedPerson = new User(firstName, lastName);

      log.info("Converting ({}) into ({})", user, transformedPerson);

      // The returned object is what the writer receives
      return transformedPerson;
   }
}
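UserItemProcessor calls String.toUpperCase() without a locale, so the result follows the JVM's default locale; under a Turkish locale, for example, a lowercase "i" becomes the dotted capital "İ". If you prefer a locale-independent result, a minimal plain-Java sketch of the same transformation passes Locale.ROOT explicitly. UppercaseProcessorSketch and its Person record are hypothetical names used only for illustration; they are not part of the project above.

```java
import java.util.Locale;
import java.util.function.UnaryOperator;

public class UppercaseProcessorSketch {

    // Hypothetical stand-in for the User entity.
    public record Person(String lastName, String firstName) { }

    // Same transformation as UserItemProcessor.process(), but with Locale.ROOT so the
    // result does not depend on the JVM's default locale.
    public static final UnaryOperator<Person> TO_UPPER = p -> new Person(
            p.lastName().toUpperCase(Locale.ROOT),
            p.firstName().toUpperCase(Locale.ROOT));

    public static void main(String[] args) {
        Person in = new Person("Lawarance", "Lime");
        System.out.println("Converting (" + in + ") into (" + TO_UPPER.apply(in) + ")");

        // For comparison: the Turkish locale upper-cases 'i' to the dotted capital 'İ'.
        System.out.println("Lime".toUpperCase(Locale.forLanguageTag("tr")));
    }
}
```

In the Spring project, the equivalent change would be to pass Locale.ROOT to the two toUpperCase() calls inside UserItemProcessor.process().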

Let us create a batch configuration class that reads the data from the CSV file and writes it into the database, as shown below.

BatchConfiguration.java

package com.tutorialspoint.batchservice.configuration;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import com.tutorialspoint.batchservice.listener.UserListener;
import com.tutorialspoint.batchservice.model.User;
import com.tutorialspoint.batchservice.processor.UserItemProcessor;
import com.tutorialspoint.batchservice.repository.UserRepository;

@Configuration
public class BatchConfiguration {

   // Reads report.csv line by line and maps each line to a User
   @Bean
   FlatFileItemReader<User> reader() {
      FlatFileItemReader<User> reader = new FlatFileItemReader<>();
      reader.setResource(new ClassPathResource("/report.csv"));
      reader.setLineMapper(new DefaultLineMapper<>() {{
         // Split each line on commas: first column is lastName, second is firstName
         setLineTokenizer(new DelimitedLineTokenizer() {{
            setDelimiter(DELIMITER_COMMA);
            setNames("lastName", "firstName");
         }});
         // Create a User and populate it through its setters
         setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
            setTargetType(User.class);
         }});
      }});
      return reader;
   }

   @Autowired
   UserRepository repository;

   // Receives one chunk of processed users and saves them in a single call
   @Bean
   ItemWriter<User> writer() {
      return users -> {
         System.out.println("Saving User: " + users);
         repository.saveAll(users);
      };
   }

   @Bean
   public JobExecutionListener listener() {
      return new UserListener();
   }

   // RunIdIncrementer adds a new run.id parameter on every launch
   @Bean
   Job importUserJob(JobRepository jobRepository, Step step1) {
      return new JobBuilder("importUserJob", jobRepository)
         .incrementer(new RunIdIncrementer())
         .listener(listener())
         .start(step1)
         .build();
   }

   // Chunk-oriented step: read, process and write up to 100 items per transaction
   @Bean
   Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
      return new StepBuilder("step1", jobRepository)
         .<User, User>chunk(100, transactionManager)
         .reader(reader())
         .processor(userProcessor())
         .writer(writer())
         .build();
   }

   @Bean
   @StepScope
   UserItemProcessor userProcessor() {
      return new UserItemProcessor();
   }
}

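The reader() method reads the data from the CSV file, and the writer() method writes the data to the database. The step1 step connects the reader, the processor, and the writer, and processes the items in chunks of 100.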
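To make the read-process-write cycle concrete, here is a simplified, single-threaded plain-Java model of what a chunk-oriented step such as step1 does: read up to N items, process each one, then hand the whole list to the writer in one call. It is only a sketch: the real step also manages a transaction per chunk, restart state, and skip/retry handling, none of which is modelled, and ChunkLoopSketch.run() is a hypothetical name. With chunk(100) and seven CSV records, everything fits into one chunk, which is why the sample output shows a single "Saving User:" line per job run.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkLoopSketch {

    // Reads up to chunkSize items, processes each of them, then writes the whole chunk at once.
    public static <I, O> void run(Iterator<I> reader, Function<I, O> processor,
                                  Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        while (reader.hasNext()) {
            // 1. Read phase: collect up to chunkSize input items.
            List<I> inputs = new ArrayList<>(chunkSize);
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process phase: transform every item read in this chunk.
            List<O> outputs = new ArrayList<>(inputs.size());
            for (I input : inputs) {
                outputs.add(processor.apply(input));
            }
            // 3. Write phase: one writer call per chunk
            //    (in Spring Batch, the chunk's transaction is committed after this write).
            writer.accept(outputs);
        }
    }

    public static void main(String[] args) {
        List<String> firstNames = List.of("John", "Sebastian", "Lime", "Mahesh", "Saikat", "Jaid", "Manpreet");
        Function<String, String> toUpper = String::toUpperCase;
        Consumer<List<String>> printWriter = chunk -> System.out.println("Writing " + chunk);
        // chunkSize 3 instead of 100 so that several chunks are produced:
        // [JOHN, SEBASTIAN, LIME], then [MAHESH, SAIKAT, JAID], then [MANPREET]
        run(firstNames.iterator(), toUpper, printWriter, 3);
    }
}
```

A larger chunk size means fewer writer calls and commits, at the cost of more items being rolled back if a write fails.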

Next, we need to write a job execution listener class that prints a notification before the job starts and after it finishes.

UserListener.java

package com.tutorialspoint.batchservice.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

// Prints the job status before the job starts and after it finishes
public class UserListener implements JobExecutionListener {

   @Override
   public void beforeJob(JobExecution jobExecution) {
      System.out.println("Job Started: " + jobExecution.getStartTime() + ", Status: " + jobExecution.getStatus());
   }

   @Override
   public void afterJob(JobExecution jobExecution) {
      // Prints the start time again together with the final status (e.g. COMPLETED);
      // jobExecution.getEndTime() could be printed instead to show when the job finished
      System.out.println("Job Started: " + jobExecution.getStartTime() + ", Status: " + jobExecution.getStatus());
   }
}
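Spring Batch calls beforeJob() before the job's steps run and afterJob() once the job ends, whether it completed or failed, so afterJob() is the place to check the final status. The plain-Java sketch below models only that ordering. The Listener interface and the launch() method are hypothetical and simply mirror the two callbacks of JobExecutionListener, and the string statuses stand in for BatchStatus values.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerLifecycleSketch {

    // Hypothetical listener interface modelled on JobExecutionListener's two callbacks.
    public interface Listener {
        void beforeJob(String jobName);
        void afterJob(String jobName, String status);
    }

    // Runs the job body between the two callbacks; afterJob runs on success and on failure.
    public static String launch(String jobName, Runnable body, Listener listener) {
        listener.beforeJob(jobName);
        String status = "FAILED";
        try {
            body.run();
            status = "COMPLETED";
        } catch (RuntimeException e) {
            System.out.println("Job failed: " + e.getMessage());
        } finally {
            listener.afterJob(jobName, status);
        }
        return status;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Listener recorder = new Listener() {
            public void beforeJob(String jobName) { events.add("before " + jobName); }
            public void afterJob(String jobName, String status) { events.add("after " + jobName + " " + status); }
        };
        launch("importUserJob", () -> { }, recorder);
        launch("importUserJob", () -> { throw new IllegalStateException("bad CSV line"); }, recorder);
        System.out.println(events);
    }
}
```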

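We also need a repository that saves the users to the database. By extending JpaRepository, it inherits methods such as saveAll(), which the writer uses.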

UserRepository.java

package com.tutorialspoint.batchservice.repository;

import org.springframework.data.jpa.repository.JpaRepository;

import com.tutorialspoint.batchservice.model.User;

public interface UserRepository extends JpaRepository<User, Long> {
}

The main Spring Boot application class:

BatchserviceApplication.java

package com.tutorialspoint.batchservice;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchserviceApplication {

   public static void main(String[] args) {
      SpringApplication.run(BatchserviceApplication.class, args);
   }
}

Here is the complete pom.xml:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>3.3.4</version>
      <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.tutorialspoint</groupId>
   <artifactId>batchservice</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>batchservice</name>
   <description>Demo project for Spring Boot</description>
   <url/>
   <licenses>
      <license/>
   </licenses>
   <developers>
      <developer/>
   </developers>
   <scm>
      <connection/>
      <developerConnection/>
      <tag/>
      <url/>
   </scm>
   <properties>
      <java.version>21</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-batch</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-data-jpa</artifactId>
      </dependency>
      <dependency>
         <groupId>com.mysql</groupId>
         <artifactId>mysql-connector-j</artifactId>
         <scope>runtime</scope>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
      </dependency>
      <dependency>
         <groupId>org.springframework.batch</groupId>
         <artifactId>spring-batch-test</artifactId>
         <scope>test</scope>
      </dependency>
   </dependencies>
   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
</project>

application.properties

spring.application.name=batchservice
#Database Connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/details
spring.datasource.username=root
spring.datasource.password=Admin@123

spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=create

spring.batch.jdbc.initialize-schema=always

Compile and Execute

Now, create an executable JAR file and run the Spring Boot application by using the following Maven or Gradle commands.

For Maven, use the command shown below:

mvn clean install

After "BUILD SUCCESS", you can find the JAR file under the target directory.

For Gradle, use the command shown below:

gradle clean build

After "BUILD SUCCESSFUL", you can find the JAR file under the build/libs directory.

Run the JAR file by using the command given here:

java -jar <JARFILE>

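You can see output similar to the following in the console window. This log was captured from a run in which the job was launched a second time with a time parameter and a "JOB Execution completed!" message was printed; neither comes from the code shown in this chapter, where the job is launched once at startup by Spring Boot's JobLauncherApplicationRunner.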

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.3.4)

...
2024-09-23T17:12:15.384+05:30  INFO 9904 --- [batchservice] [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
...
Hibernate: drop table if exists user
Hibernate: drop table if exists user_seq
Hibernate: create table user (id bigint not null, first_name varchar(255), last_name varchar(255), primary key (id)) engine=InnoDB
Hibernate: create table user_seq (next_val bigint) engine=InnoDB
Hibernate: insert into user_seq values ( 1 )
...
2024-09-23T17:12:19.006+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.BatchserviceApplication            : Started BatchserviceApplication in 4.923 seconds (process running for 5.83)
2024-09-23T17:12:19.010+05:30  INFO 9904 --- [batchservice] [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2024-09-23T17:12:19.110+05:30  INFO 9904 --- [batchservice] [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{'run.id':'{value=1, type=class java.lang.Long, identifying=true}'}]
Job Started: 2024-09-23T17:12:19.131510100, Status: STARTED
2024-09-23T17:12:19.186+05:30  INFO 9904 --- [batchservice] [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2024-09-23T17:12:19.252+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: John, lastName: William) into (firstName: JOHN, lastName: WILLIAM)
2024-09-23T17:12:19.253+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Sebastian, lastName: Mike) into (firstName: SEBASTIAN, lastName: MIKE)
2024-09-23T17:12:19.253+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Lime, lastName: Lawarance) into (firstName: LIME, lastName: LAWARANCE)
2024-09-23T17:12:19.253+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Mahesh, lastName: Kumar) into (firstName: MAHESH, lastName: KUMAR)
2024-09-23T17:12:19.253+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Saikat, lastName: Goswami) into (firstName: SAIKAT, lastName: GOSWAMI)
2024-09-23T17:12:19.253+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Jaid, lastName: Khan) into (firstName: JAID, lastName: KHAN)
2024-09-23T17:12:19.253+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Manpreet, lastName: Kaur) into (firstName: MANPREET, lastName: KAUR)
Saving User: [items=[firstName: JOHN, lastName: WILLIAM, firstName: SEBASTIAN, lastName: MIKE, firstName: LIME, lastName: LAWARANCE, firstName: MAHESH, lastName: KUMAR, firstName: SAIKAT, lastName: GOSWAMI, firstName: JAID, lastName: KHAN, firstName: MANPREET, lastName: KAUR], skips=[]]
Hibernate: select next_val as id_val from user_seq for update
Hibernate: update user_seq set next_val= ? where next_val=?
Hibernate: select next_val as id_val from user_seq for update
Hibernate: update user_seq set next_val= ? where next_val=?
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
2024-09-23T17:12:19.368+05:30  INFO 9904 --- [batchservice] [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 181ms
Job Started: 2024-09-23T17:12:19.131510100, Status: COMPLETED
2024-09-23T17:12:19.411+05:30  INFO 9904 --- [batchservice] [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{'run.id':'{value=1, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 263ms
2024-09-23T17:12:19.456+05:30  INFO 9904 --- [batchservice] [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{'time':'{value=1727091739411, type=class java.lang.Long, identifying=true}'}]
Job Started: 2024-09-23T17:12:19.456491500, Status: STARTED
2024-09-23T17:12:19.495+05:30  INFO 9904 --- [batchservice] [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2024-09-23T17:12:19.530+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: John, lastName: William) into (firstName: JOHN, lastName: WILLIAM)
2024-09-23T17:12:19.531+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Sebastian, lastName: Mike) into (firstName: SEBASTIAN, lastName: MIKE)
2024-09-23T17:12:19.531+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Lime, lastName: Lawarance) into (firstName: LIME, lastName: LAWARANCE)
2024-09-23T17:12:19.531+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Mahesh, lastName: Kumar) into (firstName: MAHESH, lastName: KUMAR)
2024-09-23T17:12:19.531+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Saikat, lastName: Goswami) into (firstName: SAIKAT, lastName: GOSWAMI)
2024-09-23T17:12:19.531+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Jaid, lastName: Khan) into (firstName: JAID, lastName: KHAN)
2024-09-23T17:12:19.531+05:30  INFO 9904 --- [batchservice] [           main] c.t.b.processor.UserItemProcessor        : Converting (firstName: Manpreet, lastName: Kaur) into (firstName: MANPREET, lastName: KAUR)
Saving User: [items=[firstName: JOHN, lastName: WILLIAM, firstName: SEBASTIAN, lastName: MIKE, firstName: LIME, lastName: LAWARANCE, firstName: MAHESH, lastName: KUMAR, firstName: SAIKAT, lastName: GOSWAMI, firstName: JAID, lastName: KHAN, firstName: MANPREET, lastName: KAUR], skips=[]]
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
Hibernate: insert into user (first_name,last_name,id) values (?,?,?)
2024-09-23T17:12:19.563+05:30  INFO 9904 --- [batchservice] [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 67ms
Job Started: 2024-09-23T17:12:19.456491500, Status: COMPLETED
2024-09-23T17:12:19.598+05:30  INFO 9904 --- [batchservice] [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{'time':'{value=1727091739411, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 129ms
JOB Execution completed!
2024-09-23T17:12:19.603+05:30  INFO 9904 --- [batchservice] [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2024-09-23T17:12:19.604+05:30  INFO 9904 --- [batchservice] [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2024-09-23T17:12:19.611+05:30  INFO 9904 --- [batchservice] [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

You can also check the user table in the details database to view the entries.
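The run.id value of 1 in the log comes from the RunIdIncrementer configured on importUserJob. Spring Batch identifies a job instance by the job name plus its identifying parameters, and a completed instance cannot be run again with the same parameters, so the incrementer supplies a fresh value: it takes the previous parameters and sets run.id to the previous value plus one, or to 1 if there is none. The sketch below models that rule with a plain Map<String, Long> standing in for JobParameters; RunIdIncrementerSketch.next() is a hypothetical name, not the Spring Batch API.

```java
import java.util.HashMap;
import java.util.Map;

public class RunIdIncrementerSketch {

    static final String KEY = "run.id";

    // Returns a copy of the previous parameters with run.id set to previous + 1 (or 1 if absent).
    public static Map<String, Long> next(Map<String, Long> previous) {
        Map<String, Long> next = new HashMap<>(previous);
        Long last = previous.get(KEY);
        next.put(KEY, last == null ? 1L : last + 1);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> first = next(Map.of());
        Map<String, Long> second = next(first);
        System.out.println(first + " then " + second);
    }
}
```

Because every launch gets a new run.id, each launch creates a new job instance instead of being rejected as already complete.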
