Limite la cantidad de hilos paralelos en C #

Estoy escribiendo un progtwig de C # para generar y cargar medio millón de archivos a través de FTP. Quiero procesar 4 archivos en paralelo ya que la máquina tiene 4 núcleos y la generación de archivos lleva mucho más tiempo. ¿Es posible convertir el siguiente ejemplo de Powershell en C #? ¿O hay algún marco mejor, como el marco Actor en C # (como F # MailboxProcessor)?

Ejemplo de Powershell

$maxConcurrentJobs = 3; # Read the input and queue it up $jobInput = get-content .\input.txt $queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) ) foreach($item in $jobInput) { $queue.Enqueue($item) } # Function that pops input off the queue and starts a job with it function RunJobFromQueue { if( $queue.Count -gt 0) { $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null } } # Start up to the max number of concurrent jobs # Each job will take care of running the rest for( $i = 0; $i -lt $maxConcurrentJobs; $i++ ) { RunJobFromQueue } 

Actualizar:
La conexión al servidor FTP remoto puede ser lenta, por lo que quiero limitar el proceso de carga de FTP.

Asumiendo que está construyendo esto con el TPL, puede establecer ParallelOptions.MaxDegreesOfParallelism en lo que quiera que sea.

Paralelo.Para obtener un ejemplo de código.

Task Parallel Library es tu amigo aquí. Vea este enlace que describe lo que está disponible para usted. Básicamente, el framework 4 viene con él, que optimiza estos subprocesos agrupados de hilos esencialmente en segundo plano para la cantidad de procesadores en la máquina en ejecución.

Tal vez algo como:

 ParallelOptions options = new ParallelOptions(); options.MaxDegreeOfParallelism = 4; 

Luego, en tu ciclo, algo así como:

 Parallel.Invoke(options, () => new WebClient().Upload("http://www.linqpad.net", "lp.html"), () => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html")); 

Si está utilizando .Net 4.0, puede usar la biblioteca paralela

Supongamos que está iterando a través del medio millón de archivos, puede “paralelamente” la iteración usando un Foreach Paralelo, por ejemplo, o puede echar un vistazo a PLinq Aquí, una comparación entre los dos

Básicamente, va a querer crear una Acción o Tarea para cada archivo para cargar, ponerlos en una Lista, y luego procesar esa lista, limitando el número que se puede procesar en paralelo.

La publicación de mi blog muestra cómo hacer esto con Tareas y con Acciones, y proporciona un ejemplo de proyecto que puede descargar y ejecutar para ver ambos en acción.

Con acciones

Si usa Actions, puede usar la función incorporada .Net Parallel.Invoke. Aquí lo limitamos a ejecutar como máximo 4 hilos en paralelo.

 var listOfActions = new List(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(() => UploadFile(localFile))); } var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; Parallel.Invoke(options, listOfActions.ToArray()); 

Sin embargo, esta opción no es compatible con la función asincrónica, y supongo que es la función FileUpload, por lo que es posible que desee utilizar el siguiente ejemplo de tarea.

Con tareas

Con Tasks no hay función incorporada. Sin embargo, puede usar el que brindo en mi blog.

  ///  /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. ///  /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } ///  /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. /// NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. ///  /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) { var postTaskTasks = new List(); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. await Task.WhenAll(postTaskTasks.ToArray()); } } 

Y luego, creando su lista de tareas y llamando a la función para que se ejecuten, digamos que un máximo de 4 simultáneas a la vez, puede hacer esto:

 var listOfTasks = new List(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(async () => await UploadFile(localFile))); } await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4); 

Además, como este método es compatible con la función asincrónica, no bloqueará el subproceso de la interfaz de usuario, como lo haría el uso de Parallel.Invoke o Parallel.ForEach.

He codificado debajo de la técnica donde uso BlockingCollection como un administrador de conteo de hilos. Es bastante simple de implementar y maneja el trabajo. Simplemente acepta objetos Tarea y agrega un valor entero a la lista de locking, lo que aumenta el conteo de hilos en ejecución en 1. Cuando termina el hilo, este quita el objeto y libera el bloque al agregar la operación para las próximas tareas.

  public class BlockingTaskQueue { private BlockingCollection threadManager { get; set; } = null; public bool IsWorking { get { return threadManager.Count > 0 ? true : false; } } public BlockingTaskQueue(int maxThread) { threadManager = new BlockingCollection(maxThread); } public async Task AddTask(Task task) { Task.Run(() => { Run(task); }); } private bool Run(Task task) { try { threadManager.Add(1); task.Start(); task.Wait(); return true; } catch (Exception ex) { return false; } finally { threadManager.Take(); } } }