Spring Batch
- 로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기, 리소스 관리 등 대용량 레코드 처리에 필수적 기능 제공
- 최적화 및 파티셔닝 기술을 통해 대용량 및 고성능 배치 작업을 가능하게 한다
- 배치가 실패하여 작업 재시작을 하면 처음부터가 아닌 실패한 시점부터 실행하게 된다.
- 중복 실행을 막기 위해 성공한 이력이 있는 Batch 는 동일한 Parameters로 실행 시 Exception 발생한다
Spring Batch는 Scheduler가 아니다.
Batch Job을 관리하지만, Job을 구동하거나 발생시키는 기능은 지원하지 않기 때문에, Quartz, Scheduler, Jenkins 등 을 사용해야 한다.
Spring Batch 용어
- Job
- Job은 배치처리 과정을 하나의 단위로 만들어 놓은 객체
- 배치처리 과정에 있어 전체 계층 최상단에 위치하고 있습니다.
- JobInstance
- Job의 실행의 단위
- Job을 실행시키게 되면 하나의 JobInstance가 생성되게 됩니다.
- EX) 1월 1일 실행, 1월 2일 실행을 하게 되면 각각의 JobInstance가 생성되며 1월 1일 실행한 JobInstance가 실패하여 다시 실행을 시키더라도 이 JobInstance는 1월 1일에 대한 데이터만 처리하게 됩니다.
- JobParameters
- Job에 대한 실행단위인 JobInstance를 JobParameters 객체를 통해 구분
- 추가로 개발자 JobInstacne에 전달되는 매개변수 역할도 하고 있습니다.
- String, Double, Long, Date 4가지 형식만을 지원하고 있습니다.
- JobExecution
- JobInstance에 대한 실행 시도에 대한 객체입니다.
- 1월 1일에 실행한 JobInstacne가 실패하여 재실행을 하여도 동일한 JobInstance를 실행시키지만 이 2번에 실행에 대한 JobExecution은 개별로 생기게 됩니다.
- JobExecution는 이러한 JobInstance 실행에 대한 상태,시작시간, 종료시간, 생성시간 등의 정보를 담고 있다.
- Step
- Job의 배치처리를 정의하고 순차적인 단계를 캡슐화
- Job은 최소한 1개 이상의 Step을 가져야 하며 Job의 실제 일괄 처리를 제어하는 모든 정보가 들어있다
- StepExecution
- JobExecution과 동일하게 Step 실행 시도에 대한 객체
- Job이 여러개의 Step으로 구성되어 있을 경우 이전 단계의 Step이 실패하게 되면 다음 단계가 실행되지 않음으로 실패 이후 StepExecution은 생성되지 않습니다. StepExecution 또한 JobExecution과 동일하게 실제 시작이 될 때만 생성됩니다. StepExecution에는 JobExecution에 저장되는 정보 외에 read 수, write 수, commit 수, skip 수 등의 정보들도 저장이 됩니다.
- ExecutionContext
- Job에서 데이터를 공유 할 수 있는 데이터 저장소
- ExecutionContext는 JobExecutionContext, StepExecutionContext 2가지 종류로 이 두가지는 지정되는 범위가 다르다
- JobExecutionContext의 경우 Commit 시점에 저장되는 반면 StepExecutionContext는 실행 사이에 저장
- ExecutionContext를 통해 Step간 Data 공유가 가능하며 Job 실패시 ExecutionContext를 통한 마지막 실행 값을 재구성 할 수 있다
- JobRepository
- 모든 배치 처리 정보를 담고있는 매커니즘
- Job이 실행되게 되면 JobRepository에 JobExecution과 StepExecution을 생성하게 되며 JobRepository에서 Execution 정보들을 저장하고 조회하며 사용
- JobLauncher
- Job과 JobParameters를 사용하여 Job을 실행하는 객체
- ItemReader
- Step에서 Item을 읽어오는 인터페이스
- ItemReader에 대한 다양한 인터페이스가 존재하며 다양한 방법으로 Item을 읽어 올 수 있다
- ItemWriter
- ItemWriter는 처리 된 Data를 Writer 할 때 사용
- Writer는 처리 결과물에 따라 Insert가 될 수도 Update가 될 수도 Queue를 사용한다면 Send가 될 수 있다
- Writer 또한 Read와 동일하게 다양한 인터페이스가 존재하며 기본적으로 Item을 Chunk로 묶어 처리
- ItemProcessor
- Reader에서 읽어온 Item을 데이터를 처리하는 역할을 하고 있다.
- Processor는 배치를 처리하는데 필수 요소는 아니며 Reader, Writer, Processor 처리를 분리하여 각각의 역할을 명확하게 구분
Job Example
- Job은 여러가지 Step의 모음으로 구성되어 있으며 순차적으로 Step을 수행하며 Batch를 수행한다.
- 단일 Step 구성 예제
@Slf4j @Configuration @EnableBatchProcessing public class SingleStepExampleConfig { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Bean public Job exampleJob() { Job exampleJob = jobBuilderFactory.get("exampleSingleStepJob") .start(step()) .build(); return exampleJob; } @Bean public Step step() { return stepBuilderFactory.get("singleStep") .tasklet((contribution, chunkContext) -> { log.info("Single Step!"); return RepeatStatus.FINISHED; }).build(); } }
- 다중 step 사용 예제
@Slf4j @Configuration @EnableBatchProcessing public class MultiStepExampleConfig { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Bean public Job exampleJob() { Job exampleMultiStepJob = jobBuilderFactory.get("exampleMultiStepJob") .start(startStep()) .next(nextStep()) .next(lastStep()) .build(); return exampleMultiStepJob; } @Bean public Step lastStep() { return stepBuilderFactory.get("lastStep") .tasklet(((stepContribution, chunkContext) -> { log.info("last step!"); return RepeatStatus.FINISHED; })).build(); } @Bean public Step nextStep() { return stepBuilderFactory.get("nextStep") .tasklet(((stepContribution, chunkContext) -> { log.info("next step!"); return RepeatStatus.FINISHED; })).build(); } @Bean public Step startStep() { return stepBuilderFactory.get("startStep") .tasklet(((stepContribution, chunkContext) -> { log.info("Start Step!"); return RepeatStatus.FINISHED; })).build(); } }
- Flow를 통한 Step 구성
@Slf4j @Configuration @EnableBatchProcessing public class FlowStepExampleConfig { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Bean public Job exampleJob() { Job flowStepJob = jobBuilderFactory.get("flowStepJob") .start(startStep()) .on("FAILED") // start step의 ExistStatus.FAILED인 경우 .to(failOverStep())// failover step 실행 .on("*") // failover step의 결과와 상관없이 .to(writeStep())// write step 실행 .on("*") // write step 결과와 상관 없이 .end() // flow 종료 .from(startStep()) .on("COMPLETED") // startStep에서 FAILED가 아닌 COMPLETED인 경우 .to(processStep()) .on("*") .to(writeStep()) .on("*") .end() .from(startStep()) .on("*")// startStep이 FAILED, COMPLETED가 아닌 경우 .to(writeStep()) .end() .build(); return flowStepJob; } @Bean public Step processStep() { return stepBuilderFactory.get("processStep") .tasklet(((stepContribution, chunkContext) -> { log.info("process step"); return RepeatStatus.FINISHED; })).build(); } @Bean public Step writeStep() { return stepBuilderFactory.get("writeStep") .tasklet(((stepContribution, chunkContext) -> { log.info("Write Step"); return RepeatStatus.FINISHED; })).build(); } @Bean public Step failOverStep() { return stepBuilderFactory.get("nextStep") .tasklet(((stepContribution, chunkContext) -> { log.info("FAILOVER STEP"); return RepeatStatus.FINISHED; })).build(); } @Bean public Step startStep() { return stepBuilderFactory.get("startFlowStep") .tasklet(((stepContribution, chunkContext) -> { log.info("start step!"); String result = "COMPLETED"; // COMPLETED, FAILED, UNKNOWN // Flow 에서 on 은 RepeatStatus가 아닌 ExitStatus를 바라본다 if(result.equals("COMPLETED")) { stepContribution.setExitStatus(ExitStatus.COMPLETED); } else if(result.equals("FAILED")) { stepContribution.setExitStatus(ExitStatus.FAILED); } else if(result.equals("UNKNOWN")) { stepContribution.setExitStatus(ExitStatus.UNKNOWN); } return RepeatStatus.FINISHED; })).build(); } }
Step 구성
- tasklet
- Functional Interface로 실패를 알리기 위해 예외를 throw할 때까지 execute를 반복적으로 호출한다
- lambda를 사용하여 구현하는 방법, Tasklet, StepExecutionListner 구현체 사용, MethodInvokingTaskletAdapter 를 통해 실행할 수 있다.
@Bean public Step taskStep() { return stepBuilderFactory().get("taskletStep") .tasklet(myTasklet()).build(); } @Bean public MethodInvokingTaskletAdapter myTasklet() { MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter(); adapter.setTargetObject(customerService); adapter.setTargetMethod("buisinessLogic"); return adapter; }
- chunk
- 처리되는 commit row 수를 의미
- Batch 처리에서 커밋되는 row 수는 chunk 단위로 Transaction을 수행하기 때문에 실패 시 chunk 단위 만큼 rollback하게 된다.
- chunk 시나리오
- 읽기(read) - database에서 배치처리할 데이터를 읽어온다.
- 처리(processing) - 읽어온 데이터를 가공, 처리한다 (필수X)
- 쓰기(writing) - 가공, 처리한 데이터를 database에 저장
- chunk size
Setting a fairly large page size and using a commit interval that matches the page size should provide better performance. 페이지 크기를 상당히 크게 설정하고 페이지 크기와 일치하는 커밋 간격을 사용하면 성능이 향상된다.
- read 쿼리 수행 시 1번의 transaction을 위해 두 설정의 값을 일치 시키는게 가장 좋은 성능 향상 방법이다.
Step 설정
- startLimit
@Bean @JobScope public Step step() throws Exception { return stepBuilderFactory.get("StepEx") .startLimit(3) .<Member, Member>chunk(10) .reader(reader(null)) .processor(processor(null)) .writer(writer(null)) .build(); }
- 해당 Step 이 실패 후 재시작 가능 횟수를 의미
- startLimit 이후 실행은 Exception이 발생한다.
- skip
@Bean @JobScope public Step Step() throws Exception { return stepBuilderFactory.get("Step") .<Member,Member>chunk(10) .reader(reader(null)) .processor(processor(null)) .writer(writer(null)) .faultTolerant() .skipLimit(1) // skip 허용 횟수, 해당 횟수 초과시 Error 발생, Skip 사용시 필수 설정 .skip(NullPointerException.class)// NullPointerException에 대해선 Skip .noSkip(SQLException.class) // SQLException에 대해선 noSkip //.skipPolicy(new CustomSkipPolilcy) // 사용자가 커스텀하며 Skip Policy 설정 가능 .build(); }
- retry
@Bean @JobScope public Step Step() throws Exception { return stepBuilderFactory.get("Step") .<Member,Member>chunk(10) .reader(reader(null)) .processor(processor(null)) .writer(writer(null)) .faultTolerant() .retryLimit(1) //retry 횟수, retry 사용시 필수 설정, 해당 Retry 이후 Exception시 Fail 처리 .retry(SQLException.class) // SQLException에 대해선 Retry 수행 .noRetry(NullPointerException.class) // NullPointerException에 no Retry //.retryPolicy(new CustomRetryPolilcy) // 사용자가 커스텀하며 Retry Policy 설정 가능 .build(); }
- noRollback
@Bean @JobScope public Step Step() throws Exception { return stepBuilderFactory.get("Step") .<Member,Member>chunk(10) .reader(reader(null)) .processor(processor(null)) .writer(writer(null)) .faultTolerant() .noRollback(NullPointerException.class) // NullPointerException 발생 rollback이 되지 않게 설정 .build(); }
@JobScope, @StepScode
- @JobScope
- Step 선언문에 사용 가능
- @StepScope
- Step을 구성하는 ItemREader, ItemProcessor, ItemWriter에 사용
- 특징
- singleton 패턴이 아닌 annotation이 명시된 메소드의 실행 시점에 bean이 생성
- @JobScope, @StepScope 빈이 생성될 때 JobParameter가 생성되기 때문에 JobParameter 사용하기 위해 scope을 지정해주어야 한다.
- 유연한 설계, 병렬 실행을 위해 LateBinding을 하여 JobParameter를 비즈니스 로직 단계에서 할당
- JobLauncher 로 실행
@Configuration @Slf4j public class JobScheduler { @Autowired private JobLauncher jobLauncher; @Autowired private Job ExampleJob; @Scheduled(cron = "1 * * * * *") public void jobSchduled() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { Map<String, JobParameter> jobParametersMap = new HashMap<>(); SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); Date time = new Date(); String time1 = format1.format(time); jobParametersMap.put("date",new JobParameter(time1)); JobParameters parameters = new JobParameters(jobParametersMap); JobExecution jobExecution = jobLauncher.run(ExampleJob, parameters); while (jobExecution.isRunning()) { log.info("..."); } log.info("Job Execution: " + jobExecution.getStatus()); log.info("Job getJobConfigurationName: " + jobExecution.getJobConfigurationName()); log.info("Job getJobId: " + jobExecution.getJobId()); log.info("Job getExitStatus: " + jobExecution.getExitStatus()); log.info("Job getJobInstance: " + jobExecution.getJobInstance()); log.info("Job getStepExecutions: " + jobExecution.getStepExecutions()); log.info("Job getLastUpdated: " + jobExecution.getLastUpdated()); log.info("Job getFailureExceptions: " + jobExecution.getFailureExceptions()); } }
- JobParameters 사용
```java
/**
- 전체 금액이 10,000원 이상인 회원들에게 1,000원 캐시백을 주는 배치 */
@Slf4j @Configuration @EnableBatchProcessing public class ExampleJobConfig {
@Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired public EntityManagerFactory entityManagerFactory; @Bean public Job ExampleJob() throws Exception { Job exampleJob = jobBuilderFactory.get("exampleJob") .start(Step()) .build(); return exampleJob; } @Bean @JobScope public Step Step() throws Exception { return stepBuilderFactory.get("Step") .<Member,Member>chunk(10) .reader(reader(null)) .processor(processor(null)) .writer(writer(null)) .build(); } @Bean @StepScope public JpaPagingItemReader<Member> reader(@Value("#{jobParameters[date]}") String date) throws Exception { log.info("jobParameters value : " + date); Map<String,Object> parameterValues = new HashMap<>(); parameterValues.put("amount", 10000); return new JpaPagingItemReaderBuilder<Member>() .pageSize(10) .parameterValues(parameterValues) .queryString("SELECT p FROM Member p WHERE p.amount >= :amount ORDER BY id ASC") .entityManagerFactory(entityManagerFactory) .name("JpaPagingItemReader") .build(); } @Bean @StepScope public ItemProcessor<Member, Member> processor(@Value("#{jobParameters[date]}") String date){ return new ItemProcessor<Member, Member>() { @Override public Member process(Member member) throws Exception { log.info("jobParameters value : " + date); //1000원 추가 적립 member.setAmount(member.getAmount() + 1000); return member; } }; } @Bean @StepScope public JpaItemWriter<Member> writer(@Value("#{jobParameters[date]}") String date){ log.info("jobParameters value : " + date); return new JpaItemWriterBuilder<Member>() .entityManagerFactory(entityManagerFactory) .build(); } } ```
Spring Meta Table
- Spring Batch 에는 6개의 meta table 과 3개의 sequence table이 존재한다.
- Spring Batch Job이 실행될 때마다 실행된 job에 대한 정보 저장 목적
- sequence
- BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION및 BATCH_STEP_EXECUTION의 Primary Key는 시퀀스에 의해 생성
- table
- BATCH_JOB_INSTANCE
- JobInstance에 관련된 모든 정보 포함
- 전체 계층 구조의 최상위 역할
- BATCH_JOB_EXECUTION_PARAMS
- Job을 실행 시킬 때 사용했던 JobParameters에 대한 정보 저장
- BATCH_JOB_EXECUTION
- JobExcution에 관련된 모든 정보 저장
- JobInstance가 실행 될 때마다 시작시간, 종료시간, 종료코드 등 다양한 정보를 가지고 있다.
- BATCH_STEP_EXECUTION
- StepExecution에 대한 정보 저장
- STEP을 EXECUTION 정보인 읽은 수, 커밋 수, 스킵 수 등 다양한 정보를 추가로 담고 있다.
- BATCH_JOB_EXECUTION_CONTEXT
- JobExecution의ExecutionContext 정보 저장
- JobInstance가 실패 시 중단된 위치에서 다시 시작할 수 있는 정보를 저장하고 있다.
- BATCH_STEP_EXECUTION_CONTEXT
- tepExecution의 ExecutionContext 정보 저장
- JobInstance가 실패 시 중단된 위치에서 다시 시작할 수 있는 정보를 저장하고 있다.
- BATCH_JOB_INSTANCE
출처
- https://khj93.tistory.com/entry/Spring-Batch%EB%9E%80-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B3%A0-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0