Cola de proceso con subprocesamiento múltiple o tareas

Tengo una aplicación de mensajes de telefonía en la que hay muchos mensajes que procesar. Debido a que los puertos del teléfono son limitados, el mensaje se procesará primero en primer lugar. Cada mensaje tiene un indicador ‘Confirmar’ que indica el que se procesa. Fue inicializado como falso por supuesto.

Quiero poner todos los mensajes en una cola y luego procesarlos con múltiples hilos o tareas.

public class MessageQueue { public Queue MessageWorkItem { get; set; } public Messages Message { get; set; } public MessageQueue() { MessageWorkItem = new Queue(); Message = new Messages(); } public void GetMessageMetaData() { try { // It is just a test, add only one item into the queue Message.MessageID = Guid.NewGuid(); Message.NumberToCall = "1111111111"; Message.FacilityID = "3333"; Message.NumberToDial = "2222222222"; Message.CountryCode = "1"; Message.Acknowledge = false; } catch (Exception ex) { } } public void AddingItemToQueue() { GetMessageMetaData(); if (!Message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(Message); } } } } public class Messages { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } } 

Ahora mi pregunta es cómo quitar el enrutamiento del artículo de la cola con multihebra. Para cada elemento de la cola, quiero ejecutar un script.

  public void RunScript(Message item) { try { PlayMessage(item); return; } catch (HangupException hex) { Log.WriteWithId("Caller Hungup!", hex.Message); } catch (Exception ex) { Log.WriteException(ex, "Unexpected exception: {0}"); } } 

Lo que pensé era ver si

if (MessageWorkItem.Count> = 1) Luego hago algo pero necesito ayuda con el código.

Si puede usar .Net 4.5, le sugiero que busque en Dataflow desde la Biblioteca de tareas paralelas (TPL) .

Esa página conduce a muchos tutoriales de ejemplo, como Cómo implementar un patrón de flujo de datos consumidor-productor y Tutorial: uso de Dataflow en una aplicación Windows Forms .

Echa un vistazo a esa documentación para ver si te puede ayudar. Es bastante para asimilar, pero creo que probablemente sea tu mejor enfoque.

Alternativamente, podría considerar usar un BlockingCollection junto con su método GetConsumingEnumerable() para acceder a los elementos en la cola.

Lo que debe hacer es dividir el trabajo en objetos que desea procesar de alguna manera, y usar un BlockingCollection para administrar la cola.

Algunos ejemplos de código que usan ints lugar de objetos como los elementos de trabajo ayudarán a demostrar esto:

Cuando un hilo de trabajo ha terminado con su elemento actual, eliminará un nuevo elemento de la cola de trabajo, procesará ese elemento y luego lo agregará a la cola de salida.

Un subproceso de consumidor separado elimina elementos completados de la cola de salida y hace algo con ellos.

Al final, debemos esperar a que todos los trabajadores terminen (Task.WaitAll (workers)) antes de que podamos marcar la cola de salida como completada (outputQueue.CompleteAdding ()).

 using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace Demo { class Program { static void Main(string[] args) { new Program().run(); } void run() { int threadCount = 4; Task[] workers = new Task[threadCount]; Task.Factory.StartNew(consumer); for (int i = 0; i < threadCount; ++i) { int workerId = i; Task task = new Task(() => worker(workerId)); workers[i] = task; task.Start(); } for (int i = 0; i < 100; ++i) { Console.WriteLine("Queueing work item {0}", i); inputQueue.Add(i); Thread.Sleep(50); } Console.WriteLine("Stopping adding."); inputQueue.CompleteAdding(); Task.WaitAll(workers); outputQueue.CompleteAdding(); Console.WriteLine("Done."); Console.ReadLine(); } void worker(int workerId) { Console.WriteLine("Worker {0} is starting.", workerId); foreach (var workItem in inputQueue.GetConsumingEnumerable()) { Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem); Thread.Sleep(100); // Simulate work. outputQueue.Add(workItem); // Output completed item. } Console.WriteLine("Worker {0} is stopping.", workerId); } void consumer() { Console.WriteLine("Consumer is starting."); foreach (var workItem in outputQueue.GetConsumingEnumerable()) { Console.WriteLine("Consumer is using item {0}", workItem); Thread.Sleep(25); } Console.WriteLine("Consumer is finished."); } BlockingCollection inputQueue = new BlockingCollection(); BlockingCollection outputQueue = new BlockingCollection(); } } 

Parallel.ForEach de TPL . Es paralelo para cada uno.

Muestra (cambiado MessageWorkItem a la cola genérica):

  public class MessageQueue { public Queue MessageWorkItem { get; set; } public MessageQueue() { MessageWorkItem = new Queue(); } public Message GetMessageMetaData() { try { // It is just a test, add only one item into the queue return new Message() { MessageID = Guid.NewGuid(), NumberToCall = "1111111111", FacilityID = "3333", NumberToDial = "2222222222", CountryCode = "1", Acknowledge = false }; } catch (Exception ex) { return null; } } public void AddingItemToQueue() { var message = GetMessageMetaData(); if (!message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(message); } } } } public class Message { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } } class Program { static void Main(string[] args) { MessageQueue me = new MessageQueue(); for (int i = 0; i < 10000; i++) me.AddingItemToQueue(); Console.WriteLine(me.MessageWorkItem.Count); Parallel.ForEach(me.MessageWorkItem, RunScript); } static void RunScript(Message item) { // todo: ... Console.WriteLine(item.MessageID); Thread.Sleep(300); } }