Nesting te espera en Parallel.ForEach

En una aplicación de metro, necesito ejecutar varias llamadas WCF. Hay una cantidad importante de llamadas que hacer, por lo que necesito hacerlas en un bucle paralelo. El problema es que el ciclo paralelo sale antes de que las llamadas WCF estén completas.

¿Cómo podría refactorizar esto para que funcione como se espera?

var ids = new List() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var customers = new System.Collections.Concurrent.BlockingCollection(); Parallel.ForEach(ids, async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }); foreach ( var customer in customers ) { Console.WriteLine(customer.ID); } Console.ReadKey(); 

La idea detrás de Parallel.ForEach() es que tienes un conjunto de hilos y cada hilo procesa parte de la colección. Como habrás notado, esto no funciona con asyncawait , donde deseas liberar el hilo mientras dura la llamada asincrónica.

Podrías “arreglarlo” bloqueando los hilos ForEach() , pero eso derrota todo el punto de asyncawait .

Lo que podría hacer es usar TPL Dataflow en lugar de Parallel.ForEach() , que también admite Task asíncronas.

Específicamente, su código podría escribirse utilizando un TransformBlock que transforma cada identificación en un Customer utiliza el async lambda. Este bloque se puede configurar para ejecutarse en paralelo. Usted vincularía ese bloque a un ActionBlock que escribe cada Customer en la consola. Después de configurar la red de bloques, puede Post() cada ID en TransformBlock .

En codigo:

 var ids = new List { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var getCustomerBlock = new TransformBlock( async i => { ICustomerRepo repo = new CustomerRepo(); return await repo.GetCustomer(i); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); var writeCustomerBlock = new ActionBlock(c => Console.WriteLine(c.ID)); getCustomerBlock.LinkTo( writeCustomerBlock, new DataflowLinkOptions { PropagateCompletion = true }); foreach (var id in ids) getCustomerBlock.Post(id); getCustomerBlock.Complete(); writeCustomerBlock.Completion.Wait(); 

Aunque probablemente desee limitar el paralelismo de TransformBlock a una pequeña constante. Además, podría limitar la capacidad de TransformBlock y agregar los elementos a ella de forma asíncrona utilizando SendAsync() , por ejemplo, si la colección es demasiado grande.

Como un beneficio adicional en comparación con su código (si funcionó), la escritura comenzará tan pronto como se complete un solo elemento y no espere hasta que todo el procesamiento haya finalizado.

La respuesta de svick es (como de costumbre) excelente.

Sin embargo, considero que Dataflow es más útil cuando realmente tiene grandes cantidades de datos para transferir. O cuando necesita una cola compatible con async .

En su caso, una solución más simple es simplemente usar el paralelismo estilo async :

 var ids = new List() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var customerTasks = ids.Select(i => { ICustomerRepo repo = new CustomerRepo(); return repo.GetCustomer(i); }); var customers = await Task.WhenAll(customerTasks); foreach (var customer in customers) { Console.WriteLine(customer.ID); } Console.ReadKey(); 

Usar DataFlow como se sugirió puede ser excesivo, y la respuesta de Stephen no proporciona los medios para controlar la concurrencia de la operación. Sin embargo, eso se puede lograr de manera bastante simple:

 public static async Task RunWithMaxDegreeOfConcurrency( int maxDegreeOfConcurrency, IEnumerable collection, Func taskFactory) { var activeTasks = new List(maxDegreeOfConcurrency); foreach (var task in collection.Select(taskFactory)) { activeTasks.Add(task); if (activeTasks.Count == maxDegreeOfConcurrency) { await Task.WhenAny(activeTasks.ToArray()); //observe exceptions here activeTasks.RemoveAll(t => t.IsCompleted); } } await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => { //observe exceptions in a manner consistent with the above }); } 

Las llamadas a ToArray() se pueden optimizar utilizando una matriz en lugar de una lista y reemplazando las tareas completadas, pero dudo que ToArray() una gran diferencia en la mayoría de los escenarios. Uso de muestra según la pregunta del OP:

 RunWithMaxDegreeOfConcurrency(10, ids, async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }); 

El usuario de EDIT Fellow SO y el experto de TPL, Eli Arbel, me señalaron un artículo relacionado de Stephen Toub . Como de costumbre, su implementación es elegante y eficiente:

 public static Task ForEachAsync( this IEnumerable source, int dop, Func body) { return Task.WhenAll( from partition in Partitioner.Create(source).GetPartitions(dop) select Task.Run(async delegate { using (partition) while (partition.MoveNext()) await body(partition.Current).ContinueWith(t => { //observe exceptions }); })); } 

Puede ahorrar esfuerzo con el nuevo paquete AsyncEnumerator NuGet , que no existía hace 4 años cuando la pregunta se publicó originalmente. Le permite controlar el grado de paralelismo:

 using System.Collections.Async; ... await ids.ParallelForEachAsync(async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }, maxDegreeOfParallelism: 10); 

Descargo de responsabilidad: soy el autor de la biblioteca AsyncEnumerator, que es de código abierto y con licencia bajo MIT, y estoy publicando este mensaje solo para ayudar a la comunidad.

Task.Run() Parallel.Foreach en una Task.Run() y en lugar de la palabra clave [yourasyncmethod].Result use [yourasyncmethod].Result

(debe hacer la tarea “Ejecutar.Run” para no bloquear el hilo de la interfaz de usuario)

Algo como esto:

 var yourForeachTask = Task.Run(() => { Parallel.ForEach(ids, i => { ICustomerRepo repo = new CustomerRepo(); var cust = repo.GetCustomer(i).Result; customers.Add(cust); }); }); await yourForeachTask; 

Esto debería ser bastante eficiente y más fácil que hacer funcionar todo el TPL Dataflow:

 var customers = await ids.SelectAsync(async i => { ICustomerRepo repo = new CustomerRepo(); return await repo.GetCustomer(i); }); ... public static async Task> SelectAsync(this IEnumerable source, Func> selector, int maxDegreesOfParallelism = 4) { var results = new List(); var activeTasks = new HashSet>(); foreach (var item in source) { activeTasks.Add(selector(item)); if (activeTasks.Count >= maxDegreesOfParallelism) { var completed = await Task.WhenAny(activeTasks); activeTasks.Remove(completed); results.Add(completed.Result); } } results.AddRange(await Task.WhenAll(activeTasks)); return results; } 

Después de presentar un grupo de métodos de ayuda, podrá ejecutar consultas paralelas con este simple sintax:

 const int DegreeOfParallelism = 10; IEnumerable result = await Enumerable.Range(0, 1000000) .Split(DegreeOfParallelism) .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false)) .ConfigureAwait(false); 

Lo que ocurre aquí es que dividimos la recostackción de fonts en 10 fragmentos ( .Split(DegreeOfParallelism) ), luego ejecutamos 10 tareas, cada una de las cuales procesa sus elementos uno por uno ( .SelectManyAsync(...) ) y los .SelectManyAsync(...) en una sola lista.

Vale la pena mencionar que hay un enfoque más simple:

 double[] result2 = await Enumerable.Range(0, 1000000) .Select(async i => await CalculateAsync(i).ConfigureAwait(false)) .WhenAll() .ConfigureAwait(false); 

Pero necesita una precaución : si tiene una colección de fonts que es demasiado grande, producirá una Task para cada elemento de inmediato, lo que puede ocasionar importantes resultados de rendimiento.

Los métodos de extensión utilizados en los ejemplos anteriores tienen el siguiente aspecto:

 public static class CollectionExtensions { ///  /// Splits collection into number of collections of nearly equal size. ///  public static IEnumerable> Split(this IEnumerable src, int slicesCount) { if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount)); List source = src.ToList(); var sourceIndex = 0; for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++) { var list = new List(); int itemsLeft = source.Count - targetIndex; while (slicesCount * list.Count < itemsLeft) { list.Add(source[sourceIndex++]); } yield return list; } } ///  /// Takes collection of collections, projects those in parallel and merges results. ///  public static async Task> SelectManyAsync( this IEnumerable> source, Func> func) { List[] slices = await source .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false)) .WhenAll() .ConfigureAwait(false); return slices.SelectMany(s => s); } /// Runs selector and awaits results. public static async Task> SelectListAsync(this IEnumerable source, Func> selector) { List result = new List(); foreach (TSource source1 in source) { TResult result1 = await selector(source1).ConfigureAwait(false); result.Add(result1); } return result; } /// Wraps tasks with Task.WhenAll. public static Task WhenAll(this IEnumerable> source) { return Task.WhenAll(source); } } 

Llego un poco tarde a la fiesta, pero es posible que desee considerar el uso de GetAwaiter.GetResult () para ejecutar su código asíncrono en el contexto de sincronización, pero como se describe a continuación;

  Parallel.ForEach(ids, i => { ICustomerRepo repo = new CustomerRepo(); // Run this in thread which Parallel library occupied. var cust = repo.GetCustomer(i).GetAwaiter().GetResult(); customers.Add(cust); }); 

Un método de extensión para esto que hace uso de SemaphoreSlim y también permite establecer un grado máximo de paralelismo

  ///  /// Concurrently Executes async actions for each item of  ///  /// Type of IEnumerable /// instance of "/> /// an async  to execute /// Optional, An integer that represents the maximum degree of parallelism, /// Must be grater than 0 /// A Task representing an async operation /// If the maxActionsToRunInParallel is less than 1 public static async Task ForEachAsyncConcurrent( this IEnumerable enumerable, Func action, int? maxDegreeOfParallelism = null) { if (maxDegreeOfParallelism.HasValue) { using (var semaphoreSlim = new SemaphoreSlim( maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value)) { var tasksWithThrottler = new List(); foreach (var item in enumerable) { // Increment the number of currently running tasks and wait if they are more than limit. await semaphoreSlim.WaitAsync(); tasksWithThrottler.Add(Task.Run(async () => { await action(item); // action is completed, so decrement the number of currently running tasks semaphoreSlim.Release(); })); } // Wait for all tasks to complete. await Task.WhenAll(tasksWithThrottler.ToArray()); } } else { await Task.WhenAll(enumerable.Select(item => action(item))); } } 

Uso de la muestra:

 await enumerable.ForEachAsyncConcurrent( async item => { await SomeAsyncMethod(item); }, 5);