Implementando el Patrón Productor / Consumidor en C #

¿Cómo puedo implementar los patrones Productor / Consumidor en C # usando eventos y delegates ? ¿Qué necesito vigilar cuando se trata de recursos al usar estos patrones de diseño? ¿Hay algún caso límite que deba tener en cuenta?

Sé que este hilo es bastante viejo, pero como lo encontré algunas veces en mis búsquedas, decidí compartir este código productor-consumidor para personas que se preguntan cómo implementar una simple cola de trabajos generics productor-consumidor.

La clase Job se usa para ‘almacenar’ la llamada a un método de un objeto en forma de delegado. El delegado se llama cuando se procesa el trabajo. Cualquier argumento relevante también se almacena en esta clase de trabajo.

Con este patrón simple, es posible lograr multi-threading en los procesos de enqueue AND dequeue. En realidad, esta es la parte más fácil: el multi-threading trae nuevos desafíos a tu código, los notarás más tarde 😉

Originalmente publiqué este código en este hilo .

using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; // Compiled and tested in: Visual Studio 2017, DotNET 4.6.1 namespace MyNamespace { public class Program { public static void Main(string[] args) { MyApplication app = new MyApplication(); app.Run(); } } public class MyApplication { private BlockingCollection JobQueue = new BlockingCollection(); private CancellationTokenSource JobCancellationTokenSource = new CancellationTokenSource(); private CancellationToken JobCancellationToken; private Timer Timer; private Thread UserInputThread; public void Run() { // Give a name to the main thread: Thread.CurrentThread.Name = "Main"; // Fires a Timer thread: Timer = new Timer(new TimerCallback(TimerCallback), null, 1000, 2000); // Fires a thread to read user inputs: UserInputThread = new Thread(new ThreadStart(ReadUserInputs)) { Name = "UserInputs", IsBackground = true }; UserInputThread.Start(); // Prepares a token to cancel the job queue: JobCancellationToken = JobCancellationTokenSource.Token; // Start processing jobs: ProcessJobs(); // Clean up: JobQueue.Dispose(); Timer.Dispose(); UserInputThread.Abort(); Console.WriteLine("Done."); } private void ProcessJobs() { try { // Checks if the blocking collection is still up for dequeueing: while (!JobQueue.IsCompleted) { // The following line blocks the thread until a job is available or throws an exception in case the token is cancelled: JobQueue.Take(JobCancellationToken).Run(); } } catch { } } private void ReadUserInputs() { // User input thread is running here. ConsoleKey key = ConsoleKey.Enter; // Reads user inputs and queue them for processing until the escape key is pressed: while ((key = Console.ReadKey(true).Key) != ConsoleKey.Escape) { Job userInputJob = new Job("UserInput", this, new Action(ProcessUserInputs), key); JobQueue.Add(userInputJob); } // Stops processing the JobQueue: JobCancellationTokenSource.Cancel(); } private void ProcessUserInputs(ConsoleKey key) { // Main thread is running here. Console.WriteLine($"You just typed '{key}'. (Thread: {Thread.CurrentThread.Name})"); } private void TimerCallback(object param) { // Timer thread is running here. Job job = new Job("TimerJob", this, new Action(ProcessTimer), "A job from timer callback was processed."); JobQueue.TryAdd(job); // Just enqueues the job for later processing } private void ProcessTimer(string message) { // Main thread is running here. Console.WriteLine($"{message} (Thread: {Thread.CurrentThread.Name})"); } } ///  /// The Job class wraps an object's method call, with or without arguments. This method is called later, during the Job execution. ///  public class Job { public string Name { get; } private object TargetObject; private Delegate TargetMethod; private object[] Arguments; public Job(string name, object obj, Delegate method, params object[] args) { Name = name; TargetObject = obj; TargetMethod = method; Arguments = args; } public void Run() { try { TargetMethod.Method.Invoke(TargetObject, Arguments); } catch(Exception ex) { Debug.WriteLine($"Unexpected error running job '{Name}': {ex}"); } } } }