2025 07 31
2025-07-30¶
RabbitMQ publish 병렬 처리 문제¶
- 기본 세팅
- 다음과 같이 대상 세션을 MQ에 publish 하는 로직 - 병렬적으로 많은 세션이 동시에 publish 될 수 있도록 하는 것이 목적. (최대 5000건 까지 sessions로 들어옴)
1. 기본 쓰레드 풀 사용시 처리할 세션이 많을 수록 응답이 왜 늦게오지?
- 원래 publishAsync는 fire-and-forget 방식으로 구현해두어, API 응답은 먼저 와야 함. 백그라운드에서 알아서 MQ publish
- 근데, 거의 MQ publish 다 해야 응답이 오는 것 같은 현상이 발견됨
- 문제: import scala.concurrent.ExecutionContext.Implicits.global
- 해당 기본 쓰레드풀을 play 응답에서도, 위의 publishAsync에서도 같이쓰는데, 쓰레드를 순간적으로 왕창 publishAsync에서 가져가니까 응답 해줄 쓰레드가 고갈되어 응답이 늦었음
- 해결: publishAsync는 별도의 쓰레드 풀에서 처리하자!
2. 왜이리 MQ에 publish하는데 오래걸리지?
- 이렇게 publish를 병렬로 Future로 감싸 호출했으나, 세션이 많아질수록 비례하여 시간이 늘어남. (1000개에 30초) - 마치 전혀 병렬처리가 되지 않는 것 같은 속도. 쓰레드풀을 별도로 주었는데도, 전혀 효과가 없는 듯함. - 문제: RabbitMQ에서 단일 채널로 publish + 불필요한 중복 네트워크 통신
@Singleton
class RabbitMQClient {
private var connection: Option[Connection] = None
private var channel: Option[Channel] = None
private def getConnection = {
connection match {
case Some(conn) => conn
case None => newConnection
}
}
private def newChannel = {
val conn = getConnection
channel = Some(conn.createChannel())
channel.get
}
def getChannel = {
channel match {
case Some(ch) if ch.isOpen => ch
case _ => newChannel
}
}
def declareQueue(queueName: String, prefetchCount: Int = 10): Channel = {
val channel = getChannel
channel.queueDeclarePassive(queueName)
channel.basicQos(prefetchCount)
channel
}
def publish(queueName: String, message: String): Unit = {
val channel = declareQueue(queueName)
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes)
}
}
- 모든 발행 요청이 단일 Channel을 사용하게 됨
- RabbitMQ Channel은 thread-safe하지 않아서, 여러 쓰레드에서 동시에 Channel을 사용하려고 할 때, 내부적으로 Queueing/RaceCondition 발생 가능
- 이렇다 보니 사실상 단일 쓰레드 처럼 동작
- 하나의 커넥션에 여러 채널을 가질 수 있으나, 현재는 해당 이점을 전혀 갖고 있지 않음.
- 해결: 채널 풀링 + queueDeclarePassive의 단일 선언
3. 빨라짐
- 채널을 각 쓰레드별로 하나 쓰도록 할당하기. - channel.queueDeclarePassive() 는 큐의 유무를 파악하여 없다면 생성. - 항상 RabbitMQ와 통신하기에 호출마다 Network Round Trip Time 소요 - 참고: https://www.rabbitmq.com/client-libraries/java-api-guide#passive-declaration - 해결 방법
private val queue = configuration.get[String]("rabbitMQ.queue")
private val channels = new ConcurrentHashMap[Long, Channel]
def publish(threadId: Long, message: String): Unit = {
val channel = findChannel(threadId)
try {
channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes)
} catch {
case e: Exception =>
logger.error("Error publishing ", e)
}
}
private def findChannel(threadId: Long): Channel = {
Option(channels.get(threadId)) match {
case Some(channel) =>
channel
case None =>
val connection = rabbitMQClient.getConnection
val channel = connection.createChannel()
channel.queueDeclarePassive(queue)
channels.put(threadId, channel)
channel
}
}
- 각 채널별로 최초 생성 시, 단 한 번의 `queueDeclarePassive` 를 호출하자!
- 이후에는 publishing 만
- 테스트 결과 (1000개의 JSON MQ 발행)
# chatSessionService.publishExpiredSession(session)
2025-07-30 19:11:03,098 INFO [630ecf60246242b3] application-autoExpiredSession.dispatcher-376
[s.AutoExpiredSessionService] [AutoExpiredSessionService] publishExpiredSessionsAsync - All batches completed in 19520ms
# autoExpiredRabbitMQPublisher.publishExpiredSession(session, threadId)
2025-07-30 19:12:23,935 INFO [cd3d481277628d9a] application-autoExpiredSession.dispatcher-377
[s.AutoExpiredSessionService] [AutoExpiredSessionService] publishExpiredSessionsAsync - All batches completed in 107ms
- 멀티 쓰레드 풀로 돌리기 위해 ExecutionContext에 쓰레드풀을 넣어뒀으나, 하나만 쓰더라.
- `AutoExpiredSessionExecutionContext` 의 설정/쓰레드풀 가용성과 연관
- basicPublish 작업 자체가 매우 빠를 경우, Future를 사용했더라도, 디스패처의 쓰레드풀에 다른 쓰레드가 있더라도 하나의 쓰레드가 모든 작업을 할당받아 빠르게 처리 가능
- 다른 스레드들이 작업을 할당받고 채널을 생성하기 전에 이미 모든 발행이 끝
- 메시지당 거의 모든 오버헤드가 RTT 였던 것으로 보임. RoundTripTime이 19ms 라고 산정했을 때 합리적인 수치