콘텐츠로 이동

2023 06 20

2023-06-20

생산자-소비자 패턴

참고: https://hamait.tistory.com/550#recentEntries

  • 1. 싱글쓰레드에서 생산자-소비자
    • 생산하는 녀석(Producer) 따로, 이를 전달받아 소비하는 녀석(Consumer) 따로
    • 호출하면 바로 반응한다
      • produce 호출시 바로 id 반환
      • Consumer 바로 받아 처리
        class Producer {
            int id = 0;
        
            int produce() {
                return nextId();
            }
        
            int nextId() {
                return id = id + 1;
            }
        }
        
        class Consumer {
            void consume(int id) {
                print("ID : " + id);
            }
        }
        
        class test() {
            public static void main(String[] args) {
                Producer p = new Producer();
                Consumer c = new Consumer();
                result = p.produce();
                c.consume(result);
            }
        }
        
  • 2. 멀티쓰레드에서 생산자-소비자
    • 싱글쓰레드와 달라지는 점은...
      1. Producer 쓰레드 따로, Consumer 쓰레드 따로. (내부에 루프를 가짐)
      2. 전달 매개체 (보통 큐)가 생김
    • Table 클래스에서 동시 접근을 막는 큐를 구현함 => 근데 멀티 쓰레드 프로그래밍이 꽤나 위험해 => 하이레벨에서 조작하자!
      • STM, Actor 등등의 등장
      • java.util.concurrent를 사용하여 막자!
        public class Main {
          public static void main(String[] args) {
            Table table = new Table(100);
            new ProducerThread(table).start();
            new ConsumerThread(table).start();
          }
        }
        
        public class ProducerThread extends Thread {
            private static int id = 0;
            Table table;
        
            public ProducerThread(Table table) {
                this.table = table;
            }
        
            public void run() {
                while(true) {
                    Thread.sleep(1000);
                    String packet = "No: " + nextId();
                    table.put(packet); // 큐에 추가하는 연산
                }
            }
        
            private static synchronized int nextId() {
                return id++;
            }
        }
        
        public class ConsumerThread extends Thread {
            private final Table table;
        
            public ConsumerThread(Table table) {
                this.table = table;
            }
        
            public void run() {
                while(true) {
                    String packet = table.take(); // 큐에서 가져옴
                    System.out.println("consumer: " + packet);
                }
            }
        }
        
        public class Table {
            private final String[] buffer;
            private int tail;
            private int head;
            private int count;
        
            public Table(int count) {
                this.buffer = new String[count];
                this.head = 0;
                this.tail = 0;
                this.count = 0;
            }
        
            public synchronized void put(String packet) {
                while (count >= buffer.length) {
                    wait();
                }
                buffer[tail] = packet;
                tail = (tail + 1) % buffer.length;
                count++;
                notifyAll();
            }
        
            public synchronized String take() {
                while (count <= 0) {
                    wait();
                }
                String packet = buffer[head];
                head = (head + 1) % buffer.length;
                count--;
                notifyAll();
                retunr packet;
            }
        }
        
  • 3. 멀티쓰레드에서 생산자-소비자 (java.util.concurrent)
    • BlockingQueue 라는 자료구조를 사용해 간단히 사용하고, 실수의 여지를 줄여보자!
    • 상태 변경에 대한 책임 소재가 명확하지 않다는데...?
      public class Table {
          private final BlockingQueue<String> buffer;
      
          public Table(int count) {
              this.buffer = new ArrayBlockingQueue<String>(count);
          }
      
          public void put(String packet) {
              Thread.sleep(1000);
              buffer.put(packet);
          }
      
          public String take() {
              return buffer.take();
          }
      }
      
  • 4. Actor
    • '행동자'
    • 능동적으로 비동기 메시지를 처리하자!!!
      • 살아 숨쉬는 살아있는 쓰레드 하나가 있음
    • 비동기적으로 메시지 처리 가능 => 메시지를 담아둘 자신만의 큐 보유
    • Actor == 객체 + Loop Thread + Queue
  • 5. Reactor
    • node.js의 기반패턴
    • Actor + EventHandler
      • Actor 안에 어떤 switch 문을 두고, 이벤트가 날라오면 알맞은 이벤트로 디스패칭
      • 단일 쓰레드로 이루어져 있기에, 이벤트 핸들러 하나가 병목이 된다면 성능 뇌절
  • 6. Proactor
    • Reactor: 어떤 이벤트 날라옴 -> 이벤트에 해당하는 행동 수행
    • Proactor: 먼저 행동 디스패치 -> 행동에 따른 결과가 날라옴
  • 7. Actor 패턴으로 생산자-소비자 (Scala)
    • 액터를 사용할 때, 특정 객체 및 값의 상태관리는 오직 하나의 액터에서 전담하도록 하자!
      • 무엇인가를 수정하고 싶다면, 해당 액터에 메시지를 전달해주세요!
    • 액터의 흐름을 막지 마세요! (ex. Thread.sleep)
      • 다른 전용 액터를 만들어 메시지를 보내고 받는게 이상적
        import scala.actors.Reactor
        
        object Test {
            case class Stop()
        
            case class Get(from: Reactor[Any])
        
            case class Put(x: Int)
        
            class UnboundedBuffer extends Reactor[Any] {
                def act(): Unit = {
                    react {
                        case Get(from) =>
                            val consumer = from
                            react {
                                case Put(x) =>
                                    consumer ! x
                                    act()
                            }
                    }
                }
            }
        
            class Producer(buf: UnboundedBuffer) extends Reactor[Any] {
                def act(): Unit = {
                    var i = 0
                    while (i < 10) {
                        i += 1
                        Thread.sleep(1000)
                        buf ! Put(i)
                    }
                }
            }
        
            class Consumer(buf: UnboundedBuffer) extends Reactor[Any] {
                def act(): Unit = {
                    Thread.sleep(1000)
                    buf ! Get(this)
                    react {
                        case res =>
                            println(res)
                            act()
                    }
                }
            }
        
            def main(args: Array[String]): Unit = {
                val parent = new Reactor[Any] {
                    def act(): Unit = {
                        val buffer = new UnboundedBuffer
                        buffer.start()
                        val producer = new Producer(buffer)
                        producer.start()
                        val consumer = new Consumer(buffer)
                        consumer.start()
                    }
                }
        
                parent.start()
            }
        }
        
  • 8. Akka 생산자-소비자 (Scala)
    object HelloWorld extends App {
        val system = ActorSystem("ProConSys")
        val con = system.actorOf(Props[ConsumerActor])
        val pro = system.actorOf(Props(new ProducerActor(con)))
        pro ! "start"
    }
    
    class ProducerActor(con: ActorRef) extends Actor {
        def receive = {
            case "start" =>
                var i = 0
                while (i < 10) {
                    con ! i
                    i += 1
                    Thread.sleep(1000)
                }
        }
    }
    
    class ConsumerActor extends Actor {
        def receive = {
            case id: Integer => println("ID : " + id)
        }
    }
    
  • Pub/Sub
    • 구독 모델로써...
      1. 데이터를 발행자(Publisher)가 특정 주제(Topic)에게 전송하고
      2. 해당 주제(Topic)을 구독하는 구독자(Subscriber)들이 데이터를 수신
      3. 발행자(Publisher)는 구체적으로 어느 구독자(Subscriber)가 받을지 신경쓰지 않음
    • Subscriber가 Publisher의 데이터를 처리할 때 중복으로 처리하지 않도록 중복 처리할 것
    • 동작 방식
      • 발행된 데이터는 topic을 구독중인 모든 Subscriber에게 복사가 된다
        • 각 Subscriber는 자신만의 독립적인 작업을 수행할 수 있으며, 서로 영향 X
    • 확장성
      • 많은 Subscriber를 가질 수 있으며, 각 Subscriber는 독립적으로 작업 수행 가능
      • Publisher는 데이터 전송만 해두 되기에, 구독자 수에 영향 받지 않음
  • Producer/Consumer
    • 구동 원리는...
      1. 생산자(Producer)가 데이터를 생성하고 큐(Queue)에 전송
      2. 소비자(Consumer)는 큐(Queue)에서 데이터를 가져와 처리
      3. 소비자(Consumer)는 명시적으로 큐(Queue)에 접근하여 데이터를 가져올 것
    • Consumer가 큐에 적재된 데이터를 어디까지 가져갔는지 커밋되어야 동시성 중복처리 막을 수 있음
    • 확장성
      • 큐의 처리 능력에 따라 확장성이 결정됨
      • 큐가 병목될 수 있기에, 큐를 분산해 처리 능력 향상도 가능