下面直接给出 Spring Batch 两种用法(Tasklet 与 Chunk)的示例代码。
■ 公共部分(Tasklet 与 Chunk 两个示例共用):
1. 代码结构
2. pom.xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency>
3. framework/BatchAnnotation.java
package roy.springbatch.framework;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;import java.lang.annotation.*;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
// Spring Boot application meta-annotation; DataSourceAutoConfiguration is explicitly excluded.
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
// NOTE(review): @EnableBatchProcessing below may already pull in a batch configuration,
// which would make this explicit @Import redundant — confirm before removing either.
@Import({SimpleBatchConfiguration.class})
@EnableBatchProcessing
// NOTE(review): no base package is given here; presumably resolved relative to the
// annotated class — confirm how Spring handles this when used as a meta-annotation.
@ComponentScan
// Additionally scans the shared framework package.
@ComponentScans({@ComponentScan("roy.springbatch.framework")})
// Loads properties from config/jdbc-dev.properties on the classpath.
@PropertySource(value = "classpath:config/jdbc-dev.properties")
/*
 * Composed, type-level annotation that bundles the Spring Boot / Spring Batch
 * annotations listed above so that a batch entry-point class only needs to
 * declare @BatchAnnotation. It is retained at runtime and marked @Inherited.
 */
public @interface BatchAnnotation {
}
4. framework/BaseModule.java
package roy.springbatch.framework;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;import java.util.HashMap;
import java.util.Map;public abstract class BaseModule implements CommandLineRunner {private static final Logger log = LoggerFactory.getLogger(BaseModule.class);public static void run(Class<? extends BaseModule> module, String batchName, String[] args)throws Exception {SpringApplication app = new SpringApplication(module);Map<String, Object> param = retriveArgs(batchName, args);app.setDefaultProperties(param);app.run(args);}private static Map<String, Object> retriveArgs(String batchName, String[] args){Map<String, Object> param = new HashMap<>();param.put("argsLength", args.length);if (args.length>0){param.put("targetDate", args[0]);}return param;}@Overridepublic void run(String... args) throws Exception{if (null != args){for(String arg : args){log.info("execute module with argument : " + arg);}}}
}
5. framework/BaseWriter.java
package roy.springbatch.framework;import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;public abstract class BaseWriter<T> implements ItemWriter<T> {protected StepExecution stepExecution;@BeforeSteppublic void saveStepExecution(StepExecution stepExecution){this.stepExecution = stepExecution;}@Overridepublic void write(List<? extends T> items) throws Exception {JobParameters params = stepExecution.getJobParameters();ExecutionContext stepContext = stepExecution.getExecutionContext();for(T item : items){doWrite(item, params, stepContext);}}public abstract void doWrite(T item, JobParameters params, ExecutionContext stepContext) throws Exception;
}
一. Tasklet
1. batTasklet/BatTasklet.java
package roy.springbatch.batTasklet;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import roy.springbatch.framework.BaseModule;
import roy.springbatch.framework.BatchAnnotation;

/**
 * Entry point of the tasklet sample batch.
 */
@BatchAnnotation
public class BatTasklet extends BaseModule {

    private static final Logger log = LoggerFactory.getLogger(BatTasklet.class);

    private static final String MODULE_NAME = "BATCHTASKLET";

    /**
     * Launches the batch and exits with status 1 when it fails.
     *
     * @param args the command-line arguments forwarded to the batch
     */
    public static void main(String[] args) {
        try {
            run(BatTasklet.class, MODULE_NAME, args);
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            log.error(MODULE_NAME + " failed.", e);
            System.exit(1);
        }
    }
}
2. batTasklet/BatTaskletConfiguration.java
package roy.springbatch.batTasklet;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import roy.springbatch.framework.BaseModule;

/**
 * Job configuration of the tasklet sample: one job made of two tasklet steps
 * executed in sequence.
 */
@Configuration
@EnableBatchProcessing
public class BatTaskletConfiguration extends BaseModule {

    private static final Logger log = LoggerFactory.getLogger(BatTaskletConfiguration.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private Tasklet tasklet1;

    @Autowired
    private Tasklet tasklet2;

    /** Step named {@code step1} that runs {@code tasklet1}. */
    @Bean
    public Step step1() {
        return restartableTaskletStep("step1", tasklet1);
    }

    /** Step named {@code step2} that runs {@code tasklet2}. */
    @Bean
    public Step step2() {
        return restartableTaskletStep("step2", tasklet2);
    }

    /** Job named {@code step-tasklet-job}: {@code step1} followed by {@code step2}. */
    @Bean
    public Job job() {
        return jobBuilderFactory
                .get("step-tasklet-job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .build();
    }

    /** Builds a tasklet step that is allowed to start again even if already complete. */
    private Step restartableTaskletStep(String stepName, Tasklet work) {
        return stepBuilderFactory
                .get(stepName)
                .allowStartIfComplete(true)
                .tasklet(work)
                .build();
    }
}
3. batTasklet/Task1.java(具体要执行的业务逻辑写在 Tasklet 的 execute 方法里)
package roy.springbatch.batTasklet;import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class Task1 {@Beanpublic Tasklet tasklet1(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("--->Tasklet 1 Execute:" + System.currentTimeMillis());return RepeatStatus.FINISHED;}};}
}
4. batTasklet/Task2.java
package roy.springbatch.batTasklet;import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class Task2 {@Beanpublic Tasklet tasklet2(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("--->Tasklet 2 Execute:" + System.currentTimeMillis());return RepeatStatus.FINISHED;}};}
}
5. 测试:
二. Chunk
1. batChunk/listener/BatChunkListener.java
package roy.springbatch.batChunk.listener;import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;@Component
public class BatChunkListener extends JobExecutionListenerSupport {@Overridepublic void beforeJob(JobExecution jobExecution){System.out.println("--->BeforeJob Execute");}@Overridepublic void afterJob(JobExecution jobExecution){System.out.println("--->AfterJob Execute");}
}
2. batChunk/reader/BatChunkReader.java
package roy.springbatch.batChunk.reader;import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;import java.util.HashMap;
import java.util.Map;public class BatChunkReader implements ItemReader<Map<String, String>> {private int stepCount = 0;private Map<String, String> listMap = new HashMap<>();@Overridepublic Map<String, String> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {if(stepCount<3){stepCount++;System.out.println("--->Reader Execute: read from DB");listMap.put("A1", "1");listMap.put("A2", "2");listMap.put("A3", "3");return listMap;}else {return null;}}
}
3. batChunk/writer/BatChunkWriter.java
package roy.springbatch.batChunk.writer;import org.springframework.batch.core.JobParameters;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import roy.springbatch.framework.BaseWriter;import java.util.Map;@Component
@Scope("prototype")
public class BatChunkWriter extends BaseWriter<Map<String, String>> {@Overridepublic void doWrite(Map<String, String> map, JobParameters params,ExecutionContext stepContext) throws Exception {System.out.println("--->Writer Execute: write to DB:" + map.toString());}
}
4. batChunk/BatChunk.java
package roy.springbatch.batChunk;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import roy.springbatch.framework.BaseModule;
import roy.springbatch.framework.BatchAnnotation;

/**
 * Entry point of the chunk sample batch.
 */
@BatchAnnotation
public class BatChunk extends BaseModule {

    private static final Logger log = LoggerFactory.getLogger(BatChunk.class);

    private static final String MODULE_NAME = "BATCHCHUNK";

    /**
     * Launches the batch and exits with status 1 when it fails.
     *
     * @param args the command-line arguments forwarded to the batch
     */
    public static void main(String[] args) {
        try {
            run(BatChunk.class, MODULE_NAME, args);
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            log.error(MODULE_NAME + " failed.", e);
            System.exit(1);
        }
    }
}
5. batChunk/BatChunkConfiguration.java
package roy.springbatch.batChunk;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.transaction.PlatformTransactionManager;
import roy.springbatch.batChunk.listener.BatChunkListener;
import roy.springbatch.batChunk.reader.BatChunkReader;
import roy.springbatch.batChunk.writer.BatChunkWriter;
import java.util.Map;

/**
 * Job configuration of the chunk sample: a single reader/writer step wrapped in
 * one job with a job listener attached.
 */
@Configuration
@EnableBatchProcessing
public class BatChunkConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private BatChunkWriter batChunkWriter;

    @Autowired
    private PlatformTransactionManager dbWriteManager;

    /**
     * Prototype-scoped reader bean.
     *
     * @return a new {@link BatChunkReader}
     */
    @Bean
    @Scope("prototype")
    public ItemReader<Map<String, String>> readerByMyBatis() {
        ItemReader<Map<String, String>> reader = new BatChunkReader();
        return reader;
    }

    /**
     * Job whose name is {@code InsertJob:} followed by the current time in
     * milliseconds; it runs {@link #stepInsert()} with the given listener.
     *
     * @param listener the job listener to register
     * @return the configured job
     */
    @Bean
    public Job insertJob(BatChunkListener listener) {
        String jobName = "InsertJob:" + System.currentTimeMillis();
        return jobBuilderFactory
                .get(jobName)
                .start(stepInsert())
                .listener(listener)
                .build();
    }

    /**
     * Chunk-oriented step named {@code stepInsert} with a chunk size of 1.
     *
     * @return the configured step
     */
    @Bean
    public Step stepInsert() {
        return stepBuilderFactory
                .get("stepInsert")
                .<Map<String, String>, Map<String, String>>chunk(1)
                .reader(readerByMyBatis())
                .writer(batChunkWriter)
                .transactionManager(dbWriteManager)
                .allowStartIfComplete(true)
                .build();
    }
}
6. 测试:
三. 代码下载:SpringBatch Sample