一、理论基础
1.1 Batch是什么
Spring Batch是Spring全家桶中的一员,是一个轻量级的批处理框架,比较实际的应用场景是数据迁移,比如将csv文件中的数据迁移到MySQL。
优势在于上手简单,编码规范化,能以较少的代码实现强大的功能。和ETL工具-kettle功能类似,但是定制性比较强
应用场景集中在各种DB、文件等各种已经存在的历史数据,貌似不支持消息队列的实时监听(如果有知道如何实现的,一定要告诉我),实时数据监听可以使用Storm等流式数据处理框架
1.2 基础概念
- ItemReader:读取数据,有多个封装好的类,可以支持多种数据源,如csv、jdbc等,也可以自定义功能实现。
- ItemWriter:输出数据,有Reader配套的封装类,同样可以自定义功能实现,如输出到消息队列。
- ItemProcessor:数据处理模块,输入为Reader读取的数据,输出为Writer的输入。
- Step:数据操作的步骤,包括:ItemReader->ItemProcessor->ItemWriter 整个数据流
- Job:待执行的任务,每个job可以有一个或多个step
- JobRepository:注册job的容器
- JobLauncher:启动job
- JobLocator:可以根据jobName获取到指定的job,可以配合JobRepository、JobLauncher来手动启动job
1.3 如何开发一个Batch并启动
- 确认输入输出,分别定义InputEntity和OutputEntity
- 编写Reader,输入为各种数据源(csv、MySQL等),输出为InputEntity,数据库的可以选择封装好的类: JdbcCursorItemReader
- 编写Processor,输入为InputEntity,输出为OutputEntity,继承ItemProcessor<T, T>,实现process方法即可
- 编写Writer,输入为OutputEntity,输出为指定的数据源(MySQL等)
- 配置Step和Job
抛却必要配置,实现一个迁移任务就是这么简单
二、实战代码
2.0 创建测试表
数据源表
CREATE TABLE `article` (`title` varchar(64) DEFAULT NULL COMMENT '标题',`content` varchar(255) DEFAULT NULL COMMENT '内容',`event_occurred_time` varchar(32) DEFAULT NULL COMMENT '事件发生时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文章';
输出的数据表
CREATE TABLE `article_detail` (`title` varchar(64) DEFAULT NULL COMMENT '标题',`content` varchar(255) DEFAULT NULL COMMENT '内容',`event_occurred_time` varchar(32) DEFAULT NULL COMMENT '事件发生时间',`source` varchar(255) DEFAULT NULL COMMENT '文章来源',`description` varchar(255) DEFAULT NULL COMMENT '描述信息'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文章详情';
2.1 依赖引入
# 本实例基于Springboot 2.X版本
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>
2.2 配置文件
spring:batch:job:# 默认为true,程序启动时Job会自动执行;false,需要手动启动任务(jobLaucher.run)enabled: false# spring batch默认情况下需要在数据库中创建元数据表,always:每次都会检查表存不存在,不存在会自动创建;never:不会自动创建,如果表不存在,则会报错;initialize-schema: never
如需手动创建元数据表,请参考最后面的附录
2.3 配置JobRepository
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry){JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);return jobRegistryBeanPostProcessor;
}
如果没有该项配置,则手动启动时会报错No job configuration with the name [XJob] was registered
2.4 可选配置
2.4.1 内存模式
/**
* - NoPersistence 无持久化
*/
@Component
public class NoPersistenceBatchConfigurer extends DefaultBatchConfigurer {@Overridepublic void setDataSource(DataSource dataSource) {}
}
加了此项配置后,不会在数据库中创建元数据表,所有的job都是在内存中管理。程序重启后,任务信息会丢失,复杂的任务场景不建议加此配置,对于不需要严格任务管理的任务来讲比较合适。
2.4.2 任务监听
@Component
@Slf4j
public class JobListener extends JobExecutionListenerSupport {@Overridepublic void afterJob(JobExecution jobExecution) {if(jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("任务[{}]执行成功,参数:[{}]", jobExecution.getJobInstance().getJobName(),jobExecution.getJobParameters().getString("executedTime"));} else {log.info("任务[{}]执行失败", jobExecution.getJobInstance().getJobName());// TODO something}}
}
如果不需要在任务成功或者失败后做一些操作的话可以不加监听器,因为Batch自身包含日志执行情况日志(info级别),包括执行结果、执行参数、执行耗费时间等
2.5 定义输入、输出实体
Article:输入
@Data
public class Article {private String title;private String content;private String eventOccurredTime;
}
ArticleDetail:待输出的数据结构
@Data
public class ArticleDetail {private String title;private String content;private String eventOccurredTime;private String source;private String description;
}
2.6 Reader
2.6.1 JdbcCursorItemReader
/*** 普通读取模式* - MySQL会将所有的纪录读到内存中* - 数据量大的话内存占用会很高*/
public JdbcCursorItemReader<Article> getArticle(String executedTime) {String lastExecutedTime = "2020-01-01 00:00:00";String sql = StringUtils.join("SELECT * FROM article WHERE event_occurred_time >= '",lastExecutedTime, "' AND event_occurred_time < '", executedTime, "'");return new JdbcCursorItemReaderBuilder<Article>().dataSource(dataSource).sql(sql).fetchSize(10).name("getArticle").beanRowMapper(Article.class).build();
}
2.6.2 分页读取
/*** 分页读取模式* - 只要分页合理配置,内存占用可控*/public JdbcPagingItemReader<Article> getArticlePaging(String executedTime) {String lastExecutedTime = "";Map<String, Object> parameterValues = new HashMap<>(2);parameterValues.put("startTime", lastExecutedTime);parameterValues.put("stopTime", executedTime);return new JdbcPagingItemReaderBuilder<Article>().dataSource(dataSource).name("getArticlePaging").fetchSize(10).parameterValues(parameterValues).pageSize(10).rowMapper(new ArticleMapper()).queryProvider(articleProvider()).build();}private PagingQueryProvider articleProvider() {Map<String, Order> sortKeys = new HashMap<>(1);sortKeys.put("event_occurred_time", Order.ASCENDING);MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();provider.setSelectClause("title, content, event_occurred_time");provider.setFromClause("article");provider.setWhereClause("event_occurred_time >= :startTime AND event_occurred_time < :stopTime");provider.setSortKeys(sortKeys);return provider;}
2.6.3 说明
- 可以继承ItemReader,实现自定义功能的Reader
- 分页虽然对于资源的使用时可控的,但是效率会低很多,需要合理设置每一页的数据量。
- 如果有很多个任务一起执行,是看总数据量,比如有五个任务,每个任务采集的数据量为10W,那么设置分页的时候,要考虑到50W的数据量的内存占用情况
- JdbcCursorItemReader在内存足够的情况下可以使用,效率很高
2.7 Processor
2.7.1 示例代码
@Component
public class ArticleProcessor implements ItemProcessor<Article, ArticleDetail> {@Overridepublic ArticleDetail process(Article data) throws Exception {ArticleDetail articleDetail = new ArticleDetail();BeanUtils.copyProperties(data, articleDetail);articleDetail.setSource("weibo");articleDetail.setDescription("这是一条来源于微博的新闻");return articleDetail;}
}
2.7.2 说明
- processor只需要继承ItemProcessor<T1, T2>实现其中的process方法即可。
- T1是Reader读取的数据实体
- T2是要输出到Writer的数据实体,也就是Writer的输入数据实体
2.8 Writer
2.8.1 JdbcBatchItemWriter
@Component
public class ArticleJdbcWriter {private final DataSource dataSource;public ArticleJdbcWriter(DataSource dataSource) {this.dataSource = dataSource;}public JdbcBatchItemWriter<ArticleDetail> writer() {return new JdbcBatchItemWriterBuilder<ArticleDetail>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()).sql("INSERT INTO article_detail (title, content, event_occurred_time, source, description) VALUES (:title, :content, :eventOccurredTime, :source, :description)").dataSource(dataSource).build();}
}
2.8.2 自定义writer
@Slf4j
public class ArticleWriter implements ItemWriter<ArticleDetail> {@Overridepublic void write(List<? extends ArticleDetail> list) throws Exception {log.info("list的大小等于job中设置的chunkSize, size = {}", list.size());// TODO 此处可输出数据,比如输出到消息队列list.forEach(article -> log.info("输出测试,title:{}", article.getTitle()));}
}
2.8.3 说明
- 继承ItemWriter,实现writer方法即可
- T是Processor的输出
- list是Step中设置的chunkSize,也就是每次提交到writer的数据量
2.9 Step与Job
2.9.1 示例代码
@Configuration
@EnableBatchProcessing
public class ArticleBatchJob {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;@Autowiredprivate ArticleReaderDemo articleReader;@Autowiredprivate ArticleProcessor articleProcessor;@Autowiredprivate ArticleJdbcWriter articleJdbcWriter;@Bean(name = "articleReader")@StepScopepublic JdbcPagingItemReader<Article> batchReader(@Value("#{jobParameters['executedTime']}") String executedTime) {return articleReader.getArticlePaging(executedTime);}@Bean(name = "articleWriter")public ItemWriter<ArticleDetail> batchWriter() {
// return articleJdbcWriter.writer();return new ArticleWriter();}@Bean(name = "articleJob")public Job batchJob(JobListener listener, Step articleStep) {return jobBuilderFactory.get("articleJob").listener(listener).incrementer(new RunIdIncrementer()).flow(articleStep).end().build();}@Bean(name = "articleStep")public Step step(JdbcPagingItemReader<Article> articleReader, ItemWriter<ArticleDetail> articleWriter) {return stepBuilderFactory.get("crossHistoryStep")// 数据会累积到一定量再提交到writer.<Article, ArticleDetail>chunk(10).reader(articleReader).processor(articleProcessor).writer(articleWriter)// 默认为false(如果参数未发生变化的话,任务不会重复执行).allowStartIfComplete(true).build();}
}
2.9.1 说明
- @EnableBatchProcessing是必须的
- 每个Step中,并不是每处理一条数据都提交到Writer的,需要配置chunkSize,合理的chunkSize对于数据采集效率的提升效果很明显
- Job如果执行成功一次,下次任务启动时如果参数没有变化的话,默认情况下是不会重复执行的,如果想要执行可以传一个时间参数或者设置
allowStartIfComplete(true)
2.10 集成Quartz实现定时启动
Springboot如何集成Quartz可以看 《实战代码(一):SpringBoot集成Quartz》
2.10.1 QuartzJob
@Component
@Slf4j
@DisallowConcurrentExecution
public class ArticleQuartzJob extends QuartzJobBean {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobLocator jobLocator;@Overrideprotected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {try {Job job = jobLocator.getJob("articleJob");jobLauncher.run(job, new JobParametersBuilder().addString("executedTime", "2020-11-10 16:21:01").toJobParameters());} catch (Exception e) {e.printStackTrace();log.error("任务[articleJob]启动失败,错误信息:{}", e.getMessage());}}
}
2.10.2 初始化QuartzJob
@Component
public class QuartzJobInit implements CommandLineRunner {@Autowiredprivate QuartzUtils quartzUtils;@Overridepublic void run(String... args) throws Exception {quartzUtils.addSingleJob(ArticleQuartzJob.class, "articleJob", 60);}
}
源码地址
https://github.com/lysmile/spring-boot-demo/tree/master/spring-boot-batch-demo
附录 元数据表建表语句(MYSQL)
创建元数据表的SQL文件在
org.springframework.batch.core包中可以找到,可以针对不同的数据库进行配置
-- Autogenerated: do not edit this fileCREATE TABLE BATCH_JOB_INSTANCE (JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_NAME VARCHAR(100) NOT NULL,JOB_KEY VARCHAR(32) NOT NULL,constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_INSTANCE_ID BIGINT NOT NULL,CREATE_TIME DATETIME NOT NULL,START_TIME DATETIME DEFAULT NULL ,END_TIME DATETIME DEFAULT NULL ,STATUS VARCHAR(10) ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED DATETIME,JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (JOB_EXECUTION_ID BIGINT NOT NULL ,TYPE_CD VARCHAR(6) NOT NULL ,KEY_NAME VARCHAR(100) NOT NULL ,STRING_VAL VARCHAR(250) ,DATE_VAL DATETIME DEFAULT NULL ,LONG_VAL BIGINT ,DOUBLE_VAL DOUBLE PRECISION ,IDENTIFYING CHAR(1) NOT NULL ,constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT NOT NULL,STEP_NAME VARCHAR(100) NOT NULL,JOB_EXECUTION_ID BIGINT NOT NULL,START_TIME DATETIME NOT NULL ,END_TIME DATETIME DEFAULT NULL ,STATUS VARCHAR(10) ,COMMIT_COUNT BIGINT ,READ_COUNT BIGINT ,FILTER_COUNT BIGINT ,WRITE_COUNT BIGINT ,READ_SKIP_COUNT BIGINT ,WRITE_SKIP_COUNT BIGINT ,PROCESS_SKIP_COUNT BIGINT ,ROLLBACK_COUNT BIGINT ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED DATETIME,constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
参考
-
spring-batch 官方文档
-
Spring-batch 元数据表文档