一:ListItemReader
用于简单的开发测试。
/**
 * In-memory reader for quick development tests: emits "a", "b" and "c" in that order,
 * then signals end of input.
 */
@Bean
public ItemReader<String> listItemReader() {
    String[] sampleItems = {"a", "b", "c"};
    return new ListItemReader<>(Arrays.asList(sampleItems));
}
二:FlatFileItemReader
1.1 完全映射
当文件中的各列与实体类的属性一一对应时,可以直接使用 targetType(Class)
来完成映射。常用的分隔符有逗号(,),以及 “\u001B”(表示 ESC 控制字符)。
1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,lisi,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
/**
 * Target bean for the user CSV/JSON samples. Lombok generates getters, setters and toString.
 * Property names match the column names given to names(...) in the flat-file reader.
 */
@Getter
@Setter
@ToString
public class UserInfo {

    private Long id;

    private String username;

    private Integer age;

    private String city;

    private String area;
}
@Configuration
public class HelloWorldChunkJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job helloWorldChunkJob() {return jobBuilderFactory.get("helloWorldChunkJob").start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<UserInfo, UserInfo>chunk(3).reader(itemReader()).writer(itemWriter()).build();}@Beanpublic ItemReader<UserInfo> itemReader() {return new FlatFileItemReaderBuilder<UserInfo>().encoding("UTF-8").name("userItemReader").resource(new ClassPathResource("static/user.csv"))//.resource(new PathResource("/a/b/c/user.csv")).delimited().delimiter(",").names("id", "username", "age", "city", "area").targetType(UserInfo.class).build();}@Beanpublic ItemWriter<UserInfo> itemWriter() {return new ItemWriter<UserInfo>() {@Overridepublic void write(List<? extends UserInfo> items) throws Exception {System.out.println("itemWriter=" + items);}};}
}
1.2 自定义字段映射 fieldSetMapper
/**
 * Output model of UserFieldSetMapper. The city and area columns are concatenated into
 * {@code address}. Lombok generates getters, setters and toString.
 */
@Getter
@Setter
@ToString
public class User {

    private Long id;

    private String username;

    private Integer age;

    private String address;
}
@Bean
public Step step1() {return stepBuilderFactory.get("step1").<UserInfo, User>chunk(3).reader(itemReader()).writer(itemWriter()).build();
}@Bean
public ItemReader<UserInfo> itemReader() {return new FlatFileItemReaderBuilder<UserInfo>().encoding("UTF-8").name("userItemReader").resource(new ClassPathResource("static/user.csv"))//.resource(new PathResource("/a/b/c/user.csv")).delimited().delimiter(",").names("id", "username", "age", "city", "area").fieldSetMapper(fieldSetMapper()).build();
}@Bean
public FieldSetMapper fieldSetMapper() {return new UserFieldSetMapper();
}
public class UserFieldSetMapper implements FieldSetMapper<User> {@Overridepublic User mapFieldSet(FieldSet fieldSet) throws BindException {User user = new User();user.setId(fieldSet.readLong("id"));user.setUsername(fieldSet.readString("username"));user.setAge(fieldSet.readInt("age"));// 字段处理user.setAddress(fieldSet.readString("city") + fieldSet.readString("area"));return user;}
}
1.3 行映射 lineMapper
/**
 * Reads static/user.dat, whose fields are separated by the ESC control character.
 * It assembles an explicit DefaultLineMapper (tokenizer + BeanWrapperFieldSetMapper) instead
 * of using the builder's delimited()/targetType() shortcut.
 */
public ItemReader<UserInfo> itemReader() {
    // Field separator: ESC (U+001B).
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("\u001B");
    // setQuoteCharacter sets the quote character; it is not an end-of-line marker.
    // NOTE(review): SUB (U+001A) is presumably chosen because it never appears in the data,
    // which effectively disables quoting — confirm against the file format.
    tokenizer.setQuoteCharacter('\u001A');
    tokenizer.setFieldSetFactory(new DefaultFieldSetFactory());
    tokenizer.setNames("id", "username", "age", "city", "area");

    // Bind named fields onto UserInfo properties of the same name.
    BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(UserInfo.class);

    DefaultLineMapper<UserInfo> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);

    return new FlatFileItemReaderBuilder<UserInfo>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.dat"))
            .lineMapper(lineMapper)
            .build();
}
三:JsonItemReader
[{"id":1, "username":"a", "age":18},{"id":2, "username":"b", "age":17},{"id":3, "username":"c", "age":16},{"id":4, "username":"d", "age":15},{"id":5, "username":"e", "age":14}
]
/**
 * Reads static/user.json (a JSON array of objects) and binds each element to a UserInfo
 * bean with Jackson.
 */
@Bean
public JsonItemReader<UserInfo> itemReader() {
    JacksonJsonObjectReader<UserInfo> jacksonReader = new JacksonJsonObjectReader<>(UserInfo.class);
    jacksonReader.setMapper(new ObjectMapper());

    JsonItemReaderBuilder<UserInfo> builder = new JsonItemReaderBuilder<UserInfo>()
            .name("jsonItemReader")
            .resource(new ClassPathResource("static/user.json"))
            .jsonObjectReader(jacksonReader);
    return builder.build();
}
四:数据库
3.1 JdbcCursorItemReader
游标一次读一条。
/**
 * Row model filled by UserRowMapper from the id, name and age columns.
 * Lombok generates getters, setters and toString.
 */
@Getter
@Setter
@ToString
public class User {

    private Long id;

    private String name;

    private int age;
}
public class UserRowMapper implements RowMapper<User> {@Overridepublic User mapRow(ResultSet rs, int rowNum) throws SQLException {User user = new User();user.setId(rs.getLong("id"));user.setName(rs.getString("name"));user.setAge(rs.getInt("age"));return user;}
}
@Configuration
public class CursorDBReaderJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate DataSource dataSource;@Beanpublic UserRowMapper userRowMapper(){return new UserRowMapper();}@Beanpublic JdbcCursorItemReader<User> userItemReader(){return new JdbcCursorItemReaderBuilder<User>().name("userCursorItemReader").dataSource(dataSource).sql("select * from user where age > ?").rowMapper(userRowMapper())//拼接参数.preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16})).build();}@Beanpublic ItemWriter<User> itemWriter(){return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {items.forEach(System.err::println);}};}@Beanpublic Step step(){return stepBuilderFactory.get("step1").<User, User>chunk(1).reader(userItemReader()).writer(itemWriter()).build();}@Beanpublic Job job(){return jobBuilderFactory.get("cursor-db-reader-job").start(step()).build();}
}
3.2 JdbcPagingItemReader 分页
一次性读一页。
@Configuration
public class PageDBReaderJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate DataSource dataSource;@Beanpublic UserRowMapper userRowMapper(){return new UserRowMapper();}@Beanpublic PagingQueryProvider pagingQueryProvider() throws Exception {SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();factoryBean.setDataSource(dataSource);factoryBean.setSelectClause("select *"); //查询列factoryBean.setFromClause("from user"); //查询的表factoryBean.setWhereClause("where age > :age"); //where 条件factoryBean.setSortKey("id"); //结果排序return factoryBean.getObject();}@Beanpublic JdbcPagingItemReader<User> userItemReader() throws Exception {HashMap<String, Object> param = new HashMap<>();param.put("age", 16);return new JdbcPagingItemReaderBuilder<User>().name("userPagingItemReader").dataSource(dataSource) //数据源.queryProvider(pagingQueryProvider()) //分页逻辑.parameterValues(param) //条件.pageSize(10) //每页显示条数.rowMapper(userRowMapper()) //映射规则.build();}@Beanpublic ItemWriter<User> itemWriter(){return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {items.forEach(System.err::println);}};}@Beanpublic Step step() throws Exception {return stepBuilderFactory.get("step1").<User, User>chunk(1).reader(userItemReader()).writer(itemWriter()).build();}@Beanpublic Job job() throws Exception {return jobBuilderFactory.get("page-db-reader-job1").start(step()).build();}
}
3.3 MyBatisPagingItemReader
@Configuration
public class HelloWorldChunkJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate SqlSessionFactory sqlSessionFactory;private static int PAGE_SIZE = 3;@Beanpublic Job helloWorldChunkJob() {return jobBuilderFactory.get("helloWorldChunkJob").start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<User, User>chunk(PAGE_SIZE).reader(itemReader()).writer(itemWriter()).build();}@Beanpublic MyBatisPagingItemReader<User> itemReader() {Map<String, Object> map = new HashMap<>();map.put("id", 1);MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();itemReader.setSqlSessionFactory(sqlSessionFactory);itemReader.setQueryId("com.example.batch.mapper.UserMapper.selectUserList");itemReader.setPageSize(PAGE_SIZE);itemReader.setParameterValues(map);return itemReader;}@Beanpublic ItemWriter<User> itemWriter() {return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {System.out.println("itemWriter=" + items);}};}
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!--
  Paged query for MyBatisPagingItemReader. _pagesize and _skiprows are supplied by the reader;
  #{id} comes from its parameterValues. LIMIT/OFFSET paging needs a stable ORDER BY,
  otherwise rows can repeat or be skipped across pages.
-->
<mapper namespace="com.example.batch.mapper.UserMapper">
    <select id="selectUserList" resultType="com.example.batch.entity.User">
        select * from tbl_user
        where id > #{id}
        order by id
        limit #{_pagesize} offset #{_skiprows}
    </select>
</mapper>
五:多线程读
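- userItemReader() 加上 saveState(false):
Spring Batch 提供的大部分 ItemReader 都是有状态的,作业重启时基本依靠保存的状态来确定上次停止的位置。而在多线程环境中,如果维护状态的对象被多个线程访问,线程之间可能会相互覆盖状态。因此设置为 false 以关闭状态保存,但这也意味着作业无法再从中断处重启。注意:saveState(false) 并不会让 FlatFileItemReader 本身变成线程安全的,多线程读取时仍需对 read() 做同步(例如用 SynchronizedItemStreamReader 包装)。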
- step() 方法加上 .taskExecutor(new SimpleAsyncTaskExecutor()):
为作业步骤添加多线程处理能力,以块(chunk)为单位,每个块由一个线程处理。运行后观察输出,可以明显看出输出顺序是乱序的;修改 job 的名字后再次执行,会发现每次的输出结果都不一样。
@Bean
public FlatFileItemReader<User> userItemReader(){System.out.println(Thread.currentThread());FlatFileItemReader<User> reader = new FlatFileItemReaderBuilder<User>().name("userItemReader").saveState(false) //防止状态被覆盖.resource(new ClassPathResource("static/user.csv")).delimited().delimiter("#").names("id", "username", "age").targetType(User.class).build();return reader;
}@Bean
public Step step(){return stepBuilderFactory.get("step1").<User, User>chunk(2).reader(userItemReader()).writer(itemWriter()).taskExecutor(new SimpleAsyncTaskExecutor()).build();}
六:多步骤并行执行
/**
 * Job that runs two flows concurrently: flatStep() (user-parallel.txt) and jsonStep()
 * (user-parallel.json) execute in parallel through split(...).
 */
@Bean
public Job parallelJob() {
    // Thread 1: reads user-parallel.txt
    Flow parallelFlow1 = new FlowBuilder<Flow>("parallelFlow1")
            .start(flatStep())
            .build();

    // Thread 2: reads user-parallel.json. split(...) runs jsonStep() and parallelFlow1
    // side by side on the given executor.
    Flow parallelFlow2 = new FlowBuilder<Flow>("parallelFlow2")
            .start(jsonStep())
            .split(new SimpleAsyncTaskExecutor())
            .add(parallelFlow1)
            .build();

    return jobBuilderFactory.get("parallel-step-job")
            .start(parallelFlow2)
            .end()
            .build();
}
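parallelJob() 配置 job 时需要指定并行执行的 flow:parallelFlow2 以 jsonStep() 开始,再通过 **.split(new SimpleAsyncTaskExecutor())** 与 parallelFlow1 拆分为两个并行分支。SimpleAsyncTaskExecutor 并不是线程池,它会为每个任务新建一个线程,因此这里会有 2 个线程分别执行 parallelFlow1(读 user-parallel.txt)和 jsonStep()(读 user-parallel.json)。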
七:异常处理
方式一:设置跳过异常次数
@Bean
public Step step() throws Exception {return stepBuilderFactory.get("step1").<User, User>chunk(1).reader(userItemReader()).writer(itemWriter()).faultTolerant() //容错.skip(Exception.class) //跳过啥异常.noSkip(RuntimeException.class) //不能跳过啥异常.skipLimit(10) //跳过异常次数.throttleLimit(10).skipPolicy(new SkipPolicy() {@Overridepublic boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {//定制跳过异常与异常次数return false;}}).build();}
方式二:记录错误信息
public class ErrorItemReaderListener implements ItemReadListener {@Overridepublic void beforeRead() {}@Overridepublic void afterRead(Object item) {}@Overridepublic void onReadError(Exception ex) {System.out.println("记录读数据相关信息...");}
}
方式三:直接跳过不处理
/**
 * Fault-tolerant step that skips every item failing with an Exception (Errors are not skipped)
 * instead of failing the step.
 */
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(PAGE_SIZE)
            .reader(itemReader())
            .writer(itemWriter())
            .faultTolerant()
            .skip(Exception.class)
            // Without an explicit limit the builder's default skip limit applies (10 in
            // Spring Batch 4.x), and the step fails once it is exceeded. Lift the limit so that
            // failing items are always skipped, as intended here.
            .skipLimit(Integer.MAX_VALUE)
            .build();
}