2023 06 20
2023-06-20¶
생산자-소비자 패턴¶
참고: https://hamait.tistory.com/550#recentEntries
- 1. 싱글쓰레드에서 생산자-소비자
- 생산하는 녀석(Producer) 따로, 이를 전달받아 소비하는 녀석(Consumer) 따로
- 호출하면 바로 반응한다
- produce 호출시 바로 id 반환
- Consumer 바로 받아 처리
class Producer { int id = 0; int produce() { return nextId(); } int nextId() { return id = id + 1; } } class Consumer { void consume(int id) { print("ID : " + id); } } class test() { public static void main(String[] args) { Producer p = new Producer(); Consumer c = new Consumer(); result = p.produce(); c.consume(result); } }
- 2. 멀티쓰레드에서 생산자-소비자

- 싱글쓰레드와 달라지는 점은...
- Producer 쓰레드 따로, Consumer 쓰레드 따로. (내부에 루프를 가짐)
- 전달 매개체 (보통 큐)가 생김
- Table 클래스에서 동시 접근을 막는 큐를 구현함 => 근데 멀티 쓰레드 프로그래밍이 꽤나 위험해 => 하이레벨에서 조작하자!
- STM, Actor 등등의 등장
- java.util.concurrent를 사용하여 막자!
public class Main { public static void main(String[] args) { Table table = new Table(100); new ProducerThread(table).start(); new ConsumerThread(table).start(); } } public class ProducerThread extends Thread { private static int id = 0; Table table; public ProducerThread(Table table) { this.table = table; } public void run() { while(true) { Thread.sleep(1000); String packet = "No: " + nextId(); table.put(packet); // 큐에 추가하는 연산 } } private static synchronized int nextId() { return id++; } } public class ConsumerThread extends Thread { private final Table table; public ConsumerThread(Table table) { this.table = table; } public void run() { while(true) { String packet = table.take(); // 큐에서 가져옴 System.out.println("consumer: " + packet); } } } public class Table { private final String[] buffer; private int tail; private int head; private int count; public Table(int count) { this.buffer = new String[count]; this.head = 0; this.tail = 0; this.count = 0; } public synchronized void put(String packet) { while (count >= buffer.length) { wait(); } buffer[tail] = packet; tail = (tail + 1) % buffer.length; count++; notifyAll(); } public synchronized String take() { while (count <= 0) { wait(); } String packet = buffer[head]; head = (head + 1) % buffer.length; count--; notifyAll(); retunr packet; } }
- 3. 멀티쓰레드에서 생산자-소비자 (java.util.concurrent)
- BlockingQueue 라는 자료구조를 사용해 간단히 사용하고, 실수의 여지를 줄여보자!
- 상태 변경에 대한 책임 소재가 명확하지 않다는데...?
- 4. Actor
- '행동자'
- 능동적으로 비동기 메시지를 처리하자!!!
- 살아 숨쉬는 살아있는 쓰레드 하나가 있음
- 비동기적으로 메시지 처리 가능 => 메시지를 담아둘 자신만의 큐 보유
- Actor == 객체 + Loop Thread + Queue
- 5. Reactor
- node.js의 기반패턴
- Actor + EventHandler
- Actor 안에 어떤 switch 문을 두고, 이벤트가 날라오면 알맞은 이벤트로 디스패칭
- 단일 쓰레드로 이루어져 있기에, 이벤트 핸들러 하나가 병목이 된다면 성능 뇌절
- 6. Proactor
- Reactor: 어떤 이벤트 날라옴 -> 이벤트에 해당하는 행동 수행
- Proactor: 먼저 행동 디스패치 -> 행동에 따른 결과가 날라옴
- 7. Actor 패턴으로 생산자-소비자 (Scala)
- 액터를 사용할 때, 특정 객체 및 값의 상태관리는 오직 하나의 액터에서 전담하도록 하자!
- 무엇인가를 수정하고 싶다면, 해당 액터에 메시지를 전달해주세요!
- 액터의 흐름을 막지 마세요! (ex. Thread.sleep)
- 다른 전용 액터를 만들어 메시지를 보내고 받는게 이상적
import scala.actors.Reactor object Test { case class Stop() case class Get(from: Reactor[Any]) case class Put(x: Int) class UnboundedBuffer extends Reactor[Any] { def act(): Unit = { react { case Get(from) => val consumer = from react { case Put(x) => consumer ! x act() } } } } class Producer(buf: UnboundedBuffer) extends Reactor[Any] { def act(): Unit = { var i = 0 while (i < 10) { i += 1 Thread.sleep(1000) buf ! Put(i) } } } class Consumer(buf: UnboundedBuffer) extends Reactor[Any] { def act(): Unit = { Thread.sleep(1000) buf ! Get(this) react { case res => println(res) act() } } } def main(args: Array[String]): Unit = { val parent = new Reactor[Any] { def act(): Unit = { val buffer = new UnboundedBuffer buffer.start() val producer = new Producer(buffer) producer.start() val consumer = new Consumer(buffer) consumer.start() } } parent.start() } }
- 다른 전용 액터를 만들어 메시지를 보내고 받는게 이상적
- 액터를 사용할 때, 특정 객체 및 값의 상태관리는 오직 하나의 액터에서 전담하도록 하자!
- 8. Akka 생산자-소비자 (Scala)
object HelloWorld extends App { val system = ActorSystem("ProConSys") val con = system.actorOf(Props[ConsumerActor]) val pro = system.actorOf(Props(new ProducerActor(con))) pro ! "start" } class ProducerActor(con: ActorRef) extends Actor { def receive = { case "start" => var i = 0 while (i < 10) { con ! i i += 1 Thread.sleep(1000) } } } class ConsumerActor extends Actor { def receive = { case id: Integer => println("ID : " + id) } }
- Pub/Sub & Producer/Consumer 참고: https://velog.io/@eenaa/PubSub-%EA%B3%BC-producerconsumer
- Pub/Sub
- 구독 모델로써...
- 데이터를 발행자(Publisher)가 특정 주제(Topic)에게 전송하고
- 해당 주제(Topic)을 구독하는 구독자(Subscriber)들이 데이터를 수신
- 발행자(Publisher)는 구체적으로 어느 구독자(Subscriber)가 받을지 신경쓰지 않음
- Subscriber가 Publisher의 데이터를 처리할 때 중복으로 처리하지 않도록 중복 처리할 것
- 동작 방식
- 발행된 데이터는 topic을 구독중인 모든 Subscriber에게 복사가 된다
- 각 Subscriber는 자신만의 독립적인 작업을 수행할 수 있으며, 서로 영향 X
- 발행된 데이터는 topic을 구독중인 모든 Subscriber에게 복사가 된다
- 확장성
- 많은 Subscriber를 가질 수 있으며, 각 Subscriber는 독립적으로 작업 수행 가능
- Publisher는 데이터 전송만 해두 되기에, 구독자 수에 영향 받지 않음
- 구독 모델로써...
- Producer/Consumer
- 구동 원리는...
- 생산자(Producer)가 데이터를 생성하고 큐(Queue)에 전송
- 소비자(Consumer)는 큐(Queue)에서 데이터를 가져와 처리
- 소비자(Consumer)는 명시적으로 큐(Queue)에 접근하여 데이터를 가져올 것
- Consumer가 큐에 적재된 데이터를 어디까지 가져갔는지 커밋되어야 동시성 중복처리 막을 수 있음
- 확장성
- 큐의 처리 능력에 따라 확장성이 결정됨
- 큐가 병목될 수 있기에, 큐를 분산해 처리 능력 향상도 가능
- 구동 원리는...