1 Producer 개념
producer는 메시지를 생성하여 브로커의 토픽 이름으로 보내는 서버 또는 애플리케이션을 말한다.producer는 데이터를 전송할 때 리더partition을 가지고 있는broker와 직접 통신을 한다.producer는 카프카broker로 데이터를 전송할 때 내부적으로partitioner, 배치 생성 단계를 거친다.producer의 주요 기능은 각각의 메시지를 토픽 파티션에 매핑하고 파티션 리더에 요청을 보내는 것- 키 값을 정해 해당 키를 가진 모든 메세지를 동일한 파티션으로 전송한다.
1.1 Producer의 동작 과정
- 레코드는 토픽, 파티션, 키, 밸류로 구성된다.
- 레코드는 시리얼라이저, 파티셔너, 배치를 거치게된다.

- 시리얼라이저가 레코드를 직렬화한다.
- 파티셔너가 레코드 키의 해시값을 토대로
partition을 지정한다. - 배치 전송을 위해 어큐뮬레이터에 파티션으로 구분된 버퍼에 레코드를 쌓는다
- sender 스레드가 어큘뮬레이터에 쌓인 배치 데이터를 카브카 브로커로 전송한다.
- 전송이 실패하면 재시도한다.
- 지정된 횟수만큼 재시도하며 최종 실패 결과를 보여준다.
- 전송이 성공하면 메타데이터를 리턴한다.
2 Partitioner
- 카프카의 토픽은 병렬 처리가 가능하도록 최소 하나 이상의 파티션으로 구성된다.
- 프로듀서가 카프카로 전송한 메시지는 해당 토픽 내 각 파티션의 로그 세그먼트에 저장된다.
- 따라서 프로듀서는 메시지를 보 낼 때 토픽의 어느 파티션으로 메시지를 보낼지 결정해야 하며 이때 사용하는 것이
Partitioner다 partitioner는record를topic의 어느partition으로 전송할 것인지 결정하는 역할을 한다.- partitioner는 메시지(레코드)의 키를 해시해 파티션을 결정한다.
- 따라서 메시지 키 값이 동일하면 같은 파티션으로 전송된다.
- 토픽의 파티션을 추가하는 경우 메시지의 키와 매핑된 해시 테이블도 변경된다.
- 따라서 동일한 메시지 키를 이용해 메시지를 전송해도 다른 파티션으로 전송될 수 있다.
- 되도록 파티션 수를 변경하지 않을 것을 권장
2.1 라운드 로빈 전략
- 프로듀서의 레코드 키값은 필수가 아니다
- 만약 키값을 지정하지 않는다면 라운드 로빈 알고리즘을 사용해 랜덤한 파티션으로 레코드를 전송한다.
- 파티셔너를 거친 후의 레코드는 배치 처리를 위해 프로듀서의 메모리 버퍼 영역에서 잠시 대기한다.
- 라운드 로빈 전략은 모든 파티션 버퍼에 레코드를 고르게 분산해 버퍼의 최소 레코드 수를 충족시키지 못해 성능이 좋지 못하다
- 물론 특정 시간을 초과하면 즉시 전송하도록 설정할 수 있지만 배치와 압축의 효과를 얻지 못한채 소수의 레코드만 전송해 비효율 적이다.
2.2 스티키 파티셔닝 전략
- 라운드 로빈 전략을 보완하기 위해 카프카 2.4 버전부터 스티키 파티셔닝 전략을 사용한다.
- 스티키 파티셔닝은 하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송하는 전략이다.
- 배치를 위한 레코드 수에 도달할 때까지 다른 파티션으로 보내지않고 동일한 파티션 버퍼에 레코드를 담는다.
3 Batch
- 카프카는 프로듀서의 처리량을 높이기 위해 배치 전송을 권장한다.
- 단건의 메시지를 전송하는 것이 아니라 다량의 메시지를 묶어서 전송해 불필요한 I/O를 줄일 수 있어 매우 효율적이다.
- 프로듀서는
partitioner에 의해 구분된record를 파티션마다 별도의 버퍼에 쌓아놓고 버퍼의 최소 레코드 개수를 충족하거나 설정된 시간이이 지나면 카프카로 버퍼에 담긴 레코드들을 한번에 전송한다.
3.1 Batch 사이즈 조절
- 카프카를 사용하는 목적에 따라 처리량을 높일지 지연 시간을 줄일지 선택해야 한다.
- 처리량을 높이려면
batch.size와linger.ms를 크게 설정해 한번에 많은 레코드를 전송하고 지연 시간을 줄일려면batch.size와linger.ms를 작게 사용해야 한다.
4 메시지 전송 방식
- 일부 서비스는 메시지 중복을 허용하지만 특정 서비스에서 메시지가 중복 처리된다면 치명적인 상황이 발생할 수 있다.
- 메시지 시스템들의 메시지 전송 방식은 아래와 같다.
- 적어도 한 번 전송
- 최대 한 번 정송
- 정확히 한 번 전송
4.1 적어도 한 번 전송(At Least Once)
- 프로듀서가 메시지를 브로커에 전송하면 메시지를 받은 브로커는 잘 받았다는 뜻으로 ACK를 프로듀서에게 응답한다.
- 프로듀서는 메시지를 보내고 브로커에게 ACK를 받지 못하면 브로커가 메시지를 받지 못했다고 판단하고 메시지를 재전송한다.
- 브로커가 메시지를 잘 받아 기록하고 ACK만 전송하지 못했다면 프로듀서는 메시지를 재전송해 메시지가 브로커에 중복저장 될 수 있다.
- 즉 적어도 한 번 전송 방식은 네트워크 회선 장애나 기타 장애 상황에 따라
일부 메시지가 중복될 수 있습니다.- 즉 적어도 한 번 전송은 멱등성이 보장되지 않는 전송 방식입니다.
메시지 손실 가능성이 없어카프카는 기본적으로 적어도 한 번 전송 방식으로 동작합니 다.- 카프카 3.0 버전부터는
exactly once전송을 기본으로 동작합니다.
- 카프카 3.0 버전부터는
4.2 최대 한 번 전송(At Most Once)
- 최대 한번 전송은 방식은 메시지를 전송한 후 ACK를 확인하지 않습니다.
- 프로듀서는 메시지의 중복 가능성을 회피하기 위해 메시지를 재전송하지 않습니다.
- 이는
일부 메시지의 손실을 감안한다는 의미입니다. - 예) 일부 메시지가 손실되더라도 높은 처리량을 필요로 하는 대량 로그 수집이나 IOT 같은 환경에서 사용합니다.
- 최대 한 번 전송은 손실 가능성이 있지만 중복은 없습니다.
4.3 중복 없는 전송
- 이번엔 중복 없는 전송 즉 멱등성을 보장하는 전송에 대해서 알아봅시다.
- 카프카 0.11 버전에서는 프로듀서가 메시지를 중복 없이 브로커로 전송하는 기능이 추가되었습니다.
- 프로듀서는 고유한 PID를 가지고 있습니다.
- PID는 사용자가 지정하는 것이 아닌 프로듀서에 의해 자동 생성됨
- 프로듀서는 메시지를 전송할 때 PID와 메시지의 시퀀스 번호를 메시지 헤더에 포함해 전송합니다.
- 메시지 시퀀스 번호는 메시지 마다 부여되면 0번부터 시작해 순차적으로 증가 한다.
- 브로커는 기록된 시퀀스 번호보다 정확히 하나 큰 시퀀스 번호를 가진 메시지만 저장합니다.
- 여기서 PID와 시퀀스 번호를 멱등키로 사용해 중복 없는 전송을 보장합니다.
중복 없는 전송 과정
- 프로듀서가 브로커에게 메시지 A를 전송한다.
- 메시지 A의 PID는 0 시퀀스 번호는 0
- 브로커는 메시지 A를 저장하고 PID와 시퀀스 번호를 메모리에 기록하고 ACK를 응답한다
- 네트워크 장애로 프로듀서가 ACK를 받지 못해 메시지 A를 다시 전송한다.
- 브로커는 메시지 A의 헤더를 확인 이미 같은 PID와 시퀀스 번호의 메시지가 있다는 것을 확인하여 중복 저장하지 않고 ACK 응답.
4.4 정확히 한 번 전송
5 Producer API
- 프로듀서를 구현하기위해 카프카 클라이언트를 라이브러리로 추가하여 자바 애플리케이션을 만들어보자
5.1 자바 프로듀서 애플리케이션
- 간단한 자바 프로듀서 애플리케이션을 만들어보자
디펜던시 추가
compile 'org.apache.kafka:kafka-clients:2.8.0'
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
SimpleProducer.java 작성
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
Properties configs = new Properties();KafkaProducer인스턴스를 생성하기 위한 프로듀서 옵션들을 key, value 값으로 선언한다.
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);- 전송하고자 하는 카프카 클러스터 서버의 host와 IP주소를 지정한다.
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());- 메시키 키를 직렬화하기 위한 직렬화 클래스를 선언한다.
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());- 메시지 값을 직렬화하기 위한 직렬화 클래스를 선언한다.
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);- Properties를
KafkaProducer의 생성 파라미터를 추가하여 인스턴스 생성 KafkaProducer인스턴스는ProducerRecord를 전송할 때 사용된다.
- Properties를
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);- 카프카 브로커로 데이터를 보내기위해 ProducerRecord를 생성한다.
- ProducerRecord의 생성자로 메시지 키, 메시지값, 토픽이름을 전달할 수 있다.
producer.send(record);- 생성한 ProducerRecord를 전송한다.
- 1프로듀서에서 send()는 즉각적인 전송이 아니라 record들을 프로듀서 내부에 가지고 있다가 배치 형태로 묶어서 브로커에 전송한다.
producer.flush();- 프로듀서 내부 버퍼에 저장된 레코드 배치를 브로커로 전송한다.
5.2 브로커 정상 전송 여부 확인하기
KafkaProducer의send()메서드는Future객체를 반환한다.Future객체는 ProducerRecord가 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어 있다.- 포함된 데이터 : 적재된 토픽 이름, 파티션 번호, 오프셋
5.2.1 동기방식 전송 여부 확인
RecordMetadata metadata = producer.send(record).get()을 이용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져올 수 있다.- RecordMetadata를 통해 적재된 토픽 이름과 파티션 번호, 오프셋을 알 수 있다.
- 동기적으로 전송 결과를 확인하는 것은 성능상 문제가 있다.
public class ProducerWithSyncCallback {
private final static Logger logger = LoggerFactory.getLogger(ProducerWithSyncCallback.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");
try {
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally {
producer.flush();
producer.close();
}
}
}
5.2.2 비동기방식 전송 여부 확인
- 프로듀서는 비동기로 결과를 확인할 수 있도록 Callback 인터페이스를 제공한다.
org.apache.kafka.clients.producer.Callback
- 사용자 정의 Callback 클래스를 생성하여 비동기로 전송 결과를 확인할 수 있다.
사용자 정의 Callback 클래스
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerCallback implements Callback {
private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null)
logger.error(e.getMessage(), e);
else
logger.info(recordMetadata.toString());
}
}
- onCompletion 메서드 는 비동기 결과를 받기위해 사용된다
- 적재시 에러가 발생할 경우 e 객체에 담겨서 메서드가 실행된다.
- 에러가 발생하지 않으면 recordMetadata에 해당 레코드가 적재된 토픽 이름과 파티션 번호, 오프셋을 알 수 있다.
비동기식으로 전송 여부를 확인하는 프로듀서
producer.send(record, new ProducerCallback());- 사용자 정의 Callback 클래스를 넘겨주어 Callback의 onCompletion메서드를 통해 전송 여부를 확인할 수 있다.
public class ProducerWithAsyncCallback {
private final static Logger logger = LoggerFactory.getLogger(ProducerWithAsyncCallback.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");
producer.send(record, new ProducerCallback());
producer.flush();
producer.close();
}
}