SpringBatch从入门到实战(六):ItemReader

news/2025/1/11 21:59:28/

一:ListItemReader

用于简单的开发测试。

@Bean
public ItemReader<String> listItemReader() {return new ListItemReader<>(Arrays.asList("a", "b", "c"));
}

二:FlatFileItemReader

1.1 完全映射

当文件里的字段值和实体类的属性完全一样时,可以直接使用targetType(Class)来完成映射。常用的分割符如逗号, “\u001B” 表示ESC,

1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,lisi,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
@Getter
@Setter
@ToString
public class UserInfo {private Long id;private String username;private Integer age;private String city;private String area;
}
@Configuration
public class HelloWorldChunkJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job helloWorldChunkJob() {return jobBuilderFactory.get("helloWorldChunkJob").start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<UserInfo, UserInfo>chunk(3).reader(itemReader()).writer(itemWriter()).build();}@Beanpublic ItemReader<UserInfo> itemReader() {return new FlatFileItemReaderBuilder<UserInfo>().encoding("UTF-8").name("userItemReader").resource(new ClassPathResource("static/user.csv"))//.resource(new PathResource("/a/b/c/user.csv")).delimited().delimiter(",").names("id", "username", "age", "city", "area").targetType(UserInfo.class).build();}@Beanpublic ItemWriter<UserInfo> itemWriter() {return new ItemWriter<UserInfo>() {@Overridepublic void write(List<? extends UserInfo> items) throws Exception {System.out.println("itemWriter=" + items);}};}
}

1.2 自定字段映射 fieldSetMapper

@Getter
@Setter
@ToString
public class User {private Long id;private String username;private Integer age;private String address;
}
@Bean
public Step step1() {return stepBuilderFactory.get("step1").<UserInfo, User>chunk(3).reader(itemReader()).writer(itemWriter()).build();
}@Bean
public ItemReader<UserInfo> itemReader() {return new FlatFileItemReaderBuilder<UserInfo>().encoding("UTF-8").name("userItemReader").resource(new ClassPathResource("static/user.csv"))//.resource(new PathResource("/a/b/c/user.csv")).delimited().delimiter(",").names("id", "username", "age", "city", "area").fieldSetMapper(fieldSetMapper()).build();
}@Bean
public FieldSetMapper fieldSetMapper() {return new UserFieldSetMapper();
}
public class UserFieldSetMapper implements FieldSetMapper<User> {@Overridepublic User mapFieldSet(FieldSet fieldSet) throws BindException {User user = new User();user.setId(fieldSet.readLong("id"));user.setUsername(fieldSet.readString("username"));user.setAge(fieldSet.readInt("age"));// 字段处理user.setAddress(fieldSet.readString("city") + fieldSet.readString("area"));return user;}
}

1.3 行映射 lineMapper

public ItemReader<UserInfo> itemReader() {//ESCDelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("\u001B");// 行结束标志0tokenizer.setQuoteCharacter('\u001A');tokenizer.setFieldSetFactory(new DefaultFieldSetFactory());tokenizer.setNames("id", "username", "age", "city", "area");BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();fieldSetMapper.setTargetType(UserInfo.class);DefaultLineMapper<UserInfo> lineMapper = new DefaultLineMapper<>();lineMapper.setLineTokenizer(tokenizer);lineMapper.setFieldSetMapper(fieldSetMapper);return new FlatFileItemReaderBuilder<UserInfo>().encoding("UTF-8").name("userItemReader").resource(new ClassPathResource("static/user.dat")).lineMapper(lineMapper).build();
}

三:JsonItemReader

[{"id":1, "username":"a", "age":18},{"id":2, "username":"b", "age":17},{"id":3, "username":"c", "age":16},{"id":4, "username":"d", "age":15},{"id":5, "username":"e", "age":14}
]
@Bean
public JsonItemReader<UserInfo> itemReader() {ObjectMapper objectMapper = new ObjectMapper();JacksonJsonObjectReader<UserInfo> jsonObjectReader = new JacksonJsonObjectReader<>(UserInfo.class);jsonObjectReader.setMapper(objectMapper);return new JsonItemReaderBuilder<UserInfo>().name("jsonItemReader").resource(new ClassPathResource("static/user.json")).jsonObjectReader(jsonObjectReader).build();
}

四:数据库

3.1 JdbcCursorItemReader

游标一次读一条。
在这里插入图片描述

@Getter
@Setter
@ToString
public class User {private Long id;private String name;private int age;
}
public class UserRowMapper implements RowMapper<User> {@Overridepublic User mapRow(ResultSet rs, int rowNum) throws SQLException {User user = new User();user.setId(rs.getLong("id"));user.setName(rs.getString("name"));user.setAge(rs.getInt("age"));return user;}
}
@Configuration
public class CursorDBReaderJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate DataSource dataSource;@Beanpublic UserRowMapper userRowMapper(){return new UserRowMapper();}@Beanpublic JdbcCursorItemReader<User> userItemReader(){return new JdbcCursorItemReaderBuilder<User>().name("userCursorItemReader").dataSource(dataSource).sql("select * from user where age > ?").rowMapper(userRowMapper())//拼接参数.preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16})).build();}@Beanpublic ItemWriter<User> itemWriter(){return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {items.forEach(System.err::println);}};}@Beanpublic Step step(){return stepBuilderFactory.get("step1").<User, User>chunk(1).reader(userItemReader()).writer(itemWriter()).build();}@Beanpublic Job job(){return jobBuilderFactory.get("cursor-db-reader-job").start(step()).build();}
}

3.2 JdbcPagingItemReader 分页

一次性读一页。

在这里插入图片描述

@Configuration
public class PageDBReaderJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate DataSource dataSource;@Beanpublic UserRowMapper userRowMapper(){return new UserRowMapper();}@Beanpublic PagingQueryProvider pagingQueryProvider() throws Exception {SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();factoryBean.setDataSource(dataSource);factoryBean.setSelectClause("select *");   //查询列factoryBean.setFromClause("from user");    //查询的表factoryBean.setWhereClause("where age > :age"); //where 条件factoryBean.setSortKey("id");   //结果排序return factoryBean.getObject();}@Beanpublic JdbcPagingItemReader<User> userItemReader() throws Exception {HashMap<String, Object> param = new HashMap<>();param.put("age", 16);return new JdbcPagingItemReaderBuilder<User>().name("userPagingItemReader").dataSource(dataSource)  //数据源.queryProvider(pagingQueryProvider())  //分页逻辑.parameterValues(param)   //条件.pageSize(10) //每页显示条数.rowMapper(userRowMapper())  //映射规则.build();}@Beanpublic ItemWriter<User> itemWriter(){return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {items.forEach(System.err::println);}};}@Beanpublic Step step() throws Exception {return stepBuilderFactory.get("step1").<User, User>chunk(1).reader(userItemReader()).writer(itemWriter()).build();}@Beanpublic Job job() throws Exception {return jobBuilderFactory.get("page-db-reader-job1").start(step()).build();}
}

3.3 MyBatisPagingItemReader

@Configuration
public class HelloWorldChunkJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate SqlSessionFactory sqlSessionFactory;private static int PAGE_SIZE = 3;@Beanpublic Job helloWorldChunkJob() {return jobBuilderFactory.get("helloWorldChunkJob").start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<User, User>chunk(PAGE_SIZE).reader(itemReader()).writer(itemWriter()).build();}@Beanpublic MyBatisPagingItemReader<User> itemReader() {Map<String, Object> map = new HashMap<>();map.put("id", 1);MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();itemReader.setSqlSessionFactory(sqlSessionFactory);itemReader.setQueryId("com.example.batch.mapper.UserMapper.selectUserList");itemReader.setPageSize(PAGE_SIZE);itemReader.setParameterValues(map);return itemReader;}@Beanpublic ItemWriter<User> itemWriter() {return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {System.out.println("itemWriter=" + items);}};}
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.example.batch.mapper.UserMapper"><select id="selectUserList" resultType="com.example.batch.entity.User">select * from tbl_userwhere id > #{id}limit #{_pagesize} offset #{_skiprows}</select></mapper>

五:多线程读

  • userItemReader() 加上saveState(false) Spring Batch 提供大部分的ItemReader是有状态的,作业重启基本通过状态来确定作业停止位置,而在多线程环境中,如果对象维护状态被多个线程访问,可能存在线程间状态相互覆盖问题。所以设置为false表示关闭状态,但这也意味着作业不能重启了。

  • step() 方法加上 .taskExecutor(new SimpleAsyncTaskExecutor()) 为作业步骤添加了多线程处理能力,以块为单位,一个块一个线程,观察上面的结果,很明显能看出输出的顺序是乱序的。改变 job 的名字再执行,会发现输出数据每次都不一样。

@Bean
public FlatFileItemReader<User> userItemReader(){System.out.println(Thread.currentThread());FlatFileItemReader<User> reader = new FlatFileItemReaderBuilder<User>().name("userItemReader").saveState(false) //防止状态被覆盖.resource(new ClassPathResource("static/user.csv")).delimited().delimiter("#").names("id", "username", "age").targetType(User.class).build();return reader;
}@Bean
public Step step(){return stepBuilderFactory.get("step1").<User, User>chunk(2).reader(userItemReader()).writer(itemWriter()).taskExecutor(new SimpleAsyncTaskExecutor()).build();}

六:多步骤并行执行

@Bean
public Job parallelJob(){//线程1-读user-parallel.txtFlow parallelFlow1 = new FlowBuilder<Flow>("parallelFlow1").start(flatStep()).build();//线程2-读user-parallel.jsonFlow parallelFlow2 = new FlowBuilder<Flow>("parallelFlow2").start(jsonStep()).split(new SimpleAsyncTaskExecutor()).add(parallelFlow1).build();return jobBuilderFactory.get("parallel-step-job").start(parallelFlow2).end().build();
}

parallelJob() 配置job,需要指定并行的flow步骤,先是parallelFlow1然后是parallelFlow2 , 2个步骤间使用**.split(new SimpleAsyncTaskExecutor())** 隔开,表示线程池开启2个线程,分别处理parallelFlow1, parallelFlow2 2个步骤。

七:异常处理

方式一:设置跳过异常次数

@Bean
public Step step() throws Exception {return stepBuilderFactory.get("step1").<User, User>chunk(1).reader(userItemReader()).writer(itemWriter()).faultTolerant() //容错.skip(Exception.class)  //跳过啥异常.noSkip(RuntimeException.class)  //不能跳过啥异常.skipLimit(10)  //跳过异常次数.throttleLimit(10).skipPolicy(new SkipPolicy() {@Overridepublic boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {//定制跳过异常与异常次数return false;}}).build();}

方式二:记录错误信息

public class ErrorItemReaderListener implements ItemReadListener {@Overridepublic void beforeRead() {}@Overridepublic void afterRead(Object item) {}@Overridepublic void onReadError(Exception ex) {System.out.println("记录读数据相关信息...");}
}

方式三:直接跳过不处理

@Bean
public Step step1() {return stepBuilderFactory.get("step1").<User, User>chunk(PAGE_SIZE).reader(itemReader()).writer(itemWriter()).faultTolerant().skip(Exception.class).build();
}

http://www.ppmy.cn/news/380878.html

相关文章

计算机无法访问iTunes,PC端itunes识别不了iphone怎么办 itunes无法识别插入iPhone解决方法...

我们经常用到itunes连接电脑。有朋友反应&#xff0c;当iPhone连接新的电脑后显示充电&#xff0c;但是却无法识别。小编也遇到过这个问题&#xff0c;专门研究了一下&#xff0c;得出了2个原因。一起来看看吧。也许对你有帮助。 有需要的朋友可以点击下载&#xff1a;新版itun…

为何电脑不识别iPhone

电脑不识别iPhone主要原因如下&#xff1a; 一、手机能开机的情况下 1、数据线坏了 2、手机USB接口异常&#xff08;包括的损坏、接触不良、有异物等&#xff09; 3、电脑系统有问题识别不出来&#xff08;包括没有装iTunes及相关补丁&#xff09; .......................…

计算机无法识别苹果手机,如何解决电脑无法识别iphone的问题?

吴川 华南区技术负责人 概要 当我们想要给iPhone做一个全面的备份时,就需要连接手机到PC上。但有一些用户可能会遇到电脑无法识别iphone的问题,这该如何解决呢?不用着急,本文将分析PC无法识别iPhone的原因,并提供相应的解决办法。 一些用户在把iPhone设备连接到电脑上进行…

iphone win7无法识别_大神为你解决win7系统电脑不能识别iphone苹果设备的措施

很多人都懂一些简单的电脑系统问题的解决方案&#xff0c;但是win7系统电脑不能识别iphone苹果设备的解决思路却鲜为人知&#xff0c;小编前几天就遇到了win7系统电脑不能识别iphone苹果设备的问题&#xff0c;于是准备整理一些win7系统电脑不能识别iphone苹果设备的解决思路&a…

iphone7 无法连接计算机看照片,教你iphone7一键导出照片至电脑方法及iPhone7连接电脑itunes没反应怎么办...

iphone7照片怎么导入电脑呢?在iphone7上面拍好照片了&#xff0c;想把iphone7照片导到电脑上去看&#xff0c;那么iphone7照片如何导入到电脑上面?许多刚刚使用iphoen7的朋友可能还不懂得如何导入吧!下面就来看看iphone7照片导入电脑教程。 iphone7一键导出照片至电脑方法 步…

iphone win7无法识别_Win7电脑不能识别iphone苹果设备怎么解决

如今iphone电脑来给iphone设备进行一些管理等&#xff0c;但是可能会遇到这样一个情况&#xff0c;就是iphone连接到电脑的时候&#xff0c;却出现无法识别的问题&#xff0c;该怎么解决这个问题呢&#xff0c;下面给大家分享一下Win7电脑不能识别iphone苹果设备的具体 第一步&…

mac itunes 未能连接到服务器,iPhone 11无法在Mac上连接到iTunes吗?解决方法

一些iPhone 11&#xff0c;iPhone 11 Pro和iPhone 11 Pro Max用户发现iTunes无法识别连接到Mac的新iPhone。相反&#xff0c;使用通过USB电缆连接的iPhone 11或iPhone 11 Pro启动iTunes不会执行任何操作&#xff0c;并且iPhone不会显示在iTunes中&#xff0c;它不会同步&#x…

计算机无法识别苹果6,苹果手机接入电脑itunes无法连接iPhone怎么办?

有网友询问了小尚这个问题,“苹果手机接入电脑itunes无法连接iPhone怎么办?”,所以国美小编总结了苹果手机接入电脑itunes无法连接iPhone的相关信息,现在和大家一起分享。 出现提示:尚未安装iPad/iPhone所需要的软件,请卸载本程序后,重新进行安装程序。情况下可以用以下…