Hilos del productor / consumidor usando una cola

Me gustaría crear algún tipo de aplicación de subprocesamiento Producer/Consumer . Pero no estoy seguro de cuál es la mejor manera de implementar una cola entre los dos.

Así que tengo un par de ideas con dos (las cuales podrían estar completamente equivocadas). Me gustaría saber cuál sería mejor y si ambos apestan, ¿cuál sería la mejor forma de implementar la cola? Es principalmente mi implementación de la cola en estos ejemplos lo que me preocupa. Extiendo una clase Queue que es una clase interna y es segura para subprocesos. A continuación hay dos ejemplos con 4 clases cada uno.

Clase principal-

 public class SomeApp { private Consumer consumer; private Producer producer; public static void main (String args[]) { consumer = new Consumer(); producer = new Producer(); } } 

Clase del consumidor

 public class Consumer implements Runnable { public Consumer() { Thread consumer = new Thread(this); consumer.start(); } public void run() { while(true) { //get an object off the queue Object object = QueueHandler.dequeue(); //do some stuff with the object } } } 

Clase de productor

 public class Producer implements Runnable { public Producer() { Thread producer = new Thread(this); producer.start(); } public void run() { while(true) { //add to the queue some sort of unique object QueueHandler.enqueue(new Object()); } } } 

Clase de cola

 public class QueueHandler { //This Queue class is a thread safe (written in house) class public static Queue readQ = new Queue(100); public static void enqueue(Object object) { //do some stuff readQ.add(object); } public static Object dequeue() { //do some stuff return readQ.get(); } } 

O

Clase principal-

 public class SomeApp { Queue readQ; private Consumer consumer; private Producer producer; public static void main (String args[]) { readQ = new Queue(100); consumer = new Consumer(readQ); producer = new Producer(readQ); } } 

Clase del consumidor

 public class Consumer implements Runnable { Queue queue; public Consumer(Queue readQ) { queue = readQ; Thread consumer = new Thread(this); consumer.start(); } public void run() { while(true) { //get an object off the queue Object object = queue.dequeue(); //do some stuff with the object } } } 

Clase de productor

 public class Producer implements Runnable { Queue queue; public Producer(Queue readQ) { queue = readQ; Thread producer = new Thread(this); producer.start(); } public void run() { while(true) { //add to the queue some sort of unique object queue.enqueue(new Object()); } } } 

Clase de cola

 //the extended Queue class is a thread safe (written in house) class public class QueueHandler extends Queue { public QueueHandler(int size) { super(size); //All I'm thinking about now is McDonalds. } public void enqueue(Object object) { //do some stuff readQ.add(); } public Object dequeue() { //do some stuff return readQ.get(); } } 

¡Y ve!

Java 5+ tiene todas las herramientas que necesita para este tipo de cosas. Querrás:

  1. Pon todos tus productores en un solo ExecutorService ;
  2. Pon todos tus consumidores en otro ExecutorService ;
  3. Si es necesario, comuníquese entre los dos usando un BlockingQueue .

Digo “si es necesario” para (3) porque desde mi experiencia es un paso innecesario. Todo lo que hace es enviar nuevas tareas al servicio del ejecutor del consumidor. Asi que:

 final ExecutorService producers = Executors.newFixedThreadPool(100); final ExecutorService consumers = Executors.newFixedThreadPool(100); while (/* has more work */) { producers.submit(...); } producers.shutdown(); producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); consumers.shutdown(); consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 

Entonces los producers presentan directamente a los consumers .

OK, como otros señalan, lo mejor que se puede hacer es usar el paquete java.util.concurrent . Recomiendo “Java Concurrency in Practice”. Es un gran libro que cubre casi todo lo que necesita saber.

En cuanto a su implementación particular, como noté en los comentarios, no inicie Threads from Constructors, puede no ser seguro.

Dejando eso de lado, la segunda implementación parece mejor. No desea poner colas en campos estáticos. Probablemente estés perdiendo flexibilidad por nada.

Si desea continuar con su propia implementación (¿para fines de aprendizaje, supongo?), Al menos suministre un método start() . Debe construir el objeto (puede crear una instancia del objeto Thread ) y luego llamar a start() para iniciar el hilo.

Editar: ExecutorService tiene su propia cola, por lo que puede ser confuso. Aquí hay algo para que comiences.

 public class Main { public static void main(String[] args) { //The numbers are just silly tune parameters. Refer to the API. //The important thing is, we are passing a bounded queue. ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue(100)); //No need to bound the queue for this executor. //Use utility method instead of the complicated Constructor. ExecutorService producer = Executors.newSingleThreadExecutor(); Runnable produce = new Produce(consumer); producer.submit(produce); } } class Produce implements Runnable { private final ExecutorService consumer; public Produce(ExecutorService consumer) { this.consumer = consumer; } @Override public void run() { Pancake cake = Pan.cook(); Runnable consume = new Consume(cake); consumer.submit(consume); } } class Consume implements Runnable { private final Pancake cake; public Consume(Pancake cake){ this.cake = cake; } @Override public void run() { cake.eat(); } } 

EDITACIÓN adicional: para productor, en lugar de while(true) , puede hacer algo como:

 @Override public void run(){ while(!Thread.currentThread().isInterrupted()){ //do stuff } } 

De esta manera puede apagar el ejecutor llamando a .shutdownNow() . Si usa while(true) , no se apagará.

También tenga en cuenta que el Producer sigue siendo vulnerable a RuntimeExceptions (es decir, una RuntimeException detendrá el procesamiento)

Estás reinventando la rueda.

Si necesita persistencia y otras características empresariales, use JMS (sugeriría ActiveMq ).

Si necesita colas rápidas en memoria, use una de las impedancias de la cola de Java.

Si necesita admitir Java 1.4 o una versión anterior, use el excelente paquete simultáneo de Doug Lea.

He extendido la respuesta propuesta de cletus al ejemplo del código de trabajo.

  1. One ExecutorService (pes) acepta tareas de Producer .
  2. One ExecutorService (ces) acepta tareas del Consumer .
  3. Tanto el Producer como el Consumer comparten BlockingQueue .
  4. Las tareas de múltiples Producer generan diferentes números.
  5. Cualquiera de las tareas del Consumer puede consumir el número generado por el Producer

Código:

 import java.util.concurrent.*; public class ProducerConsumerWithES { public static void main(String args[]){ BlockingQueue sharedQueue = new LinkedBlockingQueue(); ExecutorService pes = Executors.newFixedThreadPool(2); ExecutorService ces = Executors.newFixedThreadPool(2); pes.submit(new Producer(sharedQueue,1)); pes.submit(new Producer(sharedQueue,2)); ces.submit(new Consumer(sharedQueue,1)); ces.submit(new Consumer(sharedQueue,2)); // shutdown should happen somewhere along with awaitTermination / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */ pes.shutdown(); ces.shutdown(); } } class Producer implements Runnable { private final BlockingQueue sharedQueue; private int threadNo; public Producer(BlockingQueue sharedQueue,int threadNo) { this.threadNo = threadNo; this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=1; i<= 5; i++){ try { int number = i+(10*threadNo); System.out.println("Produced:" + number + ":by thread:"+ threadNo); sharedQueue.put(number); } catch (Exception err) { err.printStackTrace(); } } } } class Consumer implements Runnable{ private final BlockingQueue sharedQueue; private int threadNo; public Consumer (BlockingQueue sharedQueue,int threadNo) { this.sharedQueue = sharedQueue; this.threadNo = threadNo; } @Override public void run() { while(true){ try { int num = sharedQueue.take(); System.out.println("Consumed: "+ num + ":by thread:"+threadNo); } catch (Exception err) { err.printStackTrace(); } } } } 

salida:

 Produced:11:by thread:1 Produced:21:by thread:2 Produced:22:by thread:2 Consumed: 11:by thread:1 Produced:12:by thread:1 Consumed: 22:by thread:1 Consumed: 21:by thread:2 Produced:23:by thread:2 Consumed: 12:by thread:1 Produced:13:by thread:1 Consumed: 23:by thread:2 Produced:24:by thread:2 Consumed: 13:by thread:1 Produced:14:by thread:1 Consumed: 24:by thread:2 Produced:25:by thread:2 Consumed: 14:by thread:1 Produced:15:by thread:1 Consumed: 25:by thread:2 Consumed: 15:by thread:1 

Nota. Si no necesita múltiples Productores y Consumidores, mantenga solo Productor y Consumidor. He agregado varios productores y consumidores para mostrar las capacidades de BlockingQueue entre múltiples productores y consumidores.

Este es un código muy simple.

 import java.util.*; // @author : rootTraveller, June 2017 class ProducerConsumer { public static void main(String[] args) throws Exception { Queue queue = new LinkedList<>(); Integer buffer = new Integer(10); //Important buffer or queue size, change as per need. Producer producerThread = new Producer(queue, buffer, "PRODUCER"); Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER"); producerThread.start(); consumerThread.start(); } } class Producer extends Thread { private Queue queue; private int queueSize ; public Producer (Queue queueIn, int queueSizeIn, String ThreadName){ super(ThreadName); this.queue = queueIn; this.queueSize = queueSizeIn; } public void run() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ System.out.println(Thread.currentThread().getName() + " FULL : waiting...\n"); try{ queue.wait(); //Important } catch (Exception ex) { ex.printStackTrace(); } } //queue empty then produce one, add and notify int randomInt = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); queue.add(randomInt); queue.notifyAll(); //Important } //synchronized ends here : NOTE } } } class Consumer extends Thread { private Queue queue; private int queueSize; public Consumer(Queue queueIn, int queueSizeIn, String ThreadName){ super (ThreadName); this.queue = queueIn; this.queueSize = queueSizeIn; } public void run() { while(true){ synchronized (queue) { while(queue.isEmpty()){ System.out.println(Thread.currentThread().getName() + " Empty : waiting...\n"); try { queue.wait(); //Important } catch (Exception ex) { ex.printStackTrace(); } } //queue not empty then consume one and notify System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove()); queue.notifyAll(); } //synchronized ends here : NOTE } } } 
  1. Código de Java “BlockingQueue” que tiene sincronizado el método de poner y obtener.
  2. Código Java “Productor”, hilo productor para producir datos.
  3. Código Java “Consumidor”, hilo de consumo para consumir los datos producidos.
  4. Código de Java “ProducerConsumer_Main”, función principal para iniciar el hilo productor y consumidor.

BlockingQueue.java

 public class BlockingQueue { int item; boolean available = false; public synchronized void put(int value) { while (available == true) { try { wait(); } catch (InterruptedException e) { } } item = value; available = true; notifyAll(); } public synchronized int get() { while(available == false) { try { wait(); } catch(InterruptedException e){ } } available = false; notifyAll(); return item; } } 

Consumer.java

 package com.sukanya.producer_Consumer; public class Consumer extends Thread { blockingQueue queue; private int number; Consumer(BlockingQueue queue,int number) { this.queue = queue; this.number = number; } public void run() { int value = 0; for (int i = 0; i < 10; i++) { value = queue.get(); System.out.println("Consumer #" + this.number+ " got: " + value); } } } 

ProducerConsumer_Main.java

 package com.sukanya.producer_Consumer; public class ProducerConsumer_Main { public static void main(String args[]) { BlockingQueue queue = new BlockingQueue(); Producer producer1 = new Producer(queue,1); Consumer consumer1 = new Consumer(queue,1); producer1.start(); consumer1.start(); } }