1. ItemProcessor 소개
- Spring Batch에서 ItemProcessor는 ItemReader와 ItemWriter 사이에서 비즈니스 로직을 처리하는 핵심 컴포넌트입니다.
- ItemReader로 읽어온 데이터를 ItemWriter로 쓰기 전에 변환, 검증, 필터링 등의 작업을 수행합니다.
- ItemProcessor는 Step에서 선택적으로 사용할 수 있으며, 복잡한 비즈니스 로직을 깔끔하게 분리할 수 있습니다.
1.1 ItemProcessor의 필요성
- ItemReader와 ItemWriter만으로는 단순한 읽기/쓰기만 가능합니다.
- 실제 배치 처리에서는 데이터 변환, 검증, 필터링 등의 비즈니스 로직이 필요합니다.
- Composite 패턴을 사용하여 ItemWriter나 ItemReader를 감싸는 방법도 있지만, ItemProcessor가 더 명확하고 효율적입니다.
Composite 패턴 예시
ItemWriter를 감싸서 비즈니스 로직을 추가하는 방법도 있지만, 단순한 변환의 경우 ItemProcessor가 더 적합합니다.
2. ItemProcessor 기본 인터페이스
2.1 인터페이스 구조
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
- 입력 타입 I와 출력 타입 O를 제네릭으로 지정합니다.
- process 메서드에서 하나의 객체를 받아 변환된 객체를 반환합니다.
- 입력과 출력 타입이 다를 수 있어 유연한 데이터 변환이 가능합니다.
2.2 기본 구현 예시
Foo를 Bar로 변환하는 예시
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
// 간단한 변환 로직: Foo를 Bar로 변환
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar> {
public void write(Chunk<? extends Bar> bars) throws Exception {
// Bar 객체들을 처리
}
}
이 예시에서 FooProcessor는 Foo 타입의 객체를 받아 Bar 타입으로 변환합니다.
Step 설정에서 ItemProcessor 사용
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Bar>chunk(2, transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
- chunk 처리에서 입력 타입 Foo와 출력 타입 Bar를 명시합니다.
- processor() 메서드로 ItemProcessor를 설정합니다.
ItemProcessor는 선택사항
ItemReader와 ItemWriter와 달리 ItemProcessor는 Step에서 선택적으로 사용할 수 있습니다. 단순한 읽기/쓰기만 필요한 경우 생략 가능합니다.
3. ItemProcessor 체이닝
3.1 체이닝의 필요성
- 하나의 변환만으로는 복잡한 비즈니스 로직을 처리하기 어려운 경우가 있습니다.
- 여러 개의 ItemProcessor를 연결하여 단계적으로 데이터를 변환할 수 있습니다.
- CompositeItemProcessor를 사용하여 여러 프로세서를 체인처럼 연결합니다.
3.2 체이닝 구현 예시
다단계 변환 클래스 정의
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar {
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
// Foo를 Bar로 변환
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar, Foobar> {
public Foobar process(Bar bar) throws Exception {
// Bar를 Foobar로 변환
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<Foobar>{
public void write(Chunk<? extends Foobar> items) throws Exception {
// 최종 결과 처리
}
}
이 예시에서는 Foo → Bar → Foobar로 2단계 변환을 수행합니다.
CompositeItemProcessor 설정
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
- 여러 개의 ItemProcessor를 리스트로 만들어 순서대로 실행됩니다.
- 첫 번째 프로세서의 출력이 두 번째 프로세서의 입력이 됩니다.
Spring Configuration으로 CompositeItemProcessor 설정
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Foobar>chunk(2, transactionManager)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
최종적으로 Foo 타입을 입력받아 Foobar 타입으로 출력하는 복합 프로세서가 완성됩니다.
4. 레코드 필터링
4.1 필터링과 스킵의 차이
- 필터링: 레코드가 유효하지만 처리하지 않아도 되는 경우 (예: 삭제 대상 레코드)
- 스킵: 레코드가 잘못되어 처리할 수 없는 경우 (예: 형식 오류)
4.2 필터링 구현 방법
- ItemProcessor에서
null
을 반환하면 해당 레코드는 다음 단계(ItemProcessor 또는 ItemWriter)로 전달되지 않고, 즉시 필터링되어 처리 대상에서 제외됩니다. - ItemReader가
null
을 반환하면 입력 데이터가 끝났다는 의미이지만, ItemProcessor에서null
을 반환해도 나머지 아이템들은 계속 정상적으로 처리됩니다. - Spring Batch는 ItemProcessor의 결과가
null
이면 자동으로 ItemWriter에 넘기지 않고, 해당 레코드를 건너뜁니다.
필터링 예시
public class RecordFilterProcessor implements ItemProcessor<Record, Record> {
@Override
public Record process(Record record) throws Exception {
// 삭제 타입 레코드는 필터링
if ("DELETE".equals(record.getRecordType())) {
return null; // 필터링됨
}
// INSERT, UPDATE 레코드만 통과
return record;
}
}
예외 처리 주의사항
ItemProcessor에서 예외가 발생하면 스킵으로 처리됩니다. 의도적인 필터링을 위해서는 반드시 null을 반환해야 합니다.
5. 입력 데이터 검증
5.1 검증의 필요성
- Spring Batch의 ItemReader와 ItemWriter는 구조적 검증은 수행합니다.
- FixedLengthTokenizer: 고정 길이 데이터 범위 누락 시 예외 발생
- RowMapper/FieldSetMapper: 존재하지 않는 인덱스 접근 시 예외 발생
- 하지만 이는 데이터가 올바른 형식인지만 확인할 뿐, 비즈니스 규칙은 검증하지 않습니다.
- 즉 ItemReader에서 파싱이 성공해도 비즈니스 로직 관점에서 유효하지 않을 수 있습니다.
- 예: 나이 필드가 음수인 경우, 필수 필드가 비어있는 경우 등
- ItemReader에서 유형과 포맷 관련된 데이터 유효성 검증을 수행할 수 있지만, 아이템 구성 이후 수행하는 비즈니스 규칙에 따를 유효성 검은은 ItemProcessor에서 수행하는 것이 권장됩니다.
- Spring Batch는 별도의 검증 프레임워크를 제공하지 않고, 기존 검증 프레임워크와 연동할 수 있는 인터페이스를 제공합니다.
- 따라서 어떤 검증 프레임워크든 연결할 수 있으며, 이것이 바로
Validator<T>
인터페이스입니다.
- 따라서 어떤 검증 프레임워크든 연결할 수 있으며, 이것이 바로
5.2 Validator 인터페이스
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
- 객체가 유효하지 않으면 예외를 던지고, 유효하면 정상적으로 반환합니다.
- 기존 검증 프레임워크를 활용하여 구현할 수 있습니다.
5.3 ValidatingItemProcessor 사용
- Spring Batch는 이 검증 로직을 ItemProcessor 안에 넣어주는 ValidatingItemProcessor를 제공합니다.
- 이 프로세서는 아래와 같은 방식으로 작동합니다.
- 각 아이템을 받아서
- 설정된 Validator로 검증하고
- 검증 통과하면 그대로 다음 단계로 전달
- 검증 실패하면 ValidationException 발생
5.3.1 Spring Validator와 연동
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
- Spring Validator를 검증기로 사용하여 ValidatingItemProcessor를 설정합니다.
5.4 Bean Validation (JSR-303) 활용
- 추가적으로 BeanValidatingItemProcessor를 사용하여 JSR-303 어노테이션 기반의 검증을 수행할 수도 있습니다.
Bean Validation 어노테이션 사용 예시
class Person {
@NotEmpty
private String name;
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
- Person 클래스의 name 필드에 @NotEmpty 어노테이션을 적용하여 빈 값을 검증합니다.
BeanValidatingItemProcessor 설정
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
}
- BeanValidatingItemProcessor는 JSR-303 어노테이션을 자동으로 검증합니다.
- setFilter(true)를 설정하면 검증 실패 시 예외 대신 필터링됩니다.
6. 장애 허용성 (Fault Tolerance)
6.1 장애 허용성의 중요성
- 배치 처리 중 오류가 발생하면 청크가 롤백될 수 있습니다.
- 롤백 시 읽기 단계에서 캐시된 아이템들이 재처리될 수 있습니다.
- ItemProcessor는 동일한 입력에 대해 동일한 결과를 보장하는 멱등성(idempotent)을 가져야 합니다.
6.2 멱등성 구현 방법
- 입력 아이템을 변경하지 않고, 결과 객체만 업데이트합니다.
- 외부 시스템 호출이나 상태 변경을 최소화합니다.
- 가능한 한 순수 함수 형태로 구현합니다.
멱등성을 고려한 구현 예시
public class SafeItemProcessor implements ItemProcessor<InputData, OutputData> {
@Override
public OutputData process(InputData input) throws Exception {
// 입력 객체는 변경하지 않음
// 새로운 결과 객체만 생성하여 반환
OutputData output = new OutputData();
output.setProcessedValue(input.getRawValue() * 2);
output.setTimestamp(System.currentTimeMillis());
return output;
}
}
상태 변경 주의사항
재처리될 수 있으므로 ItemProcessor에서 입력 아이템을 직접 수정하거나 외부 상태를 변경하는 것은 위험합니다. 항상 새로운 결과 객체를 생성하여 반환하세요.
7. 실제 활용 사례
7.1 데이터 변환 파이프라인
@Component
public class CustomerDataProcessor implements ItemProcessor<RawCustomer, ProcessedCustomer> {
@Override
public ProcessedCustomer process(RawCustomer rawCustomer) throws Exception {
// 데이터 정제 및 변환
ProcessedCustomer processed = new ProcessedCustomer();
processed.setName(rawCustomer.getName().trim().toUpperCase());
processed.setEmail(rawCustomer.getEmail().toLowerCase());
processed.setAge(calculateAge(rawCustomer.getBirthDate()));
return processed;
}
private int calculateAge(LocalDate birthDate) {
return Period.between(birthDate, LocalDate.now()).getYears();
}
}
7.2 조건부 처리와 필터링
@Component
public class OrderProcessor implements ItemProcessor<Order, ProcessedOrder> {
@Override
public ProcessedOrder process(Order order) throws Exception {
// 취소된 주문은 필터링
if ("CANCELLED".equals(order.getStatus())) {
return null;
}
// 유효한 주문만 처리
ProcessedOrder processed = new ProcessedOrder();
processed.setOrderId(order.getId());
processed.setTotalAmount(calculateTotal(order));
processed.setProcessedDate(LocalDateTime.now());
return processed;
}
private BigDecimal calculateTotal(Order order) {
return order.getItems().stream()
.map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
}
8. 마치며
- ItemProcessor는 Spring Batch에서 비즈니스 로직을 처리하는 핵심 컴포넌트입니다.
- 데이터 변환, 검증, 필터링 등 다양한 용도로 활용할 수 있습니다.
- CompositeItemProcessor를 통해 복잡한 처리 과정을 단계별로 나눌 수 있습니다.
- 장애 허용성을 위해 멱등성을 고려한 구현이 중요합니다.
- 적절한 검증과 필터링을 통해 안정적인 배치 처리 시스템을 구축할 수 있습니다.