콘텐츠로 이동

2025 05 29

2025-05-29

RabbitMQ Connection/Channel/Consumer

  • Connection
    • RabbitMQ의 물리적인 TCP 연결
      • producer <-> rabbitmq
      • rabbitmq <-> consumer
    • 모든 trasnsport 레이어 커뮤니케이션에 대해서 담당
  • Channel
    • 논리적인 connection. 어플리케이션 레벨에서 존재
    • "하나의 실제 TCP Connection"을 토대로 독립적인 여러 커뮤니케이션 스트림에 대한 멀티플렉싱 제공
      • 독립적인 커넥션으로 열어, 병렬적으로 각기 다른 처리 가능 (message pub/consuming)
      • 병렬적으로 동작해, TCP connection 오버헤드 줄여줌
    • Connection이 있어야 Channel이 존재
    • 각 channel은 독립적으로 동작해, 타 channel operation에 영향을 줄 수 없음
      • 싱글 프로세스에 따른 멀티 쓰레드 동작과 유사
      • connection 닫히면 channel도 다 닫힘
    • Connection:Channel = 1:N
  • Consumer
    • queue에 대한 구독 -> 메시지 receive
    • Consumer는 특정한 Channel에 등록되고, RabbitMQ는 메시지를 해당 Channel을 통해 Consumer로 보냄
    • Consumer는 Channel에 바운드 되어 있음
    • Channel: Consumer = 1:N
      • Channel별 Consumer는 기본 최대 100. 하지만 많으면 좋을게 없음
    • Channel이 삭제되면, Consumer가 같이 사라짐
    • Consumer에 여러 메시지 전송 확인 직후 Channel이 닫히면, ACK 못받은 메시지 requeue됨

[비교 분석]

  • Queue별로 Channel 만들기 (멀티 채널)
    • 구조)
      • Queue A <-> Channel A <-> Consumer A
      • Queue B <-> Channel B <-> Consumer B
    • 장점)
      • Isolation: 특정 채널에 오류 발생하더라도, 해당 큐만 영향 받음
      • 독립적 QoS 설정: prefetch_count 독립적으로 설정 가능
      • 명확한 흐름 관리: 하나의 큐 구독은 하나의 채널
    • 단점)
      • Connection 하나를 추가하여 만드는 약간의 추가 자원
    • channel을 Map으로 관리
      val channels = new ConcurrentHashMap[String, Channel]
      
      def createChannel(conn: Connection, prefetchCount: Int, channelKey: String) = {
          channels.remove(channelKey)
      
          val channel = conn.createChannel
          channel.basicQos(prefetchCount)
          channels.put(channelKey, channel)
      
          channel
      }
      
  • 동일 Channel, Queue별 Consumer 만들기
    • 구조)
      • Queue A <-> Channel <-> Consumer A
      • Queue B <-> Channel <-> Consumer B
    • 장점)
      • 최소한의 자원 사용. 간단한 코드레벨 구현?
    • 단점)
      • Isolation: 채널 하나 뻑나면 두개다 망가짐
      • 어플리케이션에서 한쪽에서 채널 뻑나면... 양쪽다 영향.
    • basicQos 메서드에서 global = false로 두면 각 consumer 별로 prefetchCount를 다르게 가져갈 수 있음
      def getConsumerQueueName: String, durable: Boolean = true, prefetchCount: Int = 1, actorRef: ActorRef): DefaultConsumer = {
          val channel = declareQueue(queueName, durable, prefetchCount)
          val consumer = new CustomConsumer(queueName, channel, actorRef)
          consumers.add(consumer)
          if (consuming) {
              consumer.startConsuming()
          }
          consumer
      }
      
      private def declareQueue(queueName: String, durable: Boolean = true, prefetchCount: Int): Channel = {
          val channel = getChannel
          channel.queueDeclarePassive(queueName)
          channel.basicQos(prefetchCount, false)  // void basicQos(int prefetchCount, boolean global) throws IOException;
          channel
      }
      

RabbitMQ Clustering & Mirroring

참고: https://yonghyn.tistory.com/29

  • Clustering
    • 다수의 RabbitMQ를 하나의 RabbitMQ로 묶어서 사용
    • Cluster 안의 RabbitMQ들은 Queue를 제외한 모든 정보를 공유함
      • Cluster 안의 모든 RabbitMQ는 동일한 Exchange를 가진다
    • Cluster 안에는 Queue가 하나만 존재할 수 있음. Queue의 Rounting Key/Header 정보가 동일한 Queue가 Cluster 안에서 생성될 수 없기 때문
      • 이를 극복하고자 Mirroring 기법이 등장
    • Cluster 안의 모든 RabbitMQ는 Erlang Cookie라는 비밀키를 공유. 해당 키를 통해 상대 RabbitMQ가 Cluster 구성원인지 파악
    • Client(Pub/Sub)는 RabbitMQ 중 하나의 RabbitMQ와 Connection을 맺음
      • 단 연결한 MQ에 문제 발생시, 타 RabbitMQ 연결할 수 있도록 환경 구성 필요
        1. Client <-> Cluster 사이 LB 구성해 Client가 Cluster안의 모든 MQ 접속할 수 있는 환경 구성 필요
        2. Client에서 Cluster MQ IP/Port 모두 가지고 있어 이상 감지 시 갈아끼우기
    • Cluster 구성 MQ는 Disk모드/RAM모드 사용 가능 (기본은 Disk 모드)
      • 클러스터 구성시 최소 하나는 Disk 모드여야 함
    • Cluster 동작 중에 RabbitMQ 추가 가능
      • Peer Discovery Plugin 활용
    • 한계
      • Client가 Cluster의 모든 RabbitMQ 접근 가능해도, Queue는 Cluster 안에서 하나만 존재
      • 해당 Queue를 가진 RabbitMQ에 장애 발생 시 손실 무조건 발생
      • 이를 막고자 Mirroring
  • Mirroring
    • Cluster 안에 Message를 다수 RabbitMQ Queue안에 저장하는 기법
    • Mirroring 구성시 Queue는 Master Queue와 Slave Queue로 구성. 1:N 관계
      • Master Queue 마다 Slave Queue 갯수 설정 가능
      • Client Pub/Sub 상호 작용은 Master Queue. Slave Queue는 단순 Master Queue의 미러링
    • Master - Slave Mirroring은 Sync 방식
      • producer --msg--> master mq --mirroring--> slave mq 이거 다음 producer에게 ACK
      • slave queue 많을 수록 처리량 떨어짐
    • Slave 중간 추가시, Slave 시작된 시점부터 Master 미러링
    • Master Queue가진 RabbitMQ에 장애 발생시, 가장 오래된 Slave Queue가 Master로 승격