콘텐츠로 이동

2024 07 16

2024-07-16

Executor, ExecutorService, ExecutorContext

참고: https://hamait.tistory.com/768

  • 개요
    • 자바 Executor, ExecutorService를 활용해 쓰레드 제어가 가능하나, 스칼라에서는 이를 보강한 ExecutorContext가 있음!
    • 자바 Executor
      • Runnable 타입을 내부 쓰레드풀을 이용해서 실행
      • Executor 구현으로는
        • JDK5 : ThreadPoolExecutor
        • JDK7 : ForkJoinPool
    • 자바 ExecutorService
      • Executor의 서브 인터페이스로, 라이프사이클 관리 + Future + Callable 이용해 리턴값 받기 추가됨
  • 스칼라 ExecutorContext

    1. scala.concurrent.ExecutionContext는 자바 Executor와 비슷한 기능을 제공
      • 주로 implicit 매개변수로 사용
      • 2개의 추상 메서드
        • execute
        • reportFailure
    2. ExecutionContext에는 Java Executor/ExecutorService에서 ExecutionContext 객체를 만들기 위한 몇가지 메서드가 추가된 companion 객체 있음

      def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
          impl.ExecutionContextImpl.fromExecutor(e, reporter)
      

    3. ExecutionContext의 companion 객체는 내부적으로 ForkJoinPool 사용하는 global이라는 실행 컨텍스트 포함!

      import scala.concurrent.ExecutionContext.Implicits.global
      

  • Thread -> Java ExecutorService -> Java ForkJoinPool -> Scala ExecutorContext

    1. 기본 쓰레드 생성

      val hello = new Thread(new Runnable {
          def run(): Unit = {
              println("Hello, World!")
          }
      })
      
      hello.start()
      

    2. Java ExecutorService 사용

      import java.net.{Socket, ServerSocket}
      import java.util.concurrent.{ExecutorService, Executors}
      import java.util.Date
      
      class NetworkService(port: Int, poolSize: Int) extends Runnable {
          val serverSocket = new ServerSocket(port)
          val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
      
          def run(): Unit = {
              try {
                  while (true) {
                      val socket = serverSocket.accept()
                      pool.execute(new Handler(socket))
                  }
              } finally {
                  pool.shutdown()
              }
          }
      }
      
      class Handler(socket: Socket) extends Runnable {
          def message = (Thread.currentThread.getName() + "\n").getBytes
      
          def run(): Unit = {
              socket.getOutputStream.write(message)
              socket.getOutputStream.close()
          }
      }
      
      (new NetworkService(2020, 2)).run
      

    3. Java ForkJoinPool

      import scala.concurrent._
      
      val executor = new java.util.concurrent.ForkJoinPool
      executor.execute(new Runnable {
          def run() = println("hello fjp!")
      })
      

    4. Scala ExecutorContext.global

      import scala.concurrent._
      
      val ec = ExecutionContext.global
      ec.execute(new Runnable {
          def run() = println("hello ec!")
      })
      

    5. ExecutorContext 생성

      import scala.concurrent._
      
      val ec = ExecutionContext.fromExecutorService(new forkjoin.ForkJoinPool)
      ec.execute(new Runnable {
          def run() = println("hello ec!")
      })
      

Future - Scala/Java

Java Future

참고: https://kangmoo.github.io/posts/Java-Future/

  • Java에서 Future는 비동기 계산의 아직 계산되지 않은 결과를 표현하는 인터페이스
  • 메서드
    • get(): 연산의 결과 반환. 완료되지 않았다면 blocking 되어 대기 후 반환. Future의 제네릭 타입 객체 반환
    • get(long timeout, TimeUnit unit): 지정한 시간 동안 결과 대기, 이후 TimeoutException 발생
    • isDone(): 연산이 완료되었는지 확인
    • cancel(boolean mayInterruptIfRunning): 연산 취소
    • isCancelled(): 연산이 취소되었는지 확인
  • 예시
    ExecutorService executor = Executors.newFixedThreadPool(10);
    
    Future<Integer> future = executor.submit(() -> {
        return 123;
    });
    
    executor.shutdown();
    
  • 장점
    • 비동기 작업 결과 쉽게 관리 가능. 결과 필요할 때 가져옴. isDone()으로 작업 완료 확인 가능
  • 단점
    • Future 작업 완료되었는지 알려주는 isDone()이 polling

Java CompletableFuture

참고: https://mangkyu.tistory.com/263

  • 기존 Future - 블로킹, Future 조합 불가능, 예외처리 불가능, 타임아웃 거는거만 가능
  • CompletableFuture
    • 외부에서 작업을 완료시킬 수 있을 뿐만 아니라, 콜백 등록 및 Future 조합 가능
  • 기본적으로 Java7에 추가된 ForkJoinPool의 commonPool() 사용해 작업 실행 쓰레드를 쓰레드풀로 부터 얻어 실행
    • 원하는 쓰레드 풀을 쓸라면 ExecutorService를 파라미터로 넘김
  • 제공 함수들 (예시는 블로그에서)
    • 작업 콜백
      • thenApply
      • thenAccept
      • thenRun
    • 작업 조합
      • thenCompose
      • thenCombine
      • allOf
      • anyOf
    • 예외 처리
      • exceptionally
      • handle
      • handleAsync

Scala Future

참고: https://hamait.tistory.com/763 참고: https://www.baeldung.com/scala/futures

  • Future 값
    • trait Future[T]
    • 실제 T 타입의 객체를 리턴받기 원함
  • Future 계산
    • def apply[T](b :=> T)(implicit e: ExecutionContext): Future[T]
    • 실제 계산을 수행하는 함수를 매개변수로 넣어줌
  • Future란
    • 퓨쳐: 있을지 없을지 모르는 비동기 계산 결과
    • ExecutionContext가 있어야 퓨처가 동작 (Executor/ExecutorService로 부터 구현가능)
    • global 이라는 스칼라 빌트인 ExecutionContext가 있음 (ForkJoinPool 사용)
      • import scala.concurrent.ExecutionContext.Implicits.global
    • 함수형 합성이 가능함. map, flatMap, Future.sequence(), for-comprehension, recover, andThen 등등,,,

쓰레드풀과 ForkJoinPool

참고: https://hamait.tistory.com/612

쓰레드풀

  • Java5 에서는 쓰레드풀 만들어주기 시작 (미리 만들어서 쓰레드 큐에 넣어두자~) 1. newScheduledThreadPool(int corePoolSize) - 1분에 한번씩 임무를 수행시키기 위함 2. newFixedThreadPool(int nThreads) - 풀장에 고정적으로 쓰레드를 몇개 둘것인가? 3. newCachedThreadPool() - 유기적으로 쓰레드의 숫자가 증감

ForkJoinPool

  • Java7에 추가된 ForkJoinPool (그냥 쓰레드풀 만드는 방식이여)
  • 일을 작은 업무로 나누어 배분 -> 일을 한 후에 일을 취함
  • 놀지 않기 위해 각각의 쓰레드가 큐를 가지고 있고 서로의 업무를 훔쳐오기도 함
    • 멍청하게 노는 쓰레드 방지