티스토리 툴바



'Spring 지식 공유하기/batch'에 해당되는 글 30건

  1. 2009/08/31 [Batch Inside] Job
  2. 2009/08/31 [Batch Inside] JobLauncher - 2
  3. 2009/08/27 [Batch Inside] JobLauncher
  4. 2009/03/12 Spring batch 1.x에서 2.0 변경된 내용 요약 (2)
  5. 2009/01/06 Spring Batch 1.0에서 2.0으로 진화하기- 5. Configuration
  6. 2009/01/05 Spring Batch 1.0에서 2.0으로 진화하기- 3. JobExecutionLisneter & 4. ItemProcessor
  7. 2009/01/02 Spring Batch 1.0에서 2.0으로 진화하기- 1. ItemReader/ItemWriter(2)
  8. 2009/01/02 Spring Batch 1.0에서 2.0으로 진화하기- 1. ItemReader/ItemWriter(1)
  9. 2008/12/29 Spring batch의 커스텀 네임스페이스 마련과 SS enterprise batch? (2)
  10. 2008/11/27 [Beta 3.0] Spring Batch 프레임웍 레퍼런스 한글 편역 버전. (6)
2009/08/31 21:00

[Batch Inside] Job

이번에는 org.springframework.batch.core.Job 클래스를 살펴보겠습니다.

Job 클래스는 이름에서 볼 수 있듯이, 배치 잡을 표현하는 배치 도메인 클래스입니다. 하지만 개발 시에 Job을 직접 개발하는 것은 아닙니다.
실제 배치 업무는 각 Step으로 구현해서 등록하게 되며, Job은 실행되는 잡의 유형에 따라 선택되게 됩니다.(물론 확장이 가능합니다.)

Job 인터페이스에는 총 네 개의 메소드가 정의되어 있습니다.
public interface Job {
	String getName();
	boolean isRestartable();
	void execute(JobExecution execution);
	JobParametersIncrementer getJobParametersIncrementer();
}


execute() 메소드가 잡을 실행시켜주는 진입점이 됩니다. 여기서 한 가지 주의할 부분은 execute()에서는 잡이나 스텝(step)의 실패나 성공에 관계 없이 아무런 예외를 던지지 않는다는 것입니다. 성공 여부를 판단하려면, 예외를 처리하는 것이 아니라 JobExecution의 상태(status)를 기준으로 판단해야 합니다.

Job의 구현 클래스로는 SimpleJob과 FlowJob이 있습니다.(AbstractJob은 제외)

org.springframework.batch.core.job.SimpleJob은 Job을 구현하는 기본 클래스이고, org.springframework.batch.core.job.flow.FlowJob은 복잡한 Step들의 흐름(flow) 처리를 지원하는 클래스입니다.
Job 객체는 직접 (bean으로) 설정도 가능하며, 별도의 Repository에서 찾게 할 수도 있습니다. Job을 결정하는 부분은 JobLauncher 외부에서 이루어 지기 때문에 이 부분은 손쉽게 확장할 수 있는 부분으로 생각됩니다.

언제 SimpleJob을 사용하는지, FlowJob을 사용하는지는 batch 네임스페이스를 찾아 보면 조금 더 정확하게 알 수 있을 것 같습니다. 다음에 더 찾아보도록 하겠습니다.
일단은 SimpleJob을 기준으로 내용을 더 살펴보도록 하겠습니다.

JobLauncher로부터 호출되는 Job 실행의 진입지점인 execute() 메소드에서는 크게 다음과 같은 순서로 진행되게 됩니다.
try{
    if(잡이 아직 끝나지 않았나?){    
        잡 선행 처리 수행
        잡 실행
    }else(){
        잡이 끝났으니 잡 상태를 끝났음으로 변경하고 지나감
    }
}catch(JobInterruptedException){
    잡이 중단 됐으니 상태를 잡 중단으로 설정하고 끝냄
}catch(){
    잡 처리 중 예외가 발생했으니 잡 상태를 실패로 설정하고 끝냄
}finally{
    잡 후행 처리 수행
    잡 정보 저장소로 갱신
}


선, 후행 처리는 org.springframework.batch.core.JobExecutionListener 인터페이스를 구현해서 처리하게 됩니다.
public interface JobExecutionListener {
	void beforeJob(JobExecution jobExecution);
	void afterJob(JobExecution jobExecution);
}

주의할 점은 이 리스너는 Job 객체에 멤버 변수로 지정되어 있기 때문에, 리스너 구현 시 쓰레드-안전하게(thread safety) 구현해야 합니다.

잡 실행 부분에서는 다시 doExecute() 메소드를 호출하게 됩니다. 여기까지는 org.springframework.batch.core.job.AbstractJob 클래스의 구현부분이며, 이제부터 SimpleJob과 FlowJob이 구분되게 됩니다.
역시나 SimpleJob으로 내용을 계속 살펴보도록 하겠습니다.

SimpleJob에서는 등록되어 있는 Step 별로 다시 handleStep() 메소드를 호출해서 이 메소드 내에서 Job에 등록되어 있는 Step의 정보를 파악하여 Step을 호출해주게 됩니다.
protected final StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException,
		JobRestartException, StartLimitExceededException {
	StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());//(1)
	StepExecution currentStepExecution = lastStepExecution;

	if (shouldStart(lastStepExecution, jobInstance, step)) {//(2)

		currentStepExecution = execution.createStepExecution(step.getName());
		boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals(
				BatchStatus.COMPLETED));

		jobRepository.add(currentStepExecution);//(3)

		step.execute(currentStepExecution);//(4)

		jobRepository.updateExecutionContext(execution);//(5)
	} else {
		// currentStepExecution.setExitStatus(ExitStatus.NOOP);
	}
	return currentStepExecution;
}

먼저 JobRepository에서 해당 Step의 마지막 StepExecution 찾아보게 됩니다.
그리고 나서 StepExecution 정보를 기준으로 Job을 시작할 수 있는지를 판단하게 됩니다. 이 때는 BatchStatus와 Step의 StepExecution 횟수가 Step의 startLimit를 넘지 않았는지를 판단의 기준으로 사용하게 됩니다.

BatchStatus는 enum 클래스로 COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN의 상태를 제공합니다. enum 클래스 구현 예제로 참조할만 합니다.

시작해도 된다 판단되면, JobRepository에 현재 StepExecution 정보를 저장하고(2), 스텝을 실행하게 됩니다.(3)
스텝 실행 후에 ExecutionContext의 정보를 업데이트하면서 마무리를 합니다.(5)


크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 0
2009/08/31 08:59

[Batch Inside] JobLauncher - 2

처음 JobLauncher 분석에서 실제로 JobLauncher에 대해 자세히 살펴보지를 못해서 조금더 살펴보도록 하겠습니다.

이전 글에서도 설명했듯이, 기본적으로 제공되는 JobLauncher 인터페이스의 구현 클래스는 SimpleJobLauncher가 있습니다.

이 클래스의 역할을 요약해보면,
1. JobExecution 정보 얻어오기
2. (비)동기로 Job 수행

입니다.

먼저 JobExecution 정보를 얻어오는 부분을 보면,
public JobExecution run(final Job job, final JobParameters jobParameters)
		throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

	JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
	if (lastExecution != null) {
		if (!job.isRestartable()) {
			throw new JobRestartException("JobInstance already exists and is not restartable");
		}
		else {
			/*
			 * There is a very small probability that a non-restartable job
			 * can be restarted, but only if another process or thread
			 * manages to launch and fail a job execution for this
			 * instance between the last assertion and the next method
			 * returning successfully.
			 */
			jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
		}
	}
	else {
		jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
	}
	//...
}

JobExecution은 JobRepository를 통해서 얻게 됩니다.
JobRepository 구현 클래스로는 SimpleJobRepository가 있으며, 테스트 등의 목적으로 데이터베이스 없이 Map 형태로 돌아가는 SimpleJobRepository를 생성해주는 org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean과 실제 데이터베이스 스키마 구조에서 맞춰 수행되는 DAO를 제공하는 org.springframework.batch.core.repository.support.JobRepositoryFactoryBean이 있습니다.

여기서 한 가지 주의해서 볼 점은 재시작(restart)입니다. 코드가 단순하니, 코드를 살펴보면 재시작의 조건을 JobExecution 정보로 판단하고 있습니다.

마지막(last) JobExecutio이 있는지를 기준으로 판단하게 되는데, 여기서 드는 궁금점은 여기서 마지막(last)의 의미입니다.

이를 이해하기 위해서는 JobRepository 구현 클래스를 코드를 살펴보면 되겠죠. SimpleJobRepository은 JobInstance 인스턴스의 유/무를 통해서 판단하게 됩니다.

그 다음으로는 Job을 실제로 실행하는 부분입니다.
여기서는 TaskExecutor를 사용하고 있습니다. 주입된 TaskExecutor가 SyncTaskExecutor라면 동기 처리가 되고, 반대로 비동기 용 TaskExecutor들이 주입되면 비동기 처리가 되게 됩니다.
코드도 어렵지 않게 이해할 수 있습니다.
taskExecutor.execute(new Runnable() {

	public void run() {
		try {
			logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters + "]");
			job.execute(jobExecution);
			logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
					+ "] and the following status: [" + jobExecution.getStatus() + "]");
		}
		catch (Throwable t) {
			logger.info("Job: [" + job + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters + "]", t);
			rethrow(t);
		}
	}

	private void rethrow(Throwable t) {
		if (t instanceof RuntimeException) {
			throw (RuntimeException) t;
		}
		throw new RuntimeException(t);
	}
});

두 번째 살펴 봤지만, JobLauncher는 아주 간단한 구조로 되어 있습니다.

어제, 오늘 살펴본 구조를 도식화 해보면 다음과 같습니다.

크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 0
2009/08/27 07:00

[Batch Inside] JobLauncher


Spring Batch에서는 org.springframework.batch.core.launch.JobLauncher를 사용해서 잡(Job)을 실행하게 됩니다.

JobLauncher에는 run()이라는 하나의 메소드가 존재합니다. 즉, 이 run() 메소드를 통해서 잡을 실행하게 되는 것이죠.
run() 메소드의 명세를 보면서 이 메소드의 동작을 조금더 유추해볼 수 있습니다.
public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException;

일단 메소드 인자로는 Job과 JobParameters를 받게 됩니다.
먼저 Job을 보면, JobLauncher가 잡을 실행해주지만, Job을 얻는 행위는 요청자가 직접 해야한다는 것을 알 수있습니다. 즉, Job을 어디선가 얻어다가 JobLauncher에게 실행해달라고 요청해야 한다는 점이죠.

Job 인스턴스는 org.springframework.batch.core.configuration.JobLocator를 통해서 얻게 됩니다. 역시나 이 인터페이스에도 Job을 얻을 수 있는 하나의 메소드를 정의하고 있습니다.
Job getJob(String name) throws NoSuchJobException;

여기서 잡의 이름을 주면 해당 Job이 튀어 나오게 됩니다. 실제로 이 JobLocator는 그 하위에 JobRegistry 인터페이스가 상속을 하게 되며, 그 구현체로는 기본적으로 ClassPathXmlJobRegistry와 MapJobRegistry가 제공됩니다.
단순히 테스트 목적이라면 MapJobRegistry를 사용하셔도 됩니다. 실제로는 이 정보를 데이터베이스에 저장하며, 이때는 org.springframework.batch.core.repository.support.JobRepositoryFactoryBean 클래스를 사용하게 됩니다.

run() 메소드의 두 번째 인자는 org.springframework.batch.core.JobParameters 입니다. 이 클래스는 단지 org.springframework.batch.core.JobParameter의 Map을 가지고 있는 데이터의 홀더(holder)역할만 하게 됩니다. JobParameter는 잡의 유일성을 보장하기 위해 지정하거나, 참조 데이터를 저장하는데 사용하는 클래스입니다. JobParameter의 값으로는 String, Long, Date, Double만 들어갈 수 있으며, 이 타입은 org.springframework.batch.core.JobParameter.ParameterType으로 정의되어 있습니다.(enum 타입입니다.)

메소드 인자 타입에 대해서 살펴 봤구요, 그 다음으로는 반환 타입인 JobExecution입니다.

org.springframework.batch.core.JobExecution은 Job의 실행 시점에 대한 정보를 유지하는 도메인 클래스 입니다. JobExecution에 대한 이해를 위해서는 약간의 개념적인 이해가 필요합니다.

그림을 보면 알 수 있듯이, Job은 저희가 정의한 Job 자체에 대한 정보를 의미합니다. XML에 정의한 Job 정보라고 이해하면 되겠고, JobInstance는 특정 시점에 실행하기 위해 생성한 Job의 인스턴스를 의미합니다.
JobExeuction은 이 특정 시점의 Job을 실행했을 때의 정보를 의미합니다. 그림에서 예를 들고 있듯이, 2007/05/05에 실행되는 것으로 예약되어 있는 Job이 성공적으로 실행된다면, 한 번에 마무리 될 수 있지만, 중간에 예외가 발생하여, 재시작을 하거나, 재시도를 하게될 수 도 있는데, 이때는 두 번째, 세 번째 JobExecution이 생성되게 됩니다.

JobExecution을 살펴보면 좀더 살펴보면,
public class JobExecution extends Entity {
   private JobInstance jobInstance;
   private volatile Collection stepExecutions = new LinkedHashSet();
   private volatile BatchStatus status = BatchStatus.STARTING;
   private volatile Date startTime = null;
   private volatile Date createTime = new Date(System.currentTimeMillis());
   private volatile Date endTime = null;
   private volatile Date lastUpdated = null;
   private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;
   private volatile ExecutionContext executionContext = new ExecutionContext();
   private transient volatile List failureExceptions = new ArrayList();
}

이런 속성을 가지고 있습니다. 여기서 Job의 성공 여부를 판단하는 ExistStatus는 비동기 처리시에는 성공 여부에 관계없이 ExitStatus.UNKNOWN로 반환된다니 주의해야 합니다.

이렇게 run() 메소드의 선언부를 살펴봤습니다.

그럼 이번에는 실제로 JobLauncher를 사용해서 Job을 실행시켜 주는 코드를 살펴보겠습니다. 사용하는 방식을 보면 이해가 더 잘됩니다.

Spring Batch에는 실제로 잡 실행을 해주는 기능을 제공하는 클래스는 단 하나 제공합니다. 바로 org.springframework.batch.core.launch.support.CommandLineJobRunner입니다. 커맨드 라인 명령어로 잡을 실행할 수 있게 해주는 기능을 제공하는 클래스로, main() 메소드로 실행시켜 준다라고 생각하시면 됩니다.(실제로도 그렇습니다^^)
ConfigurableApplicationContext context = null;
try {
   context = new ClassPathXmlApplicationContext(jobPath);
   context.getAutowireCapableBeanFactory().autowireBeanProperties(this, AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false);

   Assert.notNull(launcher,"A JobLauncher must be provided.  Please add one to the configuration.");

   Job job;
   if (jobLocator != null) {
       job = jobLocator.getJob(jobName);
   }
   else {
       job = (Job) context.getBean(jobName);
   }

   JobParameters jobParameters = jobParametersConverter.getJobParameters(StringUtils.splitArrayElementsIntoProperties(parameters, "="));
   JobExecution jobExecution = launcher.run(job, jobParameters);
   return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode());
}catch (Throwable e) {
   logger.error("Job Terminated in error:", e);
   return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode());
}finally {
   if (context != null) {
       context.close();
   }
}

다음 순서로 잡을 실행시켜 줍니다.
1. java 인자로 넘어온 정보를 받아서 Spring 컨테이너 생성
2. JobLocator를 사용해서 호출할 Job을 받아 옵니다.
3. java 인자로 받은 파라미터 정보를 org.springframework.batch.core.converter.JobParametersConverter를 사용해서 JobParameters로 변환합니다. 기본은 org.springframework.batch.core.converter.DefaultJobParametersConverter 클래스입니다.
4. JobLauncher를 사용해서 Job을 실행합니다.

API에 나오는 사용 예제를 살펴보면,

java org.springframework.batch.execution.bootstrap.support.CommandLineJobRunner testJob.xml testJob schedule.date=2008/01/24 vendor.id=3902483920

JobLauncher를 설정하는 방법은 간단합니다. JobLauncher와 Job 정보를 받아올 때 사용할 JobRegistry(위에서 설명한 JobLocator의 하위 인터페이스)의 구현 클래스나 JobRepositoryFactoryBean와 같은 팩토리 빈을 사용하시면 됩니다.

	




지금까지 배치 Job 실행과 관련된 JobLauncher 클래스를 살펴봤습니다.
크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 0
2009/03/12 23:17

Spring batch 1.x에서 2.0 변경된 내용 요약

 

# 주요 주제
- 자바 5 + Spring 3.0!
- 비 연속성 실행(Non-sequential execution )
- 가용성(Scalability)
- 설정 개선: 애노테이션과 XML 네임스페이스

- 프로젝트의 물리적인 구조는 동일
- 1.x에서 모자랐던 부분(immature enough)을 보완하고, 많은 기능 추가

# 자바 5

- Type Safety
2.0.0.M1에서 도입해서, generic과 parameterised type의 장점을 취하게 됐다.
1.x에 존재하던 mark()와 reset() 메소드는 사용자가 잘못 이해해서 사용되는 경우가 많았는데, 2.x에서는 인터페이스에서 제외하고 Step 구현 내부, 즉 프레임웍에서 제공

public interface ItemReader<S> {
    S read();
}

- 청크기반 처리(chunk-oriented processing)
1.x에서는 아이템 기반 처리(item-oriented processing)으로 flush()와 clear()가 있었지만, 청크 기반 처리 방법으로 변경되면서 없어졌다.
성능에 입각해볼 때 청크 기반이 배치에서는 더 자연스러 방법이다.

public interface ItemWriter<T> {
    void write(List<? extends T> items);
}

- Step 팩토리 빈의 변화
청크 기반으 처리 방법으로 변경되면서, 부가적으로 스텝 팩토리 빈에도 변경이 일어났다.
기존에는 SkipLimitStepFactoryBean에서 FaultTolerantStepFactoryBean으로 변경됐다.
새로운 ItemReader, ItemWriter는 트랜잭션 처리를 하지 않는 입력 자원들을 적절하게 처리 가능하며, 롤백 후에 아이템을 다시 처리하고 싶다면 팩토리 빈에서 플래그(isReaderTransactional - 2.0.0.M2에 등장)를 설정해야 한다.

- 비즈니스 처리
ItemProcessor가 새로 등장.

public interface ItemProcessor<S,T> {
    T process(S item);
}

1.x에도 변환(tranformation)이라는 개념으로 존재하기는 했지만, 2.x에서는 이 관점을 더 일반화해서, ItemReader나 ItemWriter와 동등한 수준에서 사용할 수 있도록 중요하게 다뤘다.


- 더 유용해진 Tasklet 인터페이스
읽기나 쓰기가 필요없는 비즈니스 로직을 수행하려면 Tasklet을 사용하자.
2.0에서는 훨씬 더 유연하면서, 유용해졌다.

public interface Tasklet {
    ExitStatus execute(StepContribution contribution,
        AttributeAccessor attributes);
}

StepContribution은 1.x부터 있던 API지만 숨어지내 왔다. 이 클래스의 역할은 개발자가 동기화 걱정없이 현재 StepExecution에서 갱신된 내용을 수집해오는 역할을 한다.
AttributeAccessor는 키-값 쌍으로 된 청크 범위의 가방이다.

- 잡과 스텝 속성의 바인딩 늦게 하기(Late Binding of Job and Step Attributes)
스프링 EL # 패턴을 사용해서 실제로 사용할 때 프로퍼티를 바인딩. 1.x에서 StepExecutionResourceProxy가 있었지만 이를 더 일반화 했다.

쓰레드 안전한 문제에 대한 좋은 대안이 될 수 있다. scope="step"으로 된 컴포넌트에 late binding을 적용하면 좋을 듯. 그러면 스텝 당 바인딩이 되면서 쓰레드 안전 문제도 해결.

<bean id="step1" parent="simpleStep">
  <property name="itemReader">
    <bean class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property value="#{jobParameters[inputFile]}" />
        ...
    </bean>
  </property>
  ...
</bean>


# 스프링 3.0
스프링 배치 2.0은 스프링 2.5.6을 기반으로 하며, 스프링 배치 2.1에서 스프링 3.0을 도입할 계획이다.

# 비 연속성 실행
1.x 모델은 연속되는 흐름을 갖는 스템으로 잡을 구성했으며, 하나의 스텝이 실패하면 잡이 실패하는 구조였다.
여전히 이 구조가 옳기는 하지만, 2.0에서는 이 제약에서 벗어날 수 있는 방법을 마련했다.
이 방법은 M3에서 나올 예정.

- 조건에 따른 실행(conditional execution): ExistStatus에 따라 다른 스텝으로 나뉨
- 진행이 명확해질 때까지 실행을 멈추고 기다림: 핵심적인 데이터를 처리할 때 좋음
- 다수의 스텝을 병렬로 실행

이를 XML 설정으로 가능하다.

# 가용성
1.x는 단일 VM에서 멀티 쓰레드 기반 모델로 동작했다. 그러나 많은 프로젝트들이 멀티 프로세스 기반으로 성공적으로 수행됐다.
그러나 정확한 흐름으로 실행됐을 때만 기능의 품질 보증이 가능하다.
2.0에서는 두 가지 방안을 제시한다.

(1) 원격 청크처리(remote chunking)
데이터 구조에 대한 명확한 지식 없이 스텝의 작업을 나눌 수 있는 기술.
동일 프로세스에서 읽어들인 데이터를 동적으로 나눠서 다수의 원격 작업 프로세스로 보낸다.
리스너 패턴을 구현. 이 패턴에서는 요청과 응답 전송에 신뢰성이 있어야 하는데, 이는 JMS 구현체가 이미 제공해준다.
스프링 배치는 Spring Integration과 연계해서 이를 제공한다.

(2) 파티셔닝
반대로 파티셔닝은 주키의 범위나 처리되는 파일 이름과 같이 입력 데이터의 구조에 대한 지식에 의존한다.
이 방법의 장점은 마치 하나의 스텝으로 처리되는 것처럼 행동한다는 점이다.
별다른 구현이 필요없고, 테스트도 쉽다. 더군다나 한 번에 입력 데이터를 모두 읽어와 직렬화 처리해야 하는 병목 현상이 없다는 장점이 있다.
2.0에서는 PartitionHandler와 StepExecutionSplitter 두 인터페이스를 제공.
PartitionHandler는 나뉜 실행을 아는 유일한 존재면서, 구현 기술에 관계 없이 스텝에 요청을 보내고, 결과를 수집하는 역할을 수행
PartitionHandler는 SPI며, 로컬에서 실행되는 TaskExecutor로 구현한 구현체 하나는 제공한다. 이 구현체도 IO가 상당한 경우에 병렬 처리를 적용해 상당한 효과를 거둘 수 있다.
더 나아가 그리드 제품들을 사용해도 된다.

# 설정 개선: 애노테이션과 XML 네임스페이스
@MVC에서 아이디어를 얻어 다양한 수준의 애노테이션 제공할 예정.

XML 네임스페이스 예.

<job>
  <step name="gamesLoad" next="playerLoad"/>
  <step name="playerLoad">
    <next on="*" to="summarize"/>
    <next on="FAILED" to="compensate"/>
  </step>
  <step name="compensate"/>
  <step name="summarize"/>
</job>

# 데이터베이스 스키마 변경
애석하게도 메타 데이터 스키마가 변경됨. 하지만 1.x에서 2.0으로 마이그레이션 스크립트를 제공하니 걱정할 필요는 없다.
ExecutionContext 변경이 가장 중요. 이제 JSON에 컨텍스트 값을 저장했으니 더 이해하기 쉬울 것.
또한 통계를 대비한 구조 추가.

크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 2
2009/01/06 17:27

Spring Batch 1.0에서 2.0으로 진화하기- 5. Configuration

이전 글에서도 살펴 봤듯이 Spring Batch 2.x에서는 커스텀 네임스페이스가 전격적으로(?) 적용됐습니다. 현재는 org/springframework/batch/core/configuration/xml/spring-batch-2.0.xsd 스키마가 등록되어 있습니다. 네임스페이스는 http://www.springframework.org/schema/batch/spring-batch-2.0.xsd 입니다.

구조를 간단히 살펴보면 아래와 같습니다.

커스텀 네임스페이스에 대한 예제나 레퍼런스가 아직은 공식적으로 나온게 없습니다. 2.0 정식 버전이 나오면서 공개하고자 하는 것 같습니다.

테스트 코드 중 유일하게 예제 코드가 하나 있습니다.
<job id="job">
    <step name="step1">
        <stop on="COMPLETED" to="decision"/>
    </step>
    <decision id="decision" decider="decider">
        <next on="FOO" to="step2"/>
        <end on="*" status="FAILED"/>
    </decision>
    <step name="step2" />
</job>

jobRepository에 대한 예제는 없내요.. Spring One 자료를 살펴보면 그 구성을 좀 더 예측해볼 수 있습니다.
// Tasklet 등록
<job id="job>
  <step name="step1">
    <simple-task tasklet="tasklet" />
  </step>
</job>

// JobRepository 구성
<job-repository id="jobRepo1" />

// 복잡한 Step 구성


커스텀 네이스페이스 등장으로 확실히 설정은 좀더 단순화 될것 같습니다. 설정 자체가 단순화 되는 점도 장점이긴 하지만, 보다 명확한 구성요소(element)를 사용함으로써 얻는 이득도 클것 같습니다.

또한 2.x에서는 XML 기반 설정에서 벗어나 최신 트렌드에 따라 Annotation 기반 설정도 지원하게 될 것 같습니다. 지원되는 애노테이션은 org.springframework.batch.core.annotation 패키지에 포함되어 있습니다.


프레임웍 내부에서는 지원되는 애노테이션들을 Enum(JobListenerMetaData, StepListenerMetaData)으로 등록해서 사용하도록 되어 있습니다. 설정 방법 자체와는 관련이 없지만, Enum을 사용하는 사례로 살펴보면 좋을듯 합니다. Enum의 클래스명을 살펴보면 알 수 있듯이, 지원되는 애노테이션은 현재로 리스너의 JoinPoint 정도입니다. @Job이나 @Step과 같은 배치 잡 구성 자체에 대한 애노테이션은 아직 지원되지 않고 있습니다. Job이나 Step 같은 경우 어차피 Spring Batch에서 제공하는 구현 클래스를 사용하거나 직접 구현하는 경우에도 인터페이스를 구현하거나 추상 클래스를 상속해야 하니 굳이 별도의 애노테이션을 지원할 필요는 없어서 그럴 수도 있을 것 같습니다. 기존 Spring 코어의 애노테이션(@Component)로 사용하면 되니까요.. 차후에는 추가될 수도 있겠지만, POJO 기반 Job이나 Step이 등장할 수 있는지는 아직 잘 모르겠습니다..

설정에 대한 개선 내용은 본격적으로 관련 레퍼런스나 자료가 갱신되면 더 자세히 살펴봐야할 것 같습니다.
크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 0
2009/01/05 13:14

Spring Batch 1.0에서 2.0으로 진화하기- 3. JobExecutionLisneter & 4. ItemProcessor

3. JobExecutionListener
먼저 1.x 리스너를 보면,
public interface JobExecutionListener {
   void beforeJob(JobExecution jobExecution);
   void afterJob(JobExecution jobExecution);
   void onError(JobExecution jobExecution, Throwable e);
   void onInterrupt(JobExecution jobExecution);
}

총 네 개의 메소드가 선언되어 있습니다.

2.x를 살펴보면,
public interface JobExecutionListener {
   void beforeJob(JobExecution jobExecution);
   void afterJob(JobExecution jobExecution);
}

두 개의 메소드로 줄었습니다.

1.x에 있던 onError()와 onInterrupt()는 afterJob() 내에서 별도의 로직으로 처리하도록 유도하고 있습니다.
사용자 삽입 이미지

Job 실행 후 여러 가지 상황에 대해서 API로 정의하지 않고, 단순화된 API 내에서 개별적으로 처리를 하도록 변경 됐습니다. API를 단순화하면서, 좀더 유연한 구조로 변경됐다고 생각됩니다. AOP의 개념으로 보자면 JoinPoint가 4개에서 2개로 줄어들게 됐습니다. 곰곰히 생각해보면 JoinPoint는 실제로 잡 실행 전과 실행 후 두 개가 맞습니다.

4. ItemTransformer ==> ItemProcessor
ItemTransformer는 클래스 이름이 ItemProcessor로 변경 됐습니다. 1.x를 학습하던 저도 이름이 부적절하다는 생각을 했었는데, 적절히 변경된 것 같습니다. 굳이 클래스의 역할을 '변환(Transform)'에만 종속 시킬 필요도 없으며, 실제로도 변환외에도 다른 여러 방향으로 사용하기 때문입니다.

다음 코드를 통해서 사용법에 대해서 감을 잡을 수 있을 것 같습니다.

사용자 삽입 이미지

클래스 이름을 변경함과 동시에 클래스의 역할과 위상에 대한 명확한 정의가 함께 이루어 졌습니다. ItemProcessor는 ItemReader/Writer와 동일한 수준에서 아이템 처리의 목적으로 사용함이 명확해졌습니다. 거기에 제네릭을 도입해서 타입 안정성까지 강화됐습니다.

사용자 삽입 이미지

참조: Spring Batch 2.0 Overview at Spring One America 2009 (Lucas Ward)
크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 0
2009/01/02 11:53

Spring Batch 1.0에서 2.0으로 진화하기- 1. ItemReader/ItemWriter(2)

ItemWriter는 처리 흐름을 재설계하면서, API 개선과 구현 효율성을 이끌어온 경우라 볼 수 있습니다.

기존 ItemWriter의 처리 흐름을 살펴보면 아래와 같습니다.

사용자 삽입 이미지

read() - wirte()의 호출이 1 : 1 였습니다. 한 번 읽고, 한 번 쓰기죠. 하지만 이런 처리 흐름이 대용량 데이터 처리 시에는 지나치게 작은 쓰기 작업의 처리 단위가 됐기 때문에, 이를 보완하기 위해서 flush라는 개념을 사용했습니다. 그래서 write()에서는 실제 쓰기 작업을 수행하는게 아니라, 단지 버퍼에 해당 아이템을 기록해 두었고, 실제로 쓰기 작업은 flush가 발생했을 때, 즉 flush() 메소드가 호출되는 시점에 하도록 설계가 됐습니다.

하지만 2.x에서는 이 처리 흐름이 1 : 1에서 모두 : 1 로 변경 됐습니다.
사용자 삽입 이미지
즉, read()로 한 번에 모든 아이템을 불러 온 다음, 쓰기 작업 해야 하는 아이템의 목록을 한 번에 받아서 wirte() 내에서 처리하도록 변경 됐습니다. (read()에서 한 번에 모든 데이터를 받아오는 것이 좋지 않지만, 이는 한 번에 처리하는 chunk의 아이템 개수를 제한하는 방법과 함께 사용하면 됩니다.)

# 1.x ItemWrite
public interface ItemWriter {
   void write(Object item) throws Exception;
   void flush() throws FlushFailedException;
   void clear() throws ClearFailedException;
}

# 2.x ItemWrite
public interface ItemWriter<T> {
   void write(List<? extends T> items) throws Exception;
}

그러므로 쓰기 작업 대상이 되는 아이템을 버퍼에 저장해뒀다 한 번에 써버리는 일은 이제 필요 없게 됐습니다.

FlatFileItemWriter 구현을 보면 1.x에서는 write()가 단지 버퍼에 아이템을 추가하고, flush()에서 실제 쓰기 작업을 하던 것을, 2.x에서는 write() 메소드 내이 실제 쓰기 작업의 구현이 되어 있는 것을 볼 수 있습니다.

# 1.x
public void write(Object data) throws Exception {
   if(getOutputState().isInitialized()){
       FieldSet fieldSet = fieldSetCreator.mapItem(data);
       lineBuffer.add(lineAggregator.aggregate(fieldSet) + lineSeparator);
   }
   else{
       throw new WriterNotOpenException("Writer must be open before it can be written to");
   }
}

public void flush() throws FlushFailedException {
   OutputState state = getOutputState();
   for (Iterator iterator = lineBuffer.listIterator(); iterator.hasNext();) {
       String line = (String) iterator.next();
       try {
           state.write(line);
       }
       catch (IOException e) {
           throw new FlushFailedException("Failed to write line to output file: " + line, e);
       }
   }
   lineBuffer.clear();
   state.mark();
}


# 2.x
public void write(List<? extends T> items) throws Exception {

    if (!getOutputState().isInitialized()) {
       throw new WriterNotOpenException("Writer must be open before it can be written to");
   }

    OutputState state = getOutputState();

    StringBuilder lines = new StringBuilder();
   int lineCount = 0;
   for (T item : items) {
       lines.append(lineAggregator.aggregate(item) + lineSeparator);
       lineCount++;
   }
   try {
       state.write(lines.toString());
   }
   catch (IOException e) {
       throw new FlushFailedException("Could not write data.  The file may be corrupt.", e);
   }
   state.linesWritten += lineCount;
}

1.x의 write()+flush() 구현 내용이 2.x에서는 write()로 통합된 것이죠.

1.x와 2.x의 ItemWriter를 비교해보면 상당 부분 많은 클래스들이 변경 된것을 알 수 있습니다.

# 1.x ItemWriter hierarchy
사용자 삽입 이미지

# 2.x ItemWriter hierarchy
사용자 삽입 이미지

물리적인 리소스 파일을 갖게 되는 두 개의 구현클래스가 Resource를 직접 제어할 수 있도록 리소스 파일을 참조할 수 있도록 해주는 인터페이스도 도입이 됐네요..

한 가지 궁금했던 점은 1.x에 있던 HibernateAwareItemWriter 클래스 였는데, 2.x에서는 아예 사라져버렸습니다. 이제 처리할 아이템의 목록이 한 번에 넘어오니 굳이 기본 구현체를 제공할 필요가 없다고 생각했나 봅니다^^;
크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 0
2009/01/02 11:26

Spring Batch 1.0에서 2.0으로 진화하기- 1. ItemReader/ItemWriter(1)

Spring Batch는 SpringSource와 Accenture가 합심해서 함께 진행하고 있는 프로젝트로, Spring portfolio 중 가장 빠르게 보급되고 있고, 앞으로도 보급될 프로젝트로 주목 받고 있습니다.

현재는 2.0M3까지 나온 상태고 4월에 2.0 정식 버전이 릴리스 될 예정입니다.

2.0에서는 내부, 외부적으로 많은 부분이 개선될(이미 개선 된) 것 같습니다. 이미 적용된 부분이 많은데요, 자세한 내용은 change log(2.0m1, 2.0m2, 2.0m3)를 참고하시면 됩니다.

몇 번에 걸쳐서 2.0에서 개선된 기능이나 추가된 내용, 또는 내부 구현 방법이 어떻게 변경됐는지 살펴볼 생각입니다.

우선 배치 잡 구성에 있어서 핵심이 되는 ItemReader와 ItemWriter를 살펴보겠습니다.

1.x와 2.x의 인터페이스를 비교해보면 바로 차이점을 느낄 수가 있습니다.

# 1.x
- ItemReader
public interface ItemReader {
   Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException;
   void mark() throws MarkFailedException;
   void reset() throws ResetFailedException;
}

public interface ItemWriter {
   void write(Object item) throws Exception;
   void flush() throws FlushFailedException;
   void clear() throws ClearFailedException;
}

# 2.x
public interface ItemReader<T> {
   T read() throws Exception, UnexpectedInputException, ParseException;
}

public interface ItemWriter<T> {
   void write(List<? extends T> items) throws Exception;
}

ItemReader와 ItemWriter 모두 3개이던 메소드가 하나로 줄었습니다. 각 인터페이스의 존재에 딱 맞는 메소드 하나씩만 남게 됐습니다. ItemReader에 있는 mark(), reset()과 ItemWriter에 있는 flush()와 clear() 모두 꼭 필요한 기능은 아니면서, 오히려 사용자에게 혼란을 주어서 2.x에서는 제거하기로 했다고 합니다.

이들 기능이 필요한 없는 ItemReader와 ItemWriter를 위해 1.x에서는 AbstractItemReader와 AbstractItemWriter라는 클래스를 두고, 사용하지 않는 이들 메소드의 빈 구현체를 작성해두고, read()와 write()만 구현해서 사용하는 방법을 쓰기도 했습니다.

사용자 삽입 이미지
2.x에서는 이 두 클래스가 아예 사라졌습니다.

ItemReader의 mark()와 reset()은 chunk item 처리시 버퍼 기능을 구현을 하는데 사용되던 API 였습니다. 2.x에서 두 메소드가 없어지면서 이 기능 자체도 함께 사라지게 됐습니다. 그래서 기존 1.x에 존재하던 AbstractBufferedItemReaderItemStream클래스는 AbstractItemCountingItemStreamItemReader로 리팩터링 됐습니다. 이름에서도 알 수 있듯이, 1.x의 AbstractBufferedItemReaderItemStream클래스가 ItemReader의 mark()와 reset()을 구현한 버퍼 기능을 제공하는 추상 클래스였지만, 2.x의 AbstractItemCountingItemStreamItemReader는 버퍼 기능이 아닌 아이템을 읽은 횟수에 따른 재시도 처리만을 제공해주고 있습니다. 1.x의 AbstractBufferedItemReaderItemStream클래스는 다음 그림에서 볼 수있듯이, 주요 ItemReader 구현체들이 모두 상속하는 중요한 추상 클래스 중 하나였습니다.

사용자 삽입 이미지

이 클래스에서 버퍼 기능을 제거함에 따라서 해당 클래스와 이를 상속받는 클래스들의 구현 자체도 더욱 명료해졌습니다.

# 1.x AbstractBufferedItemReaderItemStream.read()
public Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException {
   currentItemCount++;

    if (shouldReadBuffer) {
       if (itemBufferIterator.hasNext()) {
           return itemBufferIterator.next();
       }
       else {
           // buffer is exhausted, continue reading from file
           shouldReadBuffer = false;
           itemBufferIterator = null;
       }
   }

    Object item = doRead();
   itemBuffer.add(item);

    return item;
}

# 2.x AbstractItemCountingItemStreamItemReader.read()
public T read() throws Exception, UnexpectedInputException, ParseException {
   currentItemCount++;
   return doRead();
}

멤버변수의 수도 9개에서 4개로 줄었습니다. 불필요한 기능을 제거하고, API를 명확하게 했으며, 내부 구현도 단순화한 좋은 리팩터링 사례로 생각됩니다.
크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 0
2008/12/29 18:13

Spring batch의 커스텀 네임스페이스 마련과 SS enterprise batch?

Spring batch는 주목받는 스프링 포트폴리오 중 하나지만, 아직 2.0 정식버전이 나오지 않아서 개인적으로는 좀더 기다려야 실제로 적용 전략을 짤 수 있지 않나 생각합니다.

그런 이유 중 가장 큰 하나는 아직 Job이나 Step에 대한 설정이 직관적이지 않고, 복잡한 면이 많기 때문입니다. 일일히 bean으로 등록해서 설정해야 하기 때문에, Job에 Step 몇 개 설정하다보면 XML 페이지가 넘어가기 일쑤 입니다.;;

요즘 한 창 상종가(?)를 달리고 있는 Annotation 주도 설정까지는 아니더라도(결국 annotation이 등장하지 않을까 생각합니다..) custom namespace를 활용하면 상당 부분 개선할 수 있겠다는 생각을 하던 중... SpringSource에 있는 친구들이 그 생각을 못하지 않았을텐데 하는 생각에 조금 찾아 봤더니 이미 구현을 완료했더라구요. 2.0M3에 포함되어 있습니다. 4.5일 걸려서 구현했습니다. 처음에 4일 구현했다가, 10일 뒤에 무슨 일이 있었는지 0.5일을 더 개발했네요..작업자는 Thomas(토마스^^) Risberg.

레퍼런스에는 아직 정리되어 있지 않았습니다. 정식버전에 소개하려고 마음 먹었는지..

문제는 얼마나 구성을 잘해서 설정을 편리하게 해줄까 입니다. 구성하기에 따라 설정이 더 복잡해질 수도 있고, 정말 나이스 해질 수도 있을 것 같아서요..

여기를 참조해보면 네임스페이스를 'batch-core'와 'batch-item' 두 개로 가져가고 있는 것을 확인할 수 있습니다. batch-item 부분을 보면 눈에 아직 안 익어서 그런지 추상화한 네임스페이스 조차도 조금 복잡해 보입니다.(물론 기존에 직접 bean으로 모두 등록하는 것과는 비교할 수 없겠지만요;;)

조금 더 개선이 있던지 아니면, 레퍼런스가 나오면 좀더 명확히 이해할 수 있을 것 같습니다.

2.0 정식 버전이 나오면 정확한 모습을 확인할 수 있을 것 같습니다. SpringOne 2008 자료를 보다 보니 Dave Syer(Spring batch 창시자 및 리더)가  SpringSource Enterprise Batch(!!)를 언급했는데요, 실제 어떻게 될지 모르겠지만, admin console, cluster deployment, scheduling 등에 대한 서비스를 제공할 거라고 하는데.. 릴리스 일정이 'jersey'로 되어 있군요! 무슨 말일까요('')?  확실이 엔터프라이즈 서비스는 계속 강화될 것 같습니다.
크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 2
2008/11/27 09:21

[Beta 3.0] Spring Batch 프레임웍 레퍼런스 한글 편역 버전.


Spring Batch 레퍼런스 편역 버전 3.0 입니다. 오늘은 스프링 배치 2.0 M3가 릴리스 됐습니다. 이것저것 변경되고, 추가된게 있는 것 같은데, 크게 변경된 부분은 없었습니다.

이번 레퍼런스 편역 버전은 2.0 M2 레퍼런스를 대상으로 작업이 됐습니다. 이전 편역 버전 2.0에서 했던 내용 중 스프링 배치 M2에서 추가되거나 변경 된 부분만 수정했습니다.

M2가 나왔을 때 작업은 했었는데, 못한 부분 채우고 올리자 싶다가...스프링 배치 버전만 계속 올라가다 보면 이미 작업해둔 문서가 '똥'될 까봐 바로 올립니다^^;

변경 내역은 아래와 같습니다.

버전 변경 여부 변경 대상
1.0 추가 1, 2, 3, 4 장 전체 혹은 일부
2.0 추가 3.11. 아이템 변환하기
2.0 추가 5. 반복하기(전체)
2.0 추가 6. 재시도하기(전체)
2.0 수정 3.6 XML 아이템 reader와 writer
2.0 수정 3.9 데이터베이스
3.0 수정 5. 반복하기(전체)
3.0 수정 4. Job 구성하고 실행하기(일부)
3.0 수정 1. 스프링 배치 소개(+2.0M2에서 추가된 내용)
3.0 추가 2.10. Item Processor (+2.0M2)
3.0 추가 3.5.2.9. 플랫 파일에서 예외 처리하기 (+2.0M2)
3.0 추가 3.9.2. 페이처 저리가 적용된 ItemReader (+2.0M2)
3.0 삭제 3.5.3.2. FieldSetCreator(-2.0M2)

그럼 많은 도움 되시길 바랍니다~.

ps. 근래 들어 스프링 배치 키워드로 검색해서 들어오시는 분이 많으시더군요~. 여러모로 공유할 수 있는 기회가 많아 졌으면 합니다^^
크리에이티브 커먼즈 라이선스
Creative Commons License
Trackback 0 Comment 6