2025 11 03
2025-11-03¶
Akka Actor의 재처리 방법¶
참고: https://rockthejvm.com/articles/akka-typed-how-the-pipe-pattern-prevents-anti-patterns
- pipeTo의 기원
- Future의 결과를 왜 수동으로 처리해야할까? 그냥 Future의 결과값을 Actor에게 메시지로 보내주면 안될까?
- 어차피 이거 Actor에 메시지로 보내지면 Thread-Safe 하자나?
Future의 결과를 액터의 메일박스로 배송- pipeTo를 통해 Future가 완료되면, 그 결과가
self ! ...처럼 자신에게 메시지로 도착
- Future의 결과를 왜 수동으로 처리해야할까? 그냥 Future의 결과값을 Actor에게 메시지로 보내주면 안될까?
- 장점
- 캡슐화(상태를 가진 액터를 다루는 방식) 깨지지 않음. pipeTo를 쓰면 thread-safe
- Try[Something]을 액터의 메시지로 변경해야 함
- 참고로! onComplete의 경우, Actor의 메일박스를 통해 처리되는 것이 아닌, 쓰레드 풀에서 쓰레드 하나 줍줍해서 콜백으로 처리되는 것 (메일박스 우회)
- 안티 패턴 예시
- 액터는 한번에 하나의 메시지만 처리하는 철학!
- var nPhoneCalls 같은 thread-non-safe 한 변수처럼 생긴 친구도 lock 없이 안전한 사용 가능
- 하지만, future의 onComplete는 Actor의 Mailbox를 거치지 않음
- 그냥 별도 쓰레드풀 (executionContext)에 있는 임의의 스레드에 의해 실행됨
- 이러면
nPhoneCalls += 1RaceCondition이 발생해버려...trait PhoneCallProtocol case class FindAndCallPhoneNumber(name: String) extends PhoneCallProtocol val quickPhoneCallInitiator: Behavior[PhoneCallProtocol] = Behaviors.setup { (context, message) => var nPhoneCalls = 0 var nFailures = 0 Behaviors.receiveMessage { case FindAndCallPhoneNumber(name) => val futureNumber: Future[Int] = asyncRetrievePhoneNumberFromDb(name) futureNumber.onComplete { case Success(number) => // actually perform the phone call here context.log.info(s"Initiating phone call to $number") nPhoneCalls += 1 // please cringe here case Failure(ex) => context.log.error(s"Phone call to $name failed: $ex") nFailures += 1 // please cringe here } Behaviors.same } }
- 좋은 패턴 예시
val phoneCallInitiatorV2: Behavior[PhoneCallProtocol] = Behaviors.setup { (context, message) => var nPhoneCalls = 0 var nFailures = 0 Behaviors.receiveMessage { case FindAndCallPhoneNumber(name) => val futureNumber: Future[Int] = asyncRetrievePhoneNumberFromDb(name) // pipe makes all the difference // transform the result of the future into a message context.pipeToSelf(futureNumber) { case Success(phoneNumber) => // messages that will be sent to myself InitiatePhoneCall(phoneNumber) case Failure(ex) => LogPhoneCallFailure(ex) } Behaviors.same case InitiatePhoneCall(number) => // perform the phone call context.log.info(s"Starting phone call to $number") nPhoneCalls += 1 // no more cringing Behaviors.same case LogPhoneCallFailure(ex) => context.log.error(s"Calling number failed: $ex") nFailures += 1 // no more cringing Behaviors.same } }
Akka Retry¶
참고: https://doc.akka.io/libraries/akka-core/current/futures.html
akka.pattern.retry() => Future[T]형태의 작업을 받아서 해당 작업이 성공할 때 까지 정해진 횟수만큼, 정해진 시간 두고 비동기적으로 재시도하는 유틸리티 함수- 내부적으로
ActorSystem에서Scheduler가져와서 Thread.sleep을 쓰는게 아니라 차단하지 않고 해결!
- 언제 retry?
- operation이 반환하는 Future가 "Failed" 인 경우!
Future[WSResponse]가 성공적으로 500 에러를 받으면 -> 이건 성공이야Future.failed이라 함은...- 네트워크 레벨의 오류 (ConnectException, TimeoutException, TLSException, DNSException...)
Future.successful이라 함은...- 200, 400, 500 반환해도 다 성공!
- 이런식으로 할수는 있을듯!
import scala.concurrent.Future class MySafeActor extends Actor { implicit val scheduler = context.system.scheduler import context.dispatcher var nCalls = 0 def receive = { case "DoWork" => val operation = () => { ws.url(...).get().flatMap { response => if (response.status >= 200 && response.status < 300) { // 2. 2xx 응답은 "진짜 성공" Future.successful(response) } else { // 3. 500, 400 에러 등은 "실패"로 간주하고 수동으로 Future를 Fail 시킴 Future.failed(new Exception(s"HTTP Error: ${response.status} - ${response.body}")) } } } val retryFuture = retry(operation, 3, 1.second) retryFuture.pipeTo(self) case response: WSResponse => nCalls += 1 log.info(s"최종 성공 (2xx 응답)! 총 호출: $nCalls") case Status.Failure(e) => log.error(s"총 3번의 시도 모두 최종 실패: $e") } }
Future.recoverWith¶
| 값 반환 (T => R, Throwable => T) | 컨테이너 반환 (T => Future[R], Throwable => Future[T]) | |
|---|---|---|
| 성공 처리 | map | flatMap |
| 실패(예외) 복구 | recover | recoverWith |