1. Exchange 개념 소개
- RabbitMQ 공식 레퍼런스에서는 Exchange를 메시징 시스템의 핵심 요소로 설명합니다.
- RabbitMQ의 메시징 모델에서 가장 중요한 개념은 생산자(Producer)가 절대 큐에 직접 메시지를 보내지 않는다는 것입니다.
- 이는 AMQP(Advanced Message Queuing Protocol) 프로토콜의 핵심 설계 원칙입니다.
- 이 원칙은 메시지 라우팅의 유연성을 극대화합니다.
- 실제로 생산자는 메시지가 어떤 큐에 전달될지 알지 못하는 경우가 대부분입니다.
- 이러한 디커플링(decoupling)은 시스템의 유연성과 확장성을 높여줍니다.
- 생산자는 오직 Exchange라는 라우팅 컴포넌트에만 메시지를 전달합니다.
Exchange는 메시지 라우팅의 핵심 요소로, 마치 우체국의 역할과 유사합니다. 발신자(Producer)는 편지(메시지)를 우체국(Exchange)에 보내고, 우체국은 주소(라우 팅 규칙)에 따라 편지를 적절한 우편함(Queue)에 배달합니다.
2. Exchange의 역할과 기능
-
Exchange는 다음과 같은 핵심 기능을 수행합니다:
- 생산자로부터 메시지를 수신합니다.
- 사전 정의된 규칙에 따라 메시지를 적절한 큐로 라우팅합니다.
- 필요에 따라 메시지를 여러 큐에 복제하여 전달할 수 있습니다.
- 라우팅 조건에 맞지 않는 메시지를 폐기할 수 있습니다.
-
Exchange 선언 예제:
// Exchange 이름과 타입을 지정하여 선언
channel.exchangeDeclare("logs", "fanout");
// 추가 옵션을 포함한 Exchange 선언
channel.exchangeDeclare("persistent_logs", "direct", true); // durable=true
- Exchange 속성:
name: Exchange의 식별자type: 메시지 라우팅 방식을 결정하는 타입durable: 브로커 재시작 후에도 Exchange가 유지되는지 여부auto-delete: 모든 큐와의 바인딩이 제거되면 자동으로 삭제되는지 여부
3. Exchange 타입
- RabbitMQ는 4가지 기본 Exchange 타입을 제공합니다:
- direct: 라우팅 키를 기반으로 메시지를 정확히 일치하는 큐로 라우팅
- topic: 패턴 매칭을 통해 메시지를 라우팅
- fanout: 연결된 모든 큐에 메시지를 브로드캐스트
- headers: 메시지 헤더 속성을 기반으로 라우팅
3.1 Direct Exchange
- Direct Exchange는 정확한 라우팅 키 매칭을 통해 메시지를 전달합니다.
- 메시지의 라우팅 키가 큐의 바인딩 키와 정확히 일치하는 경우에만 해당 큐로 메 시지가 전달됩니다.
- 주로 메시지를 특정 큐로 직접 전달해야 할 때 사용합니다.
Direct Exchange 구현 예제
// Direct Exchange 선언
channel.exchangeDeclare("direct_logs", "direct");
// 메시지 전송 (라우팅 키 "error" 사용)
String message = "This is an error message";
channel.basicPublish("direct_logs", "error", null, message.getBytes());
// 수신 측에서 큐를 생성하고 Exchange에 바인딩 (error 메시지만 수신)
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "error");
- Direct Exchange 사용 사례:
- 로그 레벨별 처리: error, warning, info 등의 로그를 각각 다른 처리 로직으로
- 작업 우선순위 관리: high, medium, low 우선순위 작업을 별도 큐로 분리
- 특정 서비스 대상 메시지: 서비스 ID나 이름을 라우팅 키로 사용
기본 Exchange(이름이 빈 문자열인 Exchange)는 Direct Exchange 타입이며, 모든 큐는 자동으로 해당 큐 이름과 동일한 라우팅 키로 이 Exchange에 바인딩됩니다.
3.2 Topic Exchange
- Topic Exchange는 라우팅 키의 패턴 매칭을 통해 메시지를 전달합니다.
- 라우팅 키는 점(.)으로 구분된 단어 목록 형태로 구성됩니다.
- 바인딩 키에는 두 가지 특수 문자를 사용할 수 있습니다:
*: 정확히 한 단어를 대체#: 0개 이상의 단어를 대체
Topic Exchange 구현 예제
// Topic Exchange 선언
channel.exchangeDeclare("topic_logs", "topic");
// 메시지 전송 (라우팅 키는 <facility>.<severity> 형식)
String routingKey = "kern.critical";
String message = "A critical kernel error";
channel.basicPublish("topic_logs", routingKey, null, message.getBytes());
// 수신 측에서 특정 패턴의 메시지만 수신
String queueName = channel.queueDeclare().getQueue();
// 모든 커널 메시지 수신
channel.queueBind(queueName, "topic_logs", "kern.*");
// 모든 중요 메시지 수신
channel.queueBind(queueName, "topic_logs", "*.critical");
- Topic Exchange 사용 사례:
- 지역별, 카테고리별 메시지 처리:
region.category.action - IoT 기기의 데이터 처리:
device_type.device_id.sensor_type - 뉴스나 이벤트 알림:
sports.football.europe.champions_league
- 지역별, 카테고리별 메시지 처리:
Topic Exchange는 Direct Exchange의 기능을 포함하며 더 유연합니다. 모든 바인딩 키가 특수 문자 없이 정확한 문자열이라면, Topic Exchange는 Direct Exchange처럼 동작합니다.
3.3 Fanout Exchange
- Fanout Exchange는 라우팅 키를 완전히 무시하고, 바인딩된 모든 큐에 메시지를 브로드캐스트합니다.
- 가장 단순하면서도 강력한 Exchange 타입으로, 모든 소비자에게 동일한 메시지를 전달해야 할 때 사용합니다.
- Pub/Sub(발행/구독) 패턴 구현에 이상적입니다.
Fanout Exchange 구현 예제
// Fanout Exchange 선언
channel.exchangeDeclare("logs", "fanout");
// 메시지 전송 (라우팅 키는 무시됨)
String message = "Broadcast message to all listeners";
channel.basicPublish("logs", "", null, message.getBytes());
// 수신 측에서 임시 큐를 생성하고 Exchange에 바인딩
String queueName = channel.queueDeclare().getQueue();
// 라우팅 키는 무시되므로 빈 문자열 사용
channel.queueBind(queueName, "logs", "");
- Fanout Exchange 사용 사례:
- 실시간 알림: 모든 연결된 클라이언트에게 이벤트 알림
- 스포츠 경기 점수 업데이트: 모든 시청자에게 동일한 점수 정보 전달
- 채팅 애플리케이션: 채팅방의 모든 참가자에게 메시지 브로드캐스트
Fanout Exchange는 라우팅 키를 무시하기 때문에, 메시지 전송 시 라우팅 키로 빈 문자열을 사용하는 것이 관례입니다. 라우팅 키의 값은 중요하지 않습니다.
3.4 Headers Exchange
- Headers Exchange는 라우팅 키 대신 메시지 헤더 속성을 기반으로 라우팅을 수행합니다.
- 메시지의 헤더와 바인딩 시 지정한 인자(arguments)가 일치할 때 메시지가 라우팅됩니다.
- 특별한 인자
x-match는 다음 두 가지 값을 가질 수 있습니다:all: 모든 헤더 값이 일치해야 함 (기본값, AND 조건)any: 하나 이상의 헤더 값이 일치하면 됨 (OR 조건)
Headers Exchange 구현 예제
// Headers Exchange 선언
channel.exchangeDeclare("header_logs", "headers");
// 메시지 전송 (헤더 포함)
String message = "Message with headers";
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
headers.put("priority", "high");
AMQP.BasicProperties properties = builder.headers(headers).build();
channel.basicPublish("header_logs", "", properties, message.getBytes());
// 수신 측에서 큐 생성 및 바인딩 (특정 헤더 값과 일치하는 메시지만 수신)
String queueName = channel.queueDeclare().getQueue();
Map<String, Object> bindingArgs = new HashMap<>();
bindingArgs.put("x-match", "all"); // 모든 조건 일치 필요
bindingArgs.put("format", "pdf");
bindingArgs.put("type", "report");
channel.queueBind(queueName, "header_logs", "", bindingArgs);
- Headers Exchange 사용 사례:
- 복잡한 라우팅 조건 처리: 여러 속성의 조합으로 메시지 필터링
- 다차원 라우팅: 여러 기준을 동시에 적용해야 하는 경우
- 라우팅 키로 표현하기 어려운 복잡한 라우팅 요구사항
Headers Exchange는 메시지에 추가 메타데이터를 포함해야 하므로 성능이 다른 Exchange 타입보다 약간 떨어질 수 있습니다. 단순한 라우팅 요구사항에는 다른 Exchange 타입을 고려하세요.
4. Bindings
- 바인딩(Binding)은 Exchange와 큐를 연결하는 규칙입니다.
- 바인딩은 "이 Exchange의 메시지가 이 큐로 전달되어야 한다"고 Exchange에게 알려주는 역할을 합니다.
- 바인딩 생성 시, Exchange 타입에 따라 적절한 바인딩 키나 인자를 지정해야 합니다.
4.1 기본 바인딩 구문
// 기본 바인딩 구문: queueBind(큐 이름, Exchange 이름, 바인딩 키)
channel.queueBind(queueName, exchangeName, bindingKey);
// 인자가 있는 바인딩 (Headers Exchange 용)
Map<String, Object> args = new HashMap<>();
args.put("x-match", "all");
args.put("format", "pdf");
channel.queueBind(queueName, exchangeName, "", args);
4.2 바인딩 특성
- 하나의 큐는 여러 Exchange에 바인딩될 수 있습니다.
- 하나의 Exchange는 여러 큐에 바인딩될 수 있습니다.
- 동일한 Exchange와 큐 사이에 여러 바인딩을 생성할 수 있습니다(서로 다른 바인딩 키/인자 사용).
- 바인딩은 동적으로 추가하거나 제거할 수 있어 시스템의 유연성을 높입니다.
// 동일한 큐를 여러 바인딩 키로 바인딩
channel.queueBind(queueName, "topic_logs", "kern.*");
channel.queueBind(queueName, "topic_logs", "*.critical");
// 바인딩 제거
channel.queueUnbind(queueName, "topic_logs", "kern.*");
5. 실전 활용 패턴
5.1 로깅 시스템
- 애플리케이션에서 발생하는 다양한 로그를 타입별로 처리하는 시스템을 구축할 수 있습니다.
- Direct Exchange를 사용하여 로그 레벨(error, warning, info)별로 라우팅합니다.
// 생산자 측
channel.exchangeDeclare("logs", "direct");
String severity = "error"; // 또는 "warning", "info"
String message = "This is a log message";
channel.basicPublish("logs", severity, null, message.getBytes());
// 소비자 측 (error 로그만 처리)
channel.exchangeDeclare("logs", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "error");
// 메시지 처리 로직...
5.2 이벤트 기반 아키텍처
- 마이크로서비스 간 이벤트 전파에 Fanout Exchange를 활용할 수 있습니다.
- 하나의 서비스에서 발생한 이벤트를 모든 관심 있는 서비스에 전달합니다.
// 이벤트 발행 서비스
channel.exchangeDeclare("user_events", "fanout");
String event = "{\"type\":\"user_created\",\"data\":{\"id\":1,\"name\":\"John\"}}";
channel.basicPublish("user_events", "", null, event.getBytes());
// 이벤트 구독 서비스
channel.exchangeDeclare("user_events", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "user_events", "");
// 이벤트 처리 로직...
5.3 작업 분배 시스템
- Topic Exchange를 사용하여 작업의 특성에 따라 다른 워커에게 분배할 수 있습니다.
- 라우팅 키 패턴:
<region>.<service>.<priority>
// 작업 생산자
channel.exchangeDeclare("tasks", "topic");
String routingKey = "asia.payment.high";
String task = "{\"id\":123,\"action\":\"process_payment\"}";
channel.basicPublish("tasks", routingKey, null, task.getBytes());
// 특정 리전의 모든 고우선순위 작업 처리
channel.queueBind(queueName, "tasks", "asia.*.high");