Consumer
1 Consumer 개념
- 프로듀서가 토픽으로 메시지를 전송하면 해당 메시지들은 브로커들의 로컬 디스크에 저장된다.
- 그리고 컨슈머는 토픽에 저장된 메시지를 가져올 수 있다.
1.1 컨슈머 그룹
- 컨슈머 그룹은 하나 이상의 컨슈머가 모여있는 그룹을 의미한다.
- 컨슈머는 반드시 컨슈머 그룹에 속하게 된다.
- 컨슈머 그룹은 각 파티션의 리더에게 카프카 토픽에 저장된 메시지를 가져오기 위한 요청을 보낸다.
1.2 파티션과 컨슈머
- 컨슈머 그룹으로 묶인
컨슈머는 1개 이상의 파티션을 할당받아 데이터를 가져갈 수 있다. - 반대로
파티션은 최대 1개의 컨슈머에게 할당이 가능하다. - 이러한 특징으로 컨슈머 그룹의 컨슈머의 수는 토픽의 파티션의 개수보다 작거나 같다.
- 컨슈머의 수가 파티션의 수를 넘어가면
컨슈머의 수 - 파티션의 수만큼의 컨슈머가 유휴 상태가 된다.- 따라서 파티션의 수와 컨슈머의 수가 일대일로 매핑되는 것이 이상적이다.
- 컨슈머 그룹내에서 리밸런싱 동작을 통해 장애가 발생한 컨슈머의 역할을 동일한 그룹에 있는 다른 컨슈머에게 할당한다
- 따라서 굳이 장애 대비를 위해 파티션 보다 많은 컨슈머를 이용할 이유가 없다.
1.3 Rebalancing
- 컨슈머 그룹의 한 컨슈머에서 장애가 발생하면 어떤일이 벌어진까?
- 장애간 발생한 컨슈머에 할당된 파티션은 정상 작동하는 같은 컨슈머 그룹의 다른 컨슈머로 소유권이 넘어간다.
- 이를
rebalancing이라 한다. - rebalancing 은 자주 일어나서는 안 된다.
- rebalancing이 발생 할 때 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없기 때문
- group coordinator는 rebalancing을 발동시키는 역할을 한다.
- broker중 하나가 group coordinator역할을 한다.
rebalancing은 두가지 상황에서 발생한다.
- 그룹에
consumer가 추가되는 상황 - 그룹에
consumer가 제외되는 상황
1.4 Commit
consumer는 카프카broker로부터 데이터를 어디까지 가져갔는지commit을 통해 기록한다.- 특정
topic의partition을 어떤consumer그룹이 몇 번째 가져갔는지broker내부에 기록된다. - 오프셋
commit은 명시적 또는 비명시적으로 수행할 수 있다. - 기본 옵션은
poll()메서드가 실행될 때 일정 간격마다 오프셋을commit하도록enable.auto.commit=ture로 설정되어 있다.- 이것이 비명시적인 오프셋
commit이다. auto.commit.interval.ms옵션으로 설정된 시간 이상이 지나면 현재까지 읽은 레코드의 오프셋을commit한다.poll()메서드가 실행될 때commit도 수행되므로 따로commit을 위한 코드를 작성할 필요는 없다.
- 이것이 비명시적인 오프셋
1.4.1 비명시적 오프셋 commit
- 비명시 오프셋
commit은 편리하지만 데이터 중복 또는 유실될 가능성이 있으므로 선택시 고려가 필요합니다. - 데이터 중복이 발생하는 경우
- consumer가 메시지를 가져와서 처리하는 도중
- auto.commit.interval.ms 시간이 아직 지나지 않아 커밋이 되지 않은 상태에서
- consumer가 크래시되거나 리밸런싱이 발생하면
- 다음 consumer가 이전에 처리했던 메시지부터 다시 가져오게 되어 중복 처리가 발생합니다
- 이런 경우 컨슈머가 멱등성을 보장하도록 조치합니다.
- 데이터 유실이 발생하는 경우:
- consumer가 메시지를 가져온 후
- 실제로 메시지를 처리하기 전에 auto.commit.interval.ms 시간이 지나서 자동 커밋이 발생하고
- 메시지 처리 도중 오류가 발생하면
- 해당 메시지는 처리되지 않았지만 이미 커밋되었으므로 유실됩니다
- 이러한 경우를 방지하기 위해서 메시지 처리 후에 명시적으로 커밋을 수행하는 것이 좋습니다.
2 컨슈머 오프셋 관리
- 컨슈머의 동작 중 가장 핵심은 오프셋 관리다.
- 컨슈머가 메시지를 어디까지 가져왔는지 표시하기 위해 읽은 메시지의 바로 다음 위치를 나타내는 오프셋을 사용한다.
- 오프셋은 숫자 형태로 나타낸다
- 컨슈머 그룹은 자신의 오프셋 정보를 카프카에서 가장 안전한 저장소인 토픽에 저장한다.
__consumer_offsets토픽에 각 컨슈머 그룹별로 오프셋 위치 정보가 기록된다.
2.1 오프셋 관리 동작 과정
- 컨슈머들은 지정된 토픽에서 메시지를 읽을 뒤 읽어온 위치의 오프셋 정보를
__consumer_offsets에 기록합니다. - 이 때 컨슈머 그룹, 토픽, 파티션 등의 내용을 통합해 기록한다.
- 이 기록으로 자신이 속한 컨슈머 그룹의 컨슈머 변경이 일어나면 해당 컨슈머가 어느 위치 까지 읽었는지를 추적할 수 있다.
2.2 __consumer_offsets 토픽
- 모든 컨슈머 그룹의 정보가 저장되는
__consumer_offsets토픽은 브로커 설정 파일인 server.properties에서 변경 가능하다.
offsets.topic.num.partitions
- 기본값 50
offsets.topic.replication.factor
- 기본값 3
3 컨슈머 API
producer가 전송한 데이터는broker에 적재된다.consumer는 적재된 데이터를 사용하기 위해broker로부터 데이터를 가져와 처리한다.
3.1 자바 컨슈머 애플리케이션
- 자바 애플리케이션으로 간단한 카프카 컨슈머를 구현해보자.
디펜던시 추가
compile 'org.apache.kafka:kafka-clients:2.8.0'
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
SimpleConsumer.java 작성
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
}
}
}
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
- 토픽 데이터를 가져올 카프카 클러스터의 IP, Port를 입력한다
ConsumerConfig.GROUP_ID_CONFIG
- 컨슈머 그룹을 선언한다.
컨슈머 그룹
컨슈머 그룹을 통해 컨슈머의 목적을 구분할 수 있다. 컨슈머 그룹을 기준으로 컨슈머 오프셋을 관리하기 때문에 subscribe() 메서드를 사용해 토픽을 구독하는 경우에는 컨슈머 그룹을 선언해야한다. 컨슈머가 중단되거나 재시작되더라도 컨슈머 그룹의 컨슈머 오프셋을 기준으로 이후 데이터를 처리한다. 컨슈머 그룹을 선언하지 않으면 어떤 그룹에도 속하지 않은 컨슈머로 동작한다.
consumer.subscribe(Arrays.asList(TOPIC_NAME));
- 컨슈머에게 토픽을 할당하기 위해 subscribe() 메서드를 사용한다.
- 1개 이상의 토픽 이름을 받을 수 있다.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
- 컨슈머는 poll() 메서드를 통해 ConsumerRecords를 반환한다
- Duration 타입을 넘겨주는 이유는 브로커로부터 데이터를 가져올 때 컨슈머 버퍼에 데이터를 기다리기 위한 타임 아웃 간격을 설정하기 위함이다.
3.2 동기 오프셋 commit
poll()메서드 이후에commitSync()메서드를 호출하여 오프셋을 명시적으로commit할 수 있다.- 동기 방식은 속도는 느리지만 메시지 손실을 거의 발생하지 않는다.
- 메시지 손실이란 실제 토픽에는 메시지가 존재하지만 잘못된 오프세 커밋으로 인한 위치 변경으로 컨슈머가 메시지를 가져오지 못하는 경우를 말한다.
- 메시지 손실을 용납할 수 없는 중요한 처리라면 동기 방식을 권장한다.
동기 오프셋 commit
public class ConsumerWithSyncCommit {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWithSyncCommit.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
consumer.commitSync();
}
}
}
consumer.commitSync();
commitSync()는poll()메서드로 받은 가장 마지막record의 오프셋을 기준으로commit한다.- 동기 오프셋 커밋을 사용할 경우
poll()메서드로 받은 모든 레코드의 처리가 끝난 이후commitSync()을 호출해야한다. - 동기 커밋의 경우 브로커로 커밋을 요청한 이후 커밋이 완료되기까지 기다린다.
consumer는 데이터를 더 처리하지 않고 기다리기 때문에 자동 커밋이나 비동기 오프셋 커밋보다 동일 시간당 처리량이 적다.
commitSync()에 파라미터를 넣지 않으면poll()메서드로 반환된 가장 마지막 레코드의 오프셋을 기준으로commit한다.
3.3 비동기 오프셋 commit
- 동기 오프셋 커밋을 사용할 경우 커밋 응답을 기다리는 동안 데이터 처리가 일시적으로 중단되는 단점이있다.
- 이 때 비동기 오프셋 커밋을 사용할 수 있다.
- 비동기 오프셋 커밋은
commitAsync()메서드를 호출해 사용한다.
public class ConsumerWithASyncCommit {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWithASyncCommit.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
System.err.println("Commit failed");
else
System.out.println("Commit succeeded");
if (e != null)
logger.error("Commit failed for offsets {}", offsets, e);
}
});
}
}
}
- 동기 커밋과 마찬가지로
poll()메서드로 반환된 가장 마지막 레코드의 오프셋을 기준으로commit한다.
OffsetCommitCallback 인터페이스
public interface OffsetCommitCallback {
void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception);
}
commitAsync()의 응답을 받을 수 있도록 도와주는 콜백 인터페이스이다.onComplete()메서드를 통해 비동기 커밋의 응답을 확인할 수 있다.- 정상적으로 커밋되었다면 exception은
null이다. - 커밋 완료된 오프셋 정보가 offsets에 포함되어있다.