C # productor / consumidor

Recientemente me encontré con una implementación de patrón de productor / consumidor c #. es muy simple y (al menos para mí) muy elegante.

parece haber sido ideado alrededor de 2006, entonces me preguntaba si esta implementación es
– seguro
– sigue siendo aplicable

El código está debajo (el código original fue referenciado en http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375 )

using System; using System.Collections; using System.Threading; public class Test { static ProducerConsumer queue; static void Main() { queue = new ProducerConsumer(); new Thread(new ThreadStart(ConsumerJob)).Start(); Random rng = new Random(0); for (int i=0; i < 10; i++) { Console.WriteLine ("Producing {0}", i); queue.Produce(i); Thread.Sleep(rng.Next(1000)); } } static void ConsumerJob() { // Make sure we get a different random seed from the // first thread Random rng = new Random(1); // We happen to know we've only got 10 // items to receive for (int i=0; i < 10; i++) { object o = queue.Consume(); Console.WriteLine ("\t\t\t\tConsuming {0}", o); Thread.Sleep(rng.Next(1000)); } } } public class ProducerConsumer { readonly object listLock = new object(); Queue queue = new Queue(); public void Produce(object o) { lock (listLock) { queue.Enqueue(o); // We always need to pulse, even if the queue wasn't // empty before. Otherwise, if we add several items // in quick succession, we may only pulse once, waking // a single thread up, even if there are multiple threads // waiting for items. Monitor.Pulse(listLock); } } public object Consume() { lock (listLock) { // If the queue is empty, wait for an item to be added // Note that this is a while loop, as we may be pulsed // but not wake up before another thread has come in and // consumed the newly added object. In that case, we'll // have to wait for another pulse. while (queue.Count==0) { // This releases listLock, only reacquiring it // after being woken up by a call to Pulse Monitor.Wait(listLock); } return queue.Dequeue(); } } } 

El código es anterior a eso, lo escribí un tiempo antes de que apareciera .NET 2.0. El concepto de cola de productor / consumidor es mucho más antiguo que eso 🙂

Sí, ese código es seguro hasta donde yo sé, pero tiene algunas deficiencias:

  • No es genérico. Una versión moderna sin duda sería genérica.
  • No tiene forma de detener la cola. Una forma simple de detener la cola (para que todos los subprocesos de consumidor se retiren) es tener un token de “detener el trabajo” que se puede poner en la cola. A continuación, agrega tantos tokens como hilos tenga. Alternativamente, tiene una bandera separada para indicar que quiere detenerse. (Esto permite que los otros hilos se detengan antes de terminar todo el trabajo actual en la cola).
  • Si los trabajos son muy pequeños, consumir un solo trabajo a la vez puede no ser lo más eficiente.

Las ideas detrás del código son más importantes que el código en sí, para ser sincero.

Podría hacer algo como el siguiente fragmento de código. Es genérico y tiene un método para poner nulos en cola (o cualquier indicador que quiera usar) para indicarle a los hilos de trabajo que salgan.

El código está tomado de aquí: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace ConsoleApplication1 { public class TaskQueue : IDisposable where T : class { object locker = new object(); Thread[] workers; Queue taskQ = new Queue(); public TaskQueue(int workerCount) { workers = new Thread[workerCount]; // Create and start a separate thread for each worker for (int i = 0; i < workerCount; i++) (workers[i] = new Thread(Consume)).Start(); } public void Dispose() { // Enqueue one null task per worker to make each exit. foreach (Thread worker in workers) EnqueueTask(null); foreach (Thread worker in workers) worker.Join(); } public void EnqueueTask(T task) { lock (locker) { taskQ.Enqueue(task); Monitor.PulseAll(locker); } } void Consume() { while (true) { T task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait(locker); task = taskQ.Dequeue(); } if (task == null) return; // This signals our exit Console.Write(task); Thread.Sleep(1000); // Simulate time-consuming task } } } } 

De regreso en el día, aprendí cómo funciona Monitor.Wait / Pulse (y mucho sobre los hilos en general) del código anterior y de la serie de artículos de la que proviene. Entonces, como dice Jon, tiene mucho valor y, de hecho, es seguro y aplicable.

Sin embargo, a partir de .NET 4, hay una implementación de cola productor-consumidor en el marco . Solo lo encontré yo mismo, pero hasta este punto hace todo lo que necesito.

Advertencia: si lees los comentarios, entenderás que mi respuesta es incorrecta 🙂

Hay un posible punto muerto en su código.

Imagínese el siguiente caso, para mayor claridad, utilicé un enfoque de subproceso único, pero debería ser fácil de convertir a multihebra con el modo de suspensión:

 // We create some actions... object locker = new object(); Action action1 = () => { lock (locker) { System.Threading.Monitor.Wait(locker); Console.WriteLine("This is action1"); } }; Action action2 = () => { lock (locker) { System.Threading.Monitor.Wait(locker); Console.WriteLine("This is action2"); } }; // ... (stuff happens, etc.) // Imagine both actions were running // and there's 0 items in the queue // And now the producer kicks in... lock (locker) { // This would add a job to the queue Console.WriteLine("Pulse now!"); System.Threading.Monitor.Pulse(locker); } // ... (more stuff) // and the actions finish now! Console.WriteLine("Consume action!"); action1(); // Oops... they're locked... action2(); 

Por favor avíseme si esto no tiene ningún sentido.

Si esto se confirma, la respuesta a su pregunta es: “no, no es seguro”;) Espero que esto ayude.