콘텐츠로 이동

2025 07 31

2025-07-30

RabbitMQ publish 병렬 처리 문제

  • 기본 세팅
    def publishAsync(sessions: Seq[Session]) = {
        sessions.map { session =>
            Future(publish(session))
        }
    }
    

    - 다음과 같이 대상 세션을 MQ에 publish 하는 로직 - 병렬적으로 많은 세션이 동시에 publish 될 수 있도록 하는 것이 목적. (최대 5000건 까지 sessions로 들어옴)

1. 기본 쓰레드 풀 사용시 처리할 세션이 많을 수록 응답이 왜 늦게오지?

- 원래 publishAsync는 fire-and-forget 방식으로 구현해두어, API 응답은 먼저 와야 함. 백그라운드에서 알아서 MQ publish - 근데, 거의 MQ publish 다 해야 응답이 오는 것 같은 현상이 발견됨 - 문제: import scala.concurrent.ExecutionContext.Implicits.global - 해당 기본 쓰레드풀을 play 응답에서도, 위의 publishAsync에서도 같이쓰는데, 쓰레드를 순간적으로 왕창 publishAsync에서 가져가니까 응답 해줄 쓰레드가 고갈되어 응답이 늦었음 - 해결: publishAsync는 별도의 쓰레드 풀에서 처리하자!

2. 왜이리 MQ에 publish하는데 오래걸리지?

- 이렇게 publish를 병렬로 Future로 감싸 호출했으나, 세션이 많아질수록 비례하여 시간이 늘어남. (1000개에 30초) - 마치 전혀 병렬처리가 되지 않는 것 같은 속도. 쓰레드풀을 별도로 주었는데도, 전혀 효과가 없는 듯함. - 문제: RabbitMQ에서 단일 채널로 publish + 불필요한 중복 네트워크 통신

@Singleton
class RabbitMQClient {
    private var connection: Option[Connection] = None
    private var channel: Option[Channel] = None

    private def getConnection = {
        connection match {
            case Some(conn) => conn
            case None => newConnection
        }
    }

    private def newChannel = {
        val conn = getConnection
        channel = Some(conn.createChannel())
        channel.get
    }

    def getChannel = {
        channel match {
          case Some(ch) if ch.isOpen => ch
          case _ => newChannel
        }
    }

    def declareQueue(queueName: String, prefetchCount: Int = 10): Channel = {
        val channel = getChannel
        channel.queueDeclarePassive(queueName)
        channel.basicQos(prefetchCount)
        channel
    }

    def publish(queueName: String, message: String): Unit = {
        val channel = declareQueue(queueName)
        channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes)
    }
}

- 모든 발행 요청이 단일 Channel을 사용하게 됨
- RabbitMQ Channel은 thread-safe하지 않아서, 여러 쓰레드에서 동시에 Channel을 사용하려고 할 때, 내부적으로 Queueing/RaceCondition 발생 가능
  - 이렇다 보니 사실상 단일 쓰레드 처럼 동작
- 하나의 커넥션에 여러 채널을 가질 수 있으나, 현재는 해당 이점을 전혀 갖고 있지 않음.

- 해결: 채널 풀링 + queueDeclarePassive의 단일 선언

3. 빨라짐

- 채널을 각 쓰레드별로 하나 쓰도록 할당하기. - channel.queueDeclarePassive() 는 큐의 유무를 파악하여 없다면 생성. - 항상 RabbitMQ와 통신하기에 호출마다 Network Round Trip Time 소요 - 참고: https://www.rabbitmq.com/client-libraries/java-api-guide#passive-declaration - 해결 방법

private val queue = configuration.get[String]("rabbitMQ.queue")
private val channels = new ConcurrentHashMap[Long, Channel]

def publish(threadId: Long, message: String): Unit = {
    val channel = findChannel(threadId)
    try {
        channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes)
    } catch {
        case e: Exception =>
            logger.error("Error publishing ", e)
    }
}

private def findChannel(threadId: Long): Channel = {
    Option(channels.get(threadId)) match {
        case Some(channel) =>
            channel

        case None =>
            val connection = rabbitMQClient.getConnection
            val channel = connection.createChannel()
            channel.queueDeclarePassive(queue)
            channels.put(threadId, channel)
            channel
    }
}

- 각 채널별로 최초 생성 시, 단 한 번의 `queueDeclarePassive` 를 호출하자!
- 이후에는 publishing 만

- 테스트 결과 (1000개의 JSON MQ 발행) # chatSessionService.publishExpiredSession(session) 2025-07-30 19:11:03,098 INFO [630ecf60246242b3] application-autoExpiredSession.dispatcher-376 [s.AutoExpiredSessionService] [AutoExpiredSessionService] publishExpiredSessionsAsync - All batches completed in 19520ms # autoExpiredRabbitMQPublisher.publishExpiredSession(session, threadId) 2025-07-30 19:12:23,935 INFO [cd3d481277628d9a] application-autoExpiredSession.dispatcher-377 [s.AutoExpiredSessionService] [AutoExpiredSessionService] publishExpiredSessionsAsync - All batches completed in 107ms

- 멀티 쓰레드 풀로 돌리기 위해 ExecutionContext에 쓰레드풀을 넣어뒀으나, 하나만 쓰더라.
  - `AutoExpiredSessionExecutionContext` 의 설정/쓰레드풀 가용성과 연관
  - basicPublish 작업 자체가 매우 빠를 경우, Future를 사용했더라도, 디스패처의 쓰레드풀에 다른 쓰레드가 있더라도 하나의 쓰레드가 모든 작업을 할당받아 빠르게 처리 가능
  - 다른 스레드들이 작업을 할당받고 채널을 생성하기 전에 이미 모든 발행이 끝
- 메시지당 거의 모든 오버헤드가 RTT 였던 것으로 보임. RoundTripTime이 19ms 라고 산정했을 때 합리적인 수치