콘텐츠로 이동

2025 11 03

2025-11-03

Akka Actor의 재처리 방법

참고: https://rockthejvm.com/articles/akka-typed-how-the-pipe-pattern-prevents-anti-patterns

  • pipeTo의 기원
    • Future의 결과를 왜 수동으로 처리해야할까? 그냥 Future의 결과값을 Actor에게 메시지로 보내주면 안될까?
      • 어차피 이거 Actor에 메시지로 보내지면 Thread-Safe 하자나?
    • Future의 결과를 액터의 메일박스로 배송
    • pipeTo를 통해 Future가 완료되면, 그 결과가 self ! ... 처럼 자신에게 메시지로 도착
  • 장점
    • 캡슐화(상태를 가진 액터를 다루는 방식) 깨지지 않음. pipeTo를 쓰면 thread-safe
    • Try[Something]을 액터의 메시지로 변경해야 함
    • 참고로! onComplete의 경우, Actor의 메일박스를 통해 처리되는 것이 아닌, 쓰레드 풀에서 쓰레드 하나 줍줍해서 콜백으로 처리되는 것 (메일박스 우회)
  • 안티 패턴 예시
    • 액터는 한번에 하나의 메시지만 처리하는 철학!
    • var nPhoneCalls 같은 thread-non-safe 한 변수처럼 생긴 친구도 lock 없이 안전한 사용 가능
    • 하지만, future의 onComplete는 Actor의 Mailbox를 거치지 않음
      • 그냥 별도 쓰레드풀 (executionContext)에 있는 임의의 스레드에 의해 실행됨
      • 이러면 nPhoneCalls += 1 RaceCondition이 발생해버려...
        trait PhoneCallProtocol
        case class FindAndCallPhoneNumber(name: String) extends PhoneCallProtocol
        
        val quickPhoneCallInitiator: Behavior[PhoneCallProtocol] =
          Behaviors.setup { (context, message) =>
            var nPhoneCalls = 0
            var nFailures = 0
        
            Behaviors.receiveMessage {
              case FindAndCallPhoneNumber(name) =>
                val futureNumber: Future[Int] = asyncRetrievePhoneNumberFromDb(name)
                futureNumber.onComplete {
                  case Success(number) =>
                    // actually perform the phone call here
                    context.log.info(s"Initiating phone call to $number")
                    nPhoneCalls += 1 // please cringe here
                  case Failure(ex) =>
                    context.log.error(s"Phone call to $name failed: $ex")
                    nFailures += 1 // please cringe here
                }
                Behaviors.same
            }
          }
        
  • 좋은 패턴 예시
    val phoneCallInitiatorV2: Behavior[PhoneCallProtocol] =
        Behaviors.setup { (context, message) =>
            var nPhoneCalls = 0
            var nFailures = 0
    
            Behaviors.receiveMessage {
                case FindAndCallPhoneNumber(name) =>
                    val futureNumber: Future[Int] = asyncRetrievePhoneNumberFromDb(name)
                    // pipe makes all the difference
                    // transform the result of the future into a message
                    context.pipeToSelf(futureNumber) {
                        case Success(phoneNumber) =>
                            // messages that will be sent to myself
                            InitiatePhoneCall(phoneNumber)
                        case Failure(ex) =>
                            LogPhoneCallFailure(ex)
                    }
                    Behaviors.same
                case InitiatePhoneCall(number) =>
                    // perform the phone call
                    context.log.info(s"Starting phone call to $number")
                    nPhoneCalls += 1 // no more cringing
                    Behaviors.same
                case LogPhoneCallFailure(ex) =>
                    context.log.error(s"Calling number failed: $ex")
                    nFailures += 1 // no more cringing
                    Behaviors.same
            }
        }
    

Akka Retry

참고: https://doc.akka.io/libraries/akka-core/current/futures.html

  • akka.pattern.retry
    • () => Future[T] 형태의 작업을 받아서 해당 작업이 성공할 때 까지 정해진 횟수만큼, 정해진 시간 두고 비동기적으로 재시도하는 유틸리티 함수
    • 내부적으로 ActorSystem에서 Scheduler가져와서 Thread.sleep을 쓰는게 아니라 차단하지 않고 해결!
      val retried: Future[Int] = akka.pattern.retry(
          attempt = () => futureCall(), 
          attempts = 3, 
          delay = 1.second
      )
      
      // 그렇게 하다가...
      retried.onComplete {
          case Success(result) => println(result)
          case Failure(ex) => println(ex)
      }
      
  • 언제 retry?
    • operation이 반환하는 Future가 "Failed" 인 경우!
    • Future[WSResponse]가 성공적으로 500 에러를 받으면 -> 이건 성공이야
    • Future.failed이라 함은...
      • 네트워크 레벨의 오류 (ConnectException, TimeoutException, TLSException, DNSException...)
    • Future.successful이라 함은...
      • 200, 400, 500 반환해도 다 성공!
      • 이런식으로 할수는 있을듯!
        import scala.concurrent.Future
        
        class MySafeActor extends Actor {
            implicit val scheduler = context.system.scheduler
            import context.dispatcher
        
            var nCalls = 0 
        
            def receive = {
                case "DoWork" =>
                    val operation = () => {
                        ws.url(...).get().flatMap { response =>
                            if (response.status >= 200 && response.status < 300) {
                                // 2. 2xx 응답은 "진짜 성공"
                                Future.successful(response)
                            } else {
                                // 3. 500, 400 에러 등은 "실패"로 간주하고 수동으로 Future를 Fail 시킴
                                Future.failed(new Exception(s"HTTP Error: ${response.status} - ${response.body}"))
                            }
                        }
                    }
        
                    val retryFuture = retry(operation, 3, 1.second)
        
                    retryFuture.pipeTo(self)
        
                case response: WSResponse =>
                    nCalls += 1
                    log.info(s"최종 성공 (2xx 응답)! 총 호출: $nCalls")
        
                case Status.Failure(e) =>
                    log.error(s"총 3번의 시도 모두 최종 실패: $e")
            }
        }
        

Future.recoverWith

값 반환 (T => R, Throwable => T) 컨테이너 반환 (T => Future[R], Throwable => Future[T])
성공 처리 map flatMap
실패(예외) 복구 recover recoverWith