public interface ItemStream {// step执行之前执行void open(ExecutionContext var1) throws ItemStreamException;// 成功处理每一批chunk之后执行void update(ExecutionContext var1) throws ItemStreamException;// 整个step执行完之后才会执行void close() throws ItemStreamException;
}
一:user.csv
1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,xxx,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
二:ItemStreamReader
/*** 异常处理和重启*/
@Component
public class MyItemReader implements ItemStreamReader<UserInfo> {private FlatFileItemReader<UserInfo> flatFileItemReader = new FlatFileItemReader();private int currentLine = 0;// 是否允许重启private boolean restart = false;private ExecutionContext executionContext;public MyItemReader() {flatFileItemReader.setResource(new ClassPathResource("static/user.csv"));DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();tokenizer.setNames("id", "username", "age", "city", "area");BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();fieldSetMapper.setTargetType(UserInfo.class);DefaultLineMapper<UserInfo> defaultLineMapper = new DefaultLineMapper<>();defaultLineMapper.setLineTokenizer(tokenizer);defaultLineMapper.setFieldSetMapper(fieldSetMapper);defaultLineMapper.afterPropertiesSet();flatFileItemReader.setLineMapper(defaultLineMapper);}@Overridepublic void open(ExecutionContext executionContext) throws ItemStreamException {this.executionContext = executionContext;if (executionContext.containsKey("currentLine")) {// 6.作业第二次执行this.currentLine = executionContext.getInt("currentLine");this.restart = true;} else {// 1.作业第一次先执行this.currentLine = 0;executionContext.put("currentLine", this.currentLine);System.err.println("start reading from line:" + this.currentLine + 1);}System.err.println("step执行之前");}@Overridepublic UserInfo read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {UserInfo userInfo = null;this.currentLine++;if (restart) {// 7.作业第二次执行:跳过成功的行flatFileItemReader.setLinesToSkip(this.currentLine - 1);restart = false;System.err.println("restart from line:" + this.currentLine);}// 2.作业第一次执行读数据// 8.作业第二次执行读(从成功的行开始读)flatFileItemReader.open(this.executionContext);userInfo = flatFileItemReader.read();if (userInfo != null && userInfo.getUsername().equals("xxx")) {// 4.作业第一次 下一轮Chunk遇到xxx报错throw new RuntimeException("read userinfo id=" + userInfo.getId() + " exception");}return 
userInfo;}@Overridepublic void update(ExecutionContext executionContext) throws ItemStreamException {// 3.读完一轮chunkSize后执行// 4.读第二轮ChunkSize后执行executionContext.put("currentLine", this.currentLine);System.err.println("读完一个ChunkSize执行,读的过程中报错也会执行");}@Overridepublic void close() throws ItemStreamException {// 5.步骤执行结束执行System.err.println("Step最后执行");}
}
三:config
/**
 * Wires the restart demo: a job named {@code myRestartJob2} with a single chunk-oriented step.
 */
@Configuration
public class RestartJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MyItemReader myItemReader;

    @Autowired
    private MyItemWriter myItemWriter;

    /**
     * Builds the job, which starts with (and consists only of) {@link #restartStep()}.
     *
     * @return the configured job
     */
    @Bean
    public Job restartJob() {
        Step onlyStep = restartStep();
        return jobBuilderFactory
                .get("myRestartJob2")
                .start(onlyStep)
                .build();
    }

    /**
     * Builds the step that reads with {@code MyItemReader} and writes with {@code MyItemWriter}
     * in chunks of two items.
     *
     * @return the configured step
     */
    @Bean
    public Step restartStep() {
        return stepBuilderFactory
                .get("restartStep")
                .<UserInfo, UserInfo>chunk(2)
                .reader(myItemReader)
                .writer(myItemWriter)
                .build();
    }
}
四:controller
/**
 * HTTP entry point under {@code /job} for launching the restart demo job.
 */
@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job restartJob;

    /**
     * Launches {@code restartJob} with an empty parameter set and reports how it ended.
     *
     * @return the exit status of the resulting job execution
     * @throws Exception if the launcher refuses or fails to run the job
     */
    @RequestMapping("/start")
    public ExitStatus start() throws Exception {
        // The same (empty) parameters are used on every call.
        JobParameters emptyParameters = new JobParameters();
        return jobLauncher.run(restartJob, emptyParameters).getExitStatus();
    }
}
第一次执行完,将第3条数据的用户名xxx随便改成其它值,然后重启服务,尝试第二次继续执行。