2025 05 29
2025-05-29¶
RabbitMQ Connection/Channel/Consumer¶
- Connection
- RabbitMQ의 물리적인 TCP 연결
- producer <-> rabbitmq
- rabbitmq <-> consumer
- 모든 trasnsport 레이어 커뮤니케이션에 대해서 담당
- RabbitMQ의 물리적인 TCP 연결
- Channel
- 논리적인 connection. 어플리케이션 레벨에서 존재
- "하나의 실제 TCP Connection"을 토대로 독립적인 여러 커뮤니케이션 스트림에 대한 멀티플렉싱 제공
- 독립적인 커넥션으로 열어, 병렬적으로 각기 다른 처리 가능 (message pub/consuming)
- 병렬적으로 동작해, TCP connection 오버헤드 줄여줌
- Connection이 있어야 Channel이 존재
- 각 channel은 독립적으로 동작해, 타 channel operation에 영향을 줄 수 없음
- 싱글 프로세스에 따른 멀티 쓰레드 동작과 유사
- connection 닫히면 channel도 다 닫힘
Connection:Channel = 1:N
- Consumer
- queue에 대한 구독 -> 메시지 receive
- Consumer는 특정한 Channel에 등록되고, RabbitMQ는 메시지를 해당 Channel을 통해 Consumer로 보냄
- Consumer는 Channel에 바운드 되어 있음
Channel: Consumer = 1:N- Channel별 Consumer는 기본 최대 100. 하지만 많으면 좋을게 없음
- Channel이 삭제되면, Consumer가 같이 사라짐
- Consumer에 여러 메시지 전송 확인 직후 Channel이 닫히면, ACK 못받은 메시지 requeue됨
[비교 분석]
- Queue별로 Channel 만들기 (멀티 채널)
- 구조)
Queue A <-> Channel A <-> Consumer AQueue B <-> Channel B <-> Consumer B
- 장점)
- Isolation: 특정 채널에 오류 발생하더라도, 해당 큐만 영향 받음
- 독립적 QoS 설정: prefetch_count 독립적으로 설정 가능
- 명확한 흐름 관리: 하나의 큐 구독은 하나의 채널
- 단점)
- Connection 하나를 추가하여 만드는 약간의 추가 자원
- channel을 Map으로 관리
- 구조)
- 동일 Channel, Queue별 Consumer 만들기
- 구조)
Queue A <-> Channel <-> Consumer AQueue B <-> Channel <-> Consumer B
- 장점)
- 최소한의 자원 사용. 간단한 코드레벨 구현?
- 단점)
- Isolation: 채널 하나 뻑나면 두개다 망가짐
- 어플리케이션에서 한쪽에서 채널 뻑나면... 양쪽다 영향.
- basicQos 메서드에서 global = false로 두면 각 consumer 별로 prefetchCount를 다르게 가져갈 수 있음
def getConsumerQueueName: String, durable: Boolean = true, prefetchCount: Int = 1, actorRef: ActorRef): DefaultConsumer = { val channel = declareQueue(queueName, durable, prefetchCount) val consumer = new CustomConsumer(queueName, channel, actorRef) consumers.add(consumer) if (consuming) { consumer.startConsuming() } consumer } private def declareQueue(queueName: String, durable: Boolean = true, prefetchCount: Int): Channel = { val channel = getChannel channel.queueDeclarePassive(queueName) channel.basicQos(prefetchCount, false) // void basicQos(int prefetchCount, boolean global) throws IOException; channel }
- 구조)
RabbitMQ Clustering & Mirroring¶
참고: https://yonghyn.tistory.com/29
- Clustering
- 다수의 RabbitMQ를 하나의 RabbitMQ로 묶어서 사용
- Cluster 안의 RabbitMQ들은 Queue를 제외한 모든 정보를 공유함
- Cluster 안의 모든 RabbitMQ는 동일한 Exchange를 가진다
- Cluster 안에는 Queue가 하나만 존재할 수 있음. Queue의 Rounting Key/Header 정보가 동일한 Queue가 Cluster 안에서 생성될 수 없기 때문
- 이를 극복하고자 Mirroring 기법이 등장
- Cluster 안의 모든 RabbitMQ는 Erlang Cookie라는 비밀키를 공유. 해당 키를 통해 상대 RabbitMQ가 Cluster 구성원인지 파악
- Client(Pub/Sub)는 RabbitMQ 중 하나의 RabbitMQ와 Connection을 맺음
- 단 연결한 MQ에 문제 발생시, 타 RabbitMQ 연결할 수 있도록 환경 구성 필요
- Client <-> Cluster 사이 LB 구성해 Client가 Cluster안의 모든 MQ 접속할 수 있는 환경 구성 필요
- Client에서 Cluster MQ IP/Port 모두 가지고 있어 이상 감지 시 갈아끼우기
- 단 연결한 MQ에 문제 발생시, 타 RabbitMQ 연결할 수 있도록 환경 구성 필요
- Cluster 구성 MQ는 Disk모드/RAM모드 사용 가능 (기본은 Disk 모드)
- 클러스터 구성시 최소 하나는 Disk 모드여야 함
- Cluster 동작 중에 RabbitMQ 추가 가능
- Peer Discovery Plugin 활용
- 한계
- Client가 Cluster의 모든 RabbitMQ 접근 가능해도, Queue는 Cluster 안에서 하나만 존재
- 해당 Queue를 가진 RabbitMQ에 장애 발생 시 손실 무조건 발생
- 이를 막고자 Mirroring
- Mirroring
- Cluster 안에 Message를 다수 RabbitMQ Queue안에 저장하는 기법
- Mirroring 구성시 Queue는 Master Queue와 Slave Queue로 구성. 1:N 관계
- Master Queue 마다 Slave Queue 갯수 설정 가능
- Client Pub/Sub 상호 작용은 Master Queue. Slave Queue는 단순 Master Queue의 미러링
- Master - Slave Mirroring은 Sync 방식
- producer --msg--> master mq --mirroring--> slave mq 이거 다음 producer에게 ACK
- slave queue 많을 수록 처리량 떨어짐
- Slave 중간 추가시, Slave 시작된 시점부터 Master 미러링
- Master Queue가진 RabbitMQ에 장애 발생시, 가장 오래된 Slave Queue가 Master로 승격