1. Exchange 개념 소개
- RabbitMQ 공식 레퍼런스에서는 Exchange를 메시징 시스템의 핵심 요소로 설명합니다.
- RabbitMQ의 메시징 모델에서 가장 중요한 개념은 생산자(Producer)가 절대 큐에 직접 메시지를 보내지 않는다는 것입니 다.
- 이는 AMQP(Advanced Message Queuing Protocol) 프로토콜의 핵심 설계 원칙입니다.
- 이 원칙은 메시지 라우팅의 유연성을 극대화합니다.
- 실제로 생산자는 메시지가 어떤 큐에 전달될지 알지 못하는 경우가 대부분입니다.
- 이러한 디커플링(decoupling)은 시스템의 유연성과 확장성을 높여줍니다.
- 생산자는 오직 Exchange라는 라우팅 컴포넌트에만 메시지를 전달합니다.
Exchange는 메시지 라우팅의 핵심 요소로, 마치 우체국의 역할과 유사합니다. 발신자(Producer)는 편지(메시지)를 우체국(Exchange)에 보내고, 우체국은 주소(라우팅 규칙)에 따라 편지를 적절한 우편함(Queue)에 배달합니다.
2. Exchange의 역할과 기능
-
Exchange는 다음과 같은 핵심 기능을 수행합니다:
- 생산자로부터 메시지를 수신합니다.
- 사전 정의된 규칙에 따라 메시지를 적절한 큐로 라우팅합니다.
- 필요에 따라 메시지를 여러 큐에 복제하여 전달할 수 있습니다.
- 라우팅 조건에 맞지 않는 메시지를 폐기할 수 있습니다.
-
Exchange 선언 예제:
// Exchange 이름과 타입을 지정하여 선언
channel.exchangeDeclare("logs", "fanout");
// 추가 옵션을 포함한 Exchange 선언
channel.exchangeDeclare("persistent_logs", "direct", true); // durable=true
- Exchange 속성:
name
: Exchange의 식별자type
: 메시지 라우팅 방식을 결정하는 타입durable
: 브로커 재시작 후에도 Exchange가 유지되는지 여부auto-delete
: 모든 큐와의 바인딩이 제거되면 자동으로 삭제 되는지 여부
3. Exchange 타입
- RabbitMQ는 4가지 기본 Exchange 타입을 제공합니다:
- direct: 라우팅 키를 기반으로 메시지를 정확히 일치하는 큐로 라우팅
- topic: 패턴 매칭을 통해 메시지를 라우팅
- fanout: 연결된 모든 큐에 메시지를 브로드캐스트
- headers: 메시지 헤더 속성을 기반으로 라우팅
3.1 Direct Exchange
- Direct Exchange는 정확한 라우팅 키 매칭을 통해 메시지를 전달합니다.
- 메시지의 라우팅 키가 큐의 바인딩 키와 정확히 일치하는 경우에만 해당 큐로 메시지가 전달됩니다.
- 주로 메시지를 특정 큐로 직접 전달해야 할 때 사용합니다.
Direct Exchange 구현 예제
// Direct Exchange 선언
channel.exchangeDeclare("direct_logs", "direct");
// 메시지 전송 (라우팅 키 "error" 사용)
String message = "This is an error message";
channel.basicPublish("direct_logs", "error", null, message.getBytes());
// 수신 측에서 큐를 생성하고 Exchange에 바인딩 (error 메시지만 수신)
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "error");
- Direct Exchange 사용 사례:
- 로그 레벨별 처리: error, warning, info 등의 로그를 각각 다른 처리 로직으로
- 작업 우선순위 관리: high, medium, low 우선순위 작업을 별도 큐로 분리
- 특정 서비스 대상 메시지: 서비스 ID나 이름을 라우팅 키로 사용
기본 Exchange(이름이 빈 문자열인 Exchange)는 Direct Exchange 타입이며, 모든 큐는 자동으로 해당 큐 이름과 동일한 라우팅 키로 이 Exchange에 바인딩됩니다.
3.2 Topic Exchange
- Topic Exchange는 라우팅 키의 패턴 매칭을 통해 메시지를 전달합니다.
- 라우팅 키는 점(.)으로 구분된 단어 목록 형태로 구성됩니다.
- 바인딩 키에는 두 가지 특수 문자를 사용할 수 있습니다:
*
: 정확히 한 단어를 대체#
: 0개 이상의 단어를 대체
Topic Exchange 구현 예제
// Topic Exchange 선언
channel.exchangeDeclare("topic_logs", "topic");
// 메시지 전송 (라우팅 키는 <facility>.<severity> 형식)
String routingKey = "kern.critical";
String message = "A critical kernel error";
channel.basicPublish("topic_logs", routingKey, null, message.getBytes());
// 수신 측에서 특정 패턴의 메시지만 수신
String queueName = channel.queueDeclare().getQueue();
// 모든 커널 메시지 수신
channel.queueBind(queueName, "topic_logs", "kern.*");
// 모든 중요 메시지 수신
channel.queueBind(queueName, "topic_logs", "*.critical");
- Topic Exchange 사용 사례:
- 지역별, 카테고리별 메시지 처리:
region.category.action
- IoT 기기의 데이터 처리:
device_type.device_id.sensor_type
- 뉴스나 이벤트 알림:
sports.football.europe.champions_league
- 지역별, 카테고리별 메시지 처리:
Topic Exchange는 Direct Exchange의 기능을 포함하며 더 유연합니다. 모든 바인딩 키가 특수 문자 없이 정확한 문자열이라면, Topic Exchange는 Direct Exchange처럼 동작합니다.
3.3 Fanout Exchange
- Fanout Exchange는 라우팅 키를 완전히 무시하고, 바인딩된 모든 큐에 메시지를 브로드캐스트합니다.
- 가장 단순하면서도 강력한 Exchange 타입으로, 모든 소비자에게 동일한 메시지를 전달해야 할 때 사용합니다.
- Pub/Sub(발행/구독) 패턴 구현에 이상적입니다.
Fanout Exchange 구현 예제
// Fanout Exchange 선언
channel.exchangeDeclare("logs", "fanout");
// 메시지 전송 (라우팅 키는 무시됨)
String message = "Broadcast message to all listeners";
channel.basicPublish("logs", "", null, message.getBytes());
// 수신 측에서 임시 큐를 생성하고 Exchange에 바인딩
String queueName = channel.queueDeclare().getQueue();
// 라우팅 키는 무시되므로 빈 문자열 사용
channel.queueBind(queueName, "logs", "");
- Fanout Exchange 사용 사례:
- 실시간 알림: 모든 연결된 클라이언트에게 이벤트 알림
- 스포츠 경기 점수 업데이트: 모든 시청자에게 동일한 점수 정보 전달
- 채팅 애플리케이션: 채팅방의 모든 참가자에게 메시지 브로드캐스트
Fanout Exchange는 라우팅 키를 무시하기 때문에, 메시지 전송 시 라우팅 키로 빈 문자열을 사용하는 것이 관례입니다. 라우팅 키의 값은 중요하지 않습니다.
3.4 Headers Exchange
- Headers Exchange는 라우팅 키 대신 메시지 헤더 속성을 기반으로 라우팅을 수행합니다.
- 메시지의 헤더와 바인딩 시 지정한 인자(arguments)가 일치할 때 메시지가 라우팅됩니다.
- 특별한 인자
x-match
는 다음 두 가지 값을 가질 수 있습니다:all
: 모든 헤더 값이 일치해야 함 (기본값, AND 조건)any
: 하나 이상의 헤더 값이 일치하면 됨 (OR 조건)
Headers Exchange 구현 예제
// Headers Exchange 선언
channel.exchangeDeclare("header_logs", "headers");
// 메시지 전송 (헤더 포함)
String message = "Message with headers";
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
headers.put("priority", "high");
AMQP.BasicProperties properties = builder.headers(headers).build();
channel.basicPublish("header_logs", "", properties, message.getBytes());
// 수신 측에서 큐 생성 및 바인딩 (특정 헤더 값과 일치하는 메 시지만 수신)
String queueName = channel.queueDeclare().getQueue();
Map<String, Object> bindingArgs = new HashMap<>();
bindingArgs.put("x-match", "all"); // 모든 조건 일치 필요
bindingArgs.put("format", "pdf");
bindingArgs.put("type", "report");
channel.queueBind(queueName, "header_logs", "", bindingArgs);
- Headers Exchange 사용 사례:
- 복잡한 라우팅 조건 처리: 여러 속성의 조합으로 메시지 필터링
- 다차원 라우팅: 여러 기준을 동시에 적용해야 하는 경우
- 라우팅 키로 표현하기 어려운 복잡한 라우팅 요구사항
Headers Exchange는 메시지에 추가 메타데이터를 포함해야 하므로 성능이 다른 Exchange 타입보다 약간 떨어질 수 있습니다. 단순한 라우팅 요구사항에는 다른 Exchange 타입을 고려하세요.
4. Bindings
- 바인딩(Binding)은 Exchange와 큐를 연결하는 규칙입니다.
- 바인딩은 "이 Exchange의 메시지가 이 큐로 전달되어야 한다"고 Exchange에게 알려주는 역할을 합니다.
- 바인딩 생성 시, Exchange 타입에 따라 적절한 바인딩 키나 인자를 지정해야 합니다.
4.1 기본 바인딩 구문
// 기본 바인딩 구문: queueBind(큐 이름, Exchange 이름, 바인딩 키)
channel.queueBind(queueName, exchangeName, bindingKey);
// 인자가 있는 바인딩 (Headers Exchange 용)
Map<String, Object> args = new HashMap<>();
args.put("x-match", "all");
args.put("format", "pdf");
channel.queueBind(queueName, exchangeName, "", args);
4.2 바인딩 특성
- 하나의 큐는 여러 Exchange에 바인딩될 수 있습니다.
- 하나의 Exchange는 여러 큐에 바인딩될 수 있습니다.
- 동일한 Exchange와 큐 사이에 여러 바인딩을 생성할 수 있습니다(서로 다른 바인딩 키/인자 사용).
- 바인딩은 동적으로 추가하거나 제거할 수 있어 시스템의 유연성을 높입니다.
// 동일한 큐를 여러 바인딩 키로 바인딩
channel.queueBind(queueName, "topic_logs", "kern.*");
channel.queueBind(queueName, "topic_logs", "*.critical");
// 바인딩 제거
channel.queueUnbind(queueName, "topic_logs", "kern.*");
5. 실전 활용 패턴
5.1 로깅 시스템
- 애플리케이션에서 발생하는 다양한 로그를 타입별로 처리하는 시스템을 구축할 수 있습니다.
- Direct Exchange를 사용하여 로그 레벨(error, warning, info)별로 라우팅합니다.
// 생산자 측
channel.exchangeDeclare("logs", "direct");
String severity = "error"; // 또는 "warning", "info"
String message = "This is a log message";
channel.basicPublish("logs", severity, null, message.getBytes());
// 소비자 측 (error 로그만 처리)
channel.exchangeDeclare("logs", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "error");
// 메시지 처리 로직...
5.2 이벤트 기반 아키텍처
- 마이크로서비스 간 이벤트 전파에 Fanout Exchange를 활용할 수 있습니다.
- 하나의 서비스에서 발생한 이벤트를 모든 관심 있는 서비스에 전달합니다.
// 이벤트 발행 서비스
channel.exchangeDeclare("user_events", "fanout");
String event = "{\"type\":\"user_created\",\"data\":{\"id\":1,\"name\":\"John\"}}";
channel.basicPublish("user_events", "", null, event.getBytes());
// 이벤트 구독 서비스
channel.exchangeDeclare("user_events", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "user_events", "");
// 이벤트 처리 로직...
5.3 작업 분배 시스템
- Topic Exchange를 사용하여 작업의 특성에 따라 다른 워커에게 분배할 수 있습니다.
- 라우팅 키 패턴:
<region>.<service>.<priority>
// 작업 생산자
channel.exchangeDeclare("tasks", "topic");
String routingKey = "asia.payment.high";
String task = "{\"id\":123,\"action\":\"process_payment\"}";
channel.basicPublish("tasks", routingKey, null, task.getBytes());
// 특정 리전의 모든 고우선순위 작업 처리
channel.queueBind(queueName, "tasks", "asia.*.high");
6. 성능과 안정성 고려사항
6.1 메시지 지속성
- 중요한 메시지는 브로커 재시작 시에도 유지되도록 설정할 수 있습니다.
- Exchange, 큐, 메시지 모두 지속성(durable/persistent) 설정이 필요합니다.
// 지속성 Exchange 선언
channel.exchangeDeclare("durable_exchange", "direct", true);
// 지속성 큐 선언
channel.queueDeclare("durable_queue", true, false, false, null);
// 지속성 메시지 발행
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 지속성 메시지 (2)
.build();
channel.basicPublish("durable_exchange", "routing_key", properties, message.getBytes());