¿Cómo esperar a que terminen todos los hilos, usando ExecutorService?

Necesito ejecutar algunas tareas de 4 a la vez, algo como esto:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } //...wait for completion somehow 

¿Cómo puedo recibir una notificación una vez que todos estén completos? Por ahora no puedo pensar en nada mejor que establecer algún contador de tareas global y disminuirlo al final de cada tarea, luego monitorear en bucle infinito este contador para convertirme en 0; u obtener una lista de Futures y en el monitor de bucle infinito isDone para todos ellos. ¿Cuáles son las mejores soluciones que no implican bucles infinitos?

Gracias.

Básicamente, en un ExecutorService llama a shutdown() y luego awaitTermination() :

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } taskExecutor.shutdown(); try { taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { ... } 

Use un CountDownLatch :

 CountDownLatch latch = new CountDownLatch(totalNumberOfTasks); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } try { latch.await(); } catch (InterruptedException E) { // handle } 

y dentro de tu tarea (encierra en try / finally)

 latch.countDown(); 

ExecutorService.invokeAll() hace por usted.

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); List> tasks; // your tasks // invokeAll() returns when all tasks are complete List> futures = taskExecutor.invokeAll(tasks); 

También puedes usar Lists of Futures:

 List futures = new ArrayList(); // now add to it: futures.add(executorInstance.submit(new Callable() { public Void call() throws IOException { // do something return null; } })); 

luego, cuando desea unirse a todos ellos, es esencialmente el equivalente de unirse a cada uno (con el beneficio adicional de que vuelve a boost las excepciones de los hilos secundarios a los principales):

 for(Future f: this.futures) { f.get(); } 

Básicamente, el truco consiste en llamar a .get () en cada uno de los Future a la vez, en lugar de invocar infinitos bucles isDone () en (todos o cada uno). Entonces, se garantiza que “continuará” pasando y pasando este bloque tan pronto como termine el último hilo. La advertencia es que, dado que la llamada .get () vuelve a generar excepciones, si uno de los hilos muere, se levantará de esto posiblemente antes de que los otros hilos hayan terminado hasta su finalización [para evitar esto, podría agregar una catch ExecutionException alrededor del obtener llamada]. La otra advertencia es que mantiene una referencia a todos los subprocesos, de modo que si tienen variables locales de subprocesos, no se recostackrán hasta después de superar este bloque (aunque es posible que pueda evitar esto, si se convierte en un problema, eliminando El futuro está fuera de ArrayList). Si quisiera saber qué futuro “termina primero”, podría usar algo como https://stackoverflow.com/a/31885029/32453

Solo mis dos centavos. Para superar el requisito de CountDownLatch de saber el número de tareas de antemano, puede hacerlo a la vieja usanza mediante el uso de un Semaphore simple.

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); int numberOfTasks=0; Semaphore s=new Semaphore(0); while(...) { taskExecutor.execute(new MyTask()); numberOfTasks++; } try { s.aquire(numberOfTasks); ... 

En su tarea simplemente llame a la s.release() como lo haría latch.countDown();

En Java8 puedes hacerlo con CompletableFuture :

 ExecutorService es = Executors.newFixedThreadPool(4); List tasks = getTasks(); CompletableFuture< ?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); 

La clase CyclicBarrier en Java 5 y posteriores está diseñada para este tipo de cosas.

Un poco tarde para el juego, pero para completar …

En lugar de ‘esperar’ a que terminen todas las tareas, puedes pensar en términos del principio de Hollywood, “no me llames, te llamaré”, cuando haya terminado. Creo que el código resultante es más elegante …

Guava ofrece algunas herramientas interesantes para lograr esto.

Un ejemplo ::

Envuelva un ExecutorService en un ListeningExecutorService ::

 ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); 

Presentar una colección de callables para su ejecución ::

 for (Callable callable : callables) { ListenableFuture lf = service.submit(callable); // listenableFutures is a collection listenableFutures.add(lf) }); 

Ahora la parte esencial:

 ListenableFuture> lf = Futures.successfulAsList(listenableFutures); 

Adjunte una callback a ListenableFuture, que puede usar para recibir una notificación cuando se completen todos los futuros:

  Futures.addCallback(lf, new FutureCallback>() { @Override public void onSuccess(List result) { log.info("@@ finished processing {} elements", Iterables.size(result)); // do something with all the results } @Override public void onFailure(Throwable t) { log.info("@@ failed because of :: {}", t); } }); 

Esto también ofrece la ventaja de que puede recostackr todos los resultados en un solo lugar una vez que finaliza el procesamiento …

Más información aquí

Sigue uno de los enfoques a continuación.

  1. Iterar a través de todas las tareas futuras , devuelto desde el submit en ExecutorService y verificar el estado con la llamada de locking get() en el objeto Future como lo sugiere Kiran
  2. Use invokeAll() en ExecutorService
  3. CountDownLatch
  4. ForkJoinPool o Executors.html # newWorkStealingPool
  5. Use shutdown, awaitTermination, shutdownNow APIs de ThreadPoolExecutor en la secuencia correcta

Preguntas relacionadas con SE:

¿Cómo se usa CountDownLatch en Java Multithreading?

Cómo cerrar correctamente el ExecutorService de Java

Puede ajustar sus tareas en otro ejecutable, que enviará notificaciones:

 taskExecutor.execute(new Runnable() { public void run() { taskStartedNotification(); new MyTask().run(); taskFinishedNotification(); } }); 

Acabo de escribir un progtwig de ejemplo que resuelve tu problema. No se dio una implementación concisa, así que agregaré una. Si bien puede usar executor.shutdown() y executor.awaitTermination() , no es la mejor práctica, ya que el tiempo empleado por los diferentes hilos sería impredecible.

 ExecutorService es = Executors.newCachedThreadPool(); List> tasks = new ArrayList<>(); for (int j = 1; j < = 10; j++) { tasks.add(new Callable() { @Override public Integer call() throws Exception { int sum = 0; System.out.println("Starting Thread " + Thread.currentThread().getId()); for (int i = 0; i < 1000000; i++) { sum += i; } System.out.println("Stopping Thread " + Thread.currentThread().getId()); return sum; } }); } try { List> futures = es.invokeAll(tasks); int flag = 0; for (Future f : futures) { Integer res = f.get(); System.out.println("Sum: " + res); if (!f.isDone()) flag = 1; } if (flag == 0) System.out.println("SUCCESS"); else System.out.println("FAILED"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } 

Solo para proporcionar más alternativas aquí diferentes para usar pestillo / barreras. También puede obtener los resultados parciales hasta que todos terminen de usar CompletionService .

Desde Java Concurrency en la práctica: “Si tiene un lote de cálculos para enviar a un Ejecutor y desea recuperar sus resultados a medida que estén disponibles, puede conservar el Futuro asociado con cada tarea y realizar encuestas repetidamente para completarlo llamando a get con un tiempo de espera fuera de cero. Esto es posible, pero tedioso . Afortunadamente hay una mejor manera : un servicio de finalización “.

Aquí la implementación

 public class TaskSubmiter { private final ExecutorService executor; TaskSubmiter(ExecutorService executor) { this.executor = executor; } void doSomethingLarge(AnySourceClass source) { final List info = doPartialAsyncProcess(source); CompletionService completionService = new ExecutorCompletionService(executor); for (final InterestedResult interestedResultItem : info) completionService.submit(new Callable() { public PartialResult call() { return InterestedResult.doAnOperationToGetPartialResult(); } }); try { for (int t = 0, n = info.size(); t < n; t++) { Future f = completionService.take(); PartialResult PartialResult = f.get(); processThisSegment(PartialResult); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw somethinghrowable(e.getCause()); } } } 

Podrías usar tu propia subclase de ExecutorCompletionService para envolver taskExecutor y tu propia implementación de BlockingQueue para informarte cuando cada tarea se complete y realizar cualquier callback u otra acción que desees cuando el número de tareas completadas scope tu objective deseado.

debe usar el método executorService.shutdown() y executorService.awaitTermination .

Un ejemplo de la siguiente manera:

 public class ScheduledThreadPoolExample { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.scheduleAtFixedRate(() -> System.out.println("process task."), 0, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(10); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } } 

Java 8: podemos usar la API de transmisión para procesar la transmisión. Por favor vea el fragmento a continuación

 final List tasks = ...; //or any other functional interface tasks.stream().parallel().forEach(Runnable::run) // Uses default pool //alternatively to specify parallelism new ForkJoinPool(15).submit( () -> tasks.stream().parallel().forEach(Runnable::run) ).get(); 

Puedes usar este código:

 public class MyTask implements Runnable { private CountDownLatch countDownLatch; public MyTask(CountDownLatch countDownLatch { this.countDownLatch = countDownLatch; } @Override public void run() { try { //Do somethings // this.countDownLatch.countDown();//important } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); for (int i = 0; i < NUMBER_OF_TASKS; i++){ taskExecutor.execute(new MyTask(countDownLatch)); } countDownLatch.await(); System.out.println("Finish tasks"); 

Esta es mi solución, basada en la sugerencia de “AdamSkywalker”, y funciona

 package frss.main; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestHilos { void procesar() { ExecutorService es = Executors.newFixedThreadPool(4); List tasks = getTasks(); CompletableFuture< ?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); System.out.println("FIN DEL PROCESO DE HILOS"); } private List getTasks() { List tasks = new ArrayList(); Hilo01 task1 = new Hilo01(); tasks.add(task1); Hilo02 task2 = new Hilo02(); tasks.add(task2); return tasks; } private class Hilo01 extends Thread { @Override public void run() { System.out.println("HILO 1"); } } private class Hilo02 extends Thread { @Override public void run() { try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("HILO 2"); } } public static void main(String[] args) { TestHilos test = new TestHilos(); test.procesar(); } } 

Así que publico mi respuesta de la pregunta vinculada aquí, en caso de que alguien quiera una forma más sencilla de hacerlo

 ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture[] futures = new CompletableFuture[10]; int i = 0; while (...) { futures[i++] = CompletableFuture.runAsync(runner, executor); } CompletableFuture.allOf(futures).join(); // THis will wait until all future ready. 

Aquí hay dos opciones, confunde un poco cuál es el mejor para llevar.

Opción 1:

 ExecutorService es = Executors.newFixedThreadPool(4); List tasks = getTasks(); CompletableFuture< ?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); 

Opcion 2:

 ExecutorService es = Executors.newFixedThreadPool(4); List< Future> futures = new ArrayList<>(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future< ?> future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } } es.shutdown(); 

Aquí poniendo future.get (); en try catch es una buena idea, ¿verdad?

Esto podría ayudar

 Log.i(LOG_TAG, "shutting down executor..."); executor.shutdown(); while (true) { try { Log.i(LOG_TAG, "Waiting for executor to terminate..."); if (executor.isTerminated()) break; if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { break; } } catch (InterruptedException ignored) {} } 

Puede llamar a waitTillDone () en esta clase Runner :

 Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool while(...) { runner.run(new MyTask()); // here you submit your task } runner.waitTillDone(); // and this blocks until all tasks are finished (or failed) runner.shutdown(); // once you done you can shutdown the runner 

Puedes reutilizar esta clase y llamar a waitTillDone () tantas veces como quieras antes de llamar a shutdown (), además tu código es extremadamente simple . Además, no es necesario saber la cantidad de tareas por adelantado.

Para usarlo simplemente agregue esta comstackción gradle / maven compile 'com.github.matejtymes:javafixes:1.1.1' a su proyecto.

Más detalles se pueden encontrar aquí:

https://github.com/MatejTymes/JavaFixes

http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

Hay un método en el ejecutor getActiveCount() – que da la cuenta de hilos activos.

Después de recorrer el hilo, podemos verificar si el valor de activeCount() es 0 . Una vez que el valor es cero, significa que no hay hilos activos actualmente en ejecución, lo que significa que la tarea ha finalizado:

 while (true) { if (executor.getActiveCount() == 0) { //ur own piece of code break; } }