Estrangulamiento de tareas asíncronas

Me gustaría ejecutar un montón de tareas asíncronas, con un límite de cuántas tareas pueden estar pendientes de finalizar en un momento dado.

Digamos que tiene 1000 URL, y solo quiere tener 50 solicitudes abiertas a la vez; pero tan pronto como se completa una solicitud, se abre una conexión a la siguiente URL en la lista. De esta forma, siempre hay exactamente 50 conexiones abiertas a la vez, hasta que se agote la lista de URL.

También quiero utilizar un número dado de hilos si es posible.

Se me ocurrió un método de extensión, ThrottleTasksAsync , que hace lo que quiero. ¿Ya hay una solución más simple? Yo asumiría que este es un escenario común.

Uso:

 class Program { static void Main(string[] args) { Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait(); Console.WriteLine("Press a key to exit..."); Console.ReadKey(true); } } 

Aquí está el código:

 static class IEnumerableExtensions { public static async Task ThrottleTasksAsync(this IEnumerable enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task> taskToRun) { var blockingQueue = new BlockingCollection(new ConcurrentBag()); var semaphore = new SemaphoreSlim(maxConcurrentTasks); // Run the throttler on a separate thread. var t = Task.Run(() => { foreach (var item in enumerable) { // Wait for the semaphore semaphore.Wait(); blockingQueue.Add(item); } blockingQueue.CompleteAdding(); }); var taskList = new List<Task>(); Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, _ => { Enumerable_T item; if (blockingQueue.TryTake(out item, 100)) { taskList.Add( // Run the task taskToRun(item) .ContinueWith(tsk => { // For effect Thread.Sleep(2000); // Release the semaphore semaphore.Release(); return tsk.Result; } ) ); } }); // Await all the tasks. return await Task.WhenAll(taskList); } static IEnumerable IterateUntilTrue(Func condition) { while (!condition()) yield return true; } } 

El método utiliza BlockingCollection y SemaphoreSlim para que funcione. El regulador se ejecuta en un hilo y todas las tareas asíncronas se ejecutan en el otro hilo. Para lograr el paralelismo, agregué un parámetro maxDegreeOfParallelism que se pasó a un bucle Parallel.ForEach reasignado como un ciclo while.

La versión anterior era:

 foreach (var master = ...) { var details = ...; Parallel.ForEach(details, detail => { // Process each detail record here }, new ParallelOptions { MaxDegreeOfParallelism = 15 }); // Perform the final batch updates here } 

Pero, el grupo de subprocesos se agota rápidamente, y no puede hacer async / await .

Bonificación: para evitar el problema en BlockingCollection donde se lanza una excepción en Take() cuando se llama a CompleteAdding() , estoy usando la sobrecarga de TryTake con un tiempo de espera TryTake . Si no utilizo el tiempo de espera en TryTake , se evitaría el uso de BlockingCollection porque TryTake no se bloqueará. ¿Hay una mejor manera? Idealmente, habría un método TakeAsync .

Como se sugirió, use TPL Dataflow.

Un TransformBlock puede ser lo que está buscando.

Define un MaxDegreeOfParallelism para limitar cuántas cadenas se pueden transformar (es decir, cuántas URL se pueden descargar) en paralelo. A continuación, publica las URL en el bloque y, cuando finaliza, le dice al bloque que ha terminado de agregar elementos y obtiene las respuestas.

 var downloader = new TransformBlock( url => Download(url), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 } ); var buffer = new BufferBlock(); downloader.LinkTo(buffer); foreach(var url in urls) downloader.Post(url); //or await downloader.SendAsync(url); downloader.Complete(); await downloader.Completion; IList responses; if (buffer.TryReceiveAll(out responses)) { //process responses } 

Nota: TransformBlock almacena su entrada y salida. ¿Por qué, entonces, tenemos que vincularlo a un BufferBlock ?

Debido a que TransformBlock no se completará hasta que se hayan consumido todos los elementos ( HttpResponse ), y await downloader.Completion . La terminación se suspenderá. En cambio, dejamos que el progtwig de downloader reenvíe toda su salida a un bloque de almacenamiento intermedio dedicado, luego esperamos a que finalice el downloader e inspeccionamos el bloque de almacenamiento intermedio.

Digamos que tiene 1000 URL, y solo quiere tener 50 solicitudes abiertas a la vez; pero tan pronto como se completa una solicitud, se abre una conexión a la siguiente URL en la lista. De esta forma, siempre hay exactamente 50 conexiones abiertas a la vez, hasta que se agote la lista de URL.

La siguiente solución simple ha surgido muchas veces aquí en SO. No usa código de locking y no crea hilos explícitamente, por lo que se escala muy bien:

 const int MAX_DOWNLOADS = 50; static async Task DownloadAsync(string[] urls) { using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async url => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); Console.WriteLine(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks); } } 

El hecho es que el procesamiento de los datos descargados debe hacerse en una tubería diferente , con un nivel de paralelismo diferente , especialmente si se trata de un procesamiento vinculado a la CPU.

Por ejemplo, probablemente desee tener 4 subprocesos haciendo simultáneamente el procesamiento de datos (la cantidad de núcleos de CPU) y hasta 50 solicitudes pendientes para más datos (que no usan subprocesos en absoluto). AFAICT, esto no es lo que su código está haciendo actualmente.

Ahí es donde TPL Dataflow o Rx pueden ser útiles como una solución preferida. Sin embargo, es ciertamente posible implementar algo como esto con TPL simple. Tenga en cuenta que el único código de locking aquí es el que está procesando los datos dentro de Task.Run :

 const int MAX_DOWNLOADS = 50; const int MAX_PROCESSORS = 4; // process data class Processing { SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS); HashSet _pending = new HashSet(); object _lock = new Object(); async Task ProcessAsync(string data) { await _semaphore.WaitAsync(); try { await Task.Run(() => { // simuate work Thread.Sleep(1000); Console.WriteLine(data); }); } finally { _semaphore.Release(); } } public async void QueueItemAsync(string data) { var task = ProcessAsync(data); lock (_lock) _pending.Add(task); try { await task; } catch { if (!task.IsCanceled && !task.IsFaulted) throw; // not the task's exception, rethrow // don't remove faulted/cancelled tasks from the list return; } // remove successfully completed tasks from the list lock (_lock) _pending.Remove(task); } public async Task WaitForCompleteAsync() { Task[] tasks; lock (_lock) tasks = _pending.ToArray(); await Task.WhenAll(tasks); } } // download data static async Task DownloadAsync(string[] urls) { var processing = new Processing(); using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async (url) => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); // put the result on the processing pipeline processing.QueueItemAsync(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks.ToArray()); await processing.WaitForCompleteAsync(); } } 

Según lo solicitado, aquí está el código con el que terminé.

El trabajo se configura en una configuración de detalle maestra y cada maestro se procesa como un lote. Cada unidad de trabajo está en cola de esta manera:

 var success = true; // Start processing all the master records. Master master; while (null != (master = await StoredProcedures.ClaimRecordsAsync(...))) { await masterBuffer.SendAsync(master); } // Finished sending master records masterBuffer.Complete(); // Now, wait for all the batches to complete. await batchAction.Completion; return success; 

Los maestros se almacenan en búfer uno a la vez para ahorrar trabajo para otros procesos externos. Los detalles de cada maestro se envían para trabajar a través de masterTransform TransformManyBlock . Un BatchedJoinBlock también se crea para recostackr los detalles en un lote.

El trabajo real se realiza en detailTransform TransformBlock , de forma asincrónica, 150 a la vez. BoundedCapacity se establece en 300 para garantizar que demasiados Masters no se almacenan en el búfer al principio de la cadena, al tiempo que deja espacio para que se pongan en cola suficientes registros de detalles para permitir que se procesen 150 registros a la vez. El bloque emite un object a sus destinos, porque se filtra a través de los enlaces dependiendo de si se trata de un Detail o una Exception .

batchAction ActionBlock recostack la salida de todos los lotes y realiza actualizaciones masivas de la base de datos, registro de errores, etc. para cada lote.

Habrá varios BatchedJoinBlock s, uno para cada maestro. Como cada ISourceBlock se ISourceBlock secuencialmente y cada lote solo acepta la cantidad de registros de detalles asociados con un maestro, los lotes se procesarán en orden. Cada bloque solo emite un grupo y se desvincula al finalizar. Solo el último bloque de lote propaga su finalización al ActionBlock final.

La red de flujo de datos:

 // The dataflow network BufferBlock masterBuffer = null; TransformManyBlock masterTransform = null; TransformBlock detailTransform = null; ActionBlock, IList>> batchAction = null; // Buffer master records to enable efficient throttling. masterBuffer = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1 }); // Sequentially transform master records into a stream of detail records. masterTransform = new TransformManyBlock(async masterRecord => { var records = await StoredProcedures.GetObjectsAsync(masterRecord); // Filter the master records based on some criteria here var filteredRecords = records; // Only propagate completion to the last batch var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0; // Create a batch join block to encapsulate the results of the master record. var batchjoinblock = new BatchedJoinBlock(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 }); // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block. var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail); var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception); var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion }); // Unlink batchjoinblock upon completion. // (the returned task does not need to be awaited, despite the warning.) batchjoinblock.Completion.ContinueWith(task => { detailLink1.Dispose(); detailLink2.Dispose(); batchLink.Dispose(); }); return filteredRecords; }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); // Process each detail record asynchronously, 150 at a time. detailTransform = new TransformBlock(async detail => { try { // Perform the action for each detail here asynchronously await DoSomethingAsync(); return detail; } catch (Exception e) { success = false; return e; } }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 }); // Perform the proper action for each batch batchAction = new ActionBlock, IList>>(async batch => { var details = batch.Item1.Cast(); var errors = batch.Item2.Cast(); // Do something with the batch here }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true }); masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true }); 
Intereting Posts