Espere a que se completen los subprocesos agrupados

Lo siento por una pregunta redundante. Sin embargo, he encontrado muchas soluciones a mi problema, pero ninguna de ellas está muy bien explicada. Espero que quede claro, aquí.

El hilo principal de mi aplicación C # genera 1..n trabajadores de fondo que usan ThreadPool. Deseo que el hilo original se bloquee hasta que todos los trabajadores hayan terminado. Investigué el ManualResetEvent en particular, pero no tengo claro su uso.

En pseudo:

foreach( var o in collection ) { queue new worker(o); } while( workers not completed ) { continue; } 

Si es necesario, sabré la cantidad de trabajadores que están a punto de ponerse en cola de antemano.

Prueba esto. La función incluye una lista de delegates de Acción. Se agregará una entrada de trabajador de ThreadPool para cada elemento de la lista. Esperará a que se complete cada acción antes de regresar.

 public static void SpawnAndWait(IEnumerable actions) { var list = actions.ToList(); var handles = new ManualResetEvent[actions.Count()]; for (var i = 0; i < list.Count; i++) { handles[i] = new ManualResetEvent(false); var currentAction = list[i]; var currentHandle = handles[i]; Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } }; ThreadPool.QueueUserWorkItem(x => wrappedAction()); } WaitHandle.WaitAll(handles); } 

Aquí hay un enfoque diferente: encapsulación; entonces tu código podría ser tan simple como:

  Forker p = new Forker(); foreach (var obj in collection) { var tmp = obj; p.Fork(delegate { DoSomeWork(tmp); }); } p.Join(); 

Donde la clase de Forker se da a continuación (me aburrí en el tren ;-p) … una vez más, esto evita los objetos del sistema operativo, pero envuelve las cosas bastante bien (IMO):

 using System; using System.Threading; /// Event arguments representing the completion of a parallel action. public class ParallelEventArgs : EventArgs { private readonly object state; private readonly Exception exception; internal ParallelEventArgs(object state, Exception exception) { this.state = state; this.exception = exception; } /// The opaque state object that identifies the action (null otherwise). public object State { get { return state; } } /// The exception thrown by the parallel action, or null if it completed without exception. public Exception Exception { get { return exception; } } } /// Provides a caller-friendly wrapper around parallel actions. public sealed class Forker { int running; private readonly object joinLock = new object(), eventLock = new object(); /// Raised when all operations have completed. public event EventHandler AllComplete { add { lock (eventLock) { allComplete += value; } } remove { lock (eventLock) { allComplete -= value; } } } private EventHandler allComplete; /// Raised when each operation completes. public event EventHandler ItemComplete { add { lock (eventLock) { itemComplete += value; } } remove { lock (eventLock) { itemComplete -= value; } } } private EventHandler itemComplete; private void OnItemComplete(object state, Exception exception) { EventHandler itemHandler = itemComplete; // don't need to lock if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception)); if (Interlocked.Decrement(ref running) == 0) { EventHandler allHandler = allComplete; // don't need to lock if (allHandler != null) allHandler(this, EventArgs.Empty); lock (joinLock) { Monitor.PulseAll(joinLock); } } } /// Adds a callback to invoke when each operation completes. /// Current instance (for fluent API). public Forker OnItemComplete(EventHandler handler) { if (handler == null) throw new ArgumentNullException("handler"); ItemComplete += handler; return this; } /// Adds a callback to invoke when all operations are complete. /// Current instance (for fluent API). public Forker OnAllComplete(EventHandler handler) { if (handler == null) throw new ArgumentNullException("handler"); AllComplete += handler; return this; } /// Waits for all operations to complete. public void Join() { Join(-1); } /// Waits (with timeout) for all operations to complete. /// Whether all operations had completed before the timeout. public bool Join(int millisecondsTimeout) { lock (joinLock) { if (CountRunning() == 0) return true; Thread.SpinWait(1); // try our luck... return (CountRunning() == 0) || Monitor.Wait(joinLock, millisecondsTimeout); } } /// Indicates the number of incomplete operations. /// The number of incomplete operations. public int CountRunning() { return Interlocked.CompareExchange(ref running, 0, 0); } /// Enqueues an operation. /// The operation to perform. /// The current instance (for fluent API). public Forker Fork(ThreadStart action) { return Fork(action, null); } /// Enqueues an operation. /// The operation to perform. /// An opaque object, allowing the caller to identify operations. /// The current instance (for fluent API). public Forker Fork(ThreadStart action, object state) { if (action == null) throw new ArgumentNullException("action"); Interlocked.Increment(ref running); ThreadPool.QueueUserWorkItem(delegate { Exception exception = null; try { action(); } catch (Exception ex) { exception = ex;} OnItemComplete(state, exception); }); return this; } } 

Primero, ¿cuánto tiempo ejecutan los trabajadores? los subprocesos de grupo generalmente se deben usar para tareas de corta duración; si van a ejecutarse por un tiempo, considere los subprocesos manuales.

Re el problema; ¿realmente necesitas bloquear el hilo principal? ¿Puedes usar una callback en su lugar? Si es así, algo como:

 int running = 1; // start at 1 to prevent multiple callbacks if // tasks finish faster than they are started Action endOfThread = delegate { if(Interlocked.Decrement(ref running) == 0) { // ****run callback method**** } }; foreach(var o in collection) { var tmp = o; // avoid "capture" issue Interlocked.Increment(ref running); ThreadPool.QueueUserWorkItem(delegate { DoSomeWork(tmp); // [A] should handle exceptions internally endOfThread(); }); } endOfThread(); // opposite of "start at 1" 

Esta es una forma bastante liviana (sin primitivos del SO) de rastrear a los trabajadores.

Si necesita bloquear, puede hacer lo mismo usando un Monitor (nuevamente, evitando un objeto OS):

  object syncLock = new object(); int running = 1; Action endOfThread = delegate { if (Interlocked.Decrement(ref running) == 0) { lock (syncLock) { Monitor.Pulse(syncLock); } } }; lock (syncLock) { foreach (var o in collection) { var tmp = o; // avoid "capture" issue ThreadPool.QueueUserWorkItem(delegate { DoSomeWork(tmp); // [A] should handle exceptions internally endOfThread(); }); } endOfThread(); Monitor.Wait(syncLock); } Console.WriteLine("all done"); 

He estado usando la nueva biblioteca de tareas Paralelas en CTP aquí :

  Parallel.ForEach(collection, o => { DoSomeWork(o); }); 

Aquí hay una solución que usa la clase CountdownEvent .

 var complete = new CountdownEvent(1); foreach (var o in collection) { var capture = o; ThreadPool.QueueUserWorkItem((state) => { try { DoSomething(capture); } finally { complete.Signal(); } }, null); } complete.Signal(); complete.Wait(); 

Por supuesto, si tiene acceso a la clase CountdownEvent , entonces tiene todo el TPL para trabajar. La clase Parallel se ocupa de esperarte.

 Parallel.ForEach(collection, o => { DoSomething(o); }); 

Creo que estabas en el camino correcto con el ManualResetEvent. Este enlace tiene una muestra de código que coincide exactamente con lo que intenta hacer. La clave es usar WaitHandle.WaitAll y pasar una serie de eventos de espera. Cada hilo necesita establecer uno de estos eventos de espera.

  // Simultaneously calculate the terms. ThreadPool.QueueUserWorkItem( new WaitCallback(CalculateBase)); ThreadPool.QueueUserWorkItem( new WaitCallback(CalculateFirstTerm)); ThreadPool.QueueUserWorkItem( new WaitCallback(CalculateSecondTerm)); ThreadPool.QueueUserWorkItem( new WaitCallback(CalculateThirdTerm)); // Wait for all of the terms to be calculated. WaitHandle.WaitAll(autoEvents); // Reset the wait handle for the next calculation. manualEvent.Reset(); 

Editar:

Asegúrese de que en la ruta del código del hilo del trabajador configure el evento (es decir, autoEvents 1. Set ();). Una vez que todos estén señalados, la espera volverá.

 void CalculateSecondTerm(object stateInfo) { double preCalc = randomGenerator.NextDouble(); manualEvent.WaitOne(); secondTerm = preCalc * baseNumber * randomGenerator.NextDouble(); autoEvents[1].Set(); } 

He encontrado una buena solución aquí:

http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

Puede ser útil para otros con el mismo problema

Usando .NET 4.0 Barrie r class:

  Barrier sync = new Barrier(1); foreach(var o in collection) { WaitCallback worker = (state) => { // do work sync.SignalAndWait(); }; sync.AddParticipant(); ThreadPool.QueueUserWorkItem(worker, o); } sync.SignalAndWait();