ExecutorService, cómo esperar a que finalicen todas las tareas

¿Cuál es la forma más sencilla de esperar a que finalicen todas las tareas de ExecutorService ? Mi tarea es principalmente computacional, así que solo quiero ejecutar una gran cantidad de trabajos, uno en cada núcleo. En este momento mi configuración se ve así:

 ExecutorService es = Executors.newFixedThreadPool(2); for (DataTable singleTable : uniquePhrases) { es.execute(new ComputeDTask(singleTable)); } try{ es.wait(); } catch (InterruptedException e){ e.printStackTrace(); } 

ComputeDTask implementa ejecutable. Esto parece ejecutar las tareas correctamente, pero el código falla en wait() con IllegalMonitorStateException . Esto es extraño, porque jugué con algunos ejemplos de juguetes y pareció funcionar.

uniquePhrases contiene varias decenas de miles de elementos. ¿Debo usar otro método? Estoy buscando algo tan simple como sea posible

El enfoque más simple es usar ExecutorService.invokeAll() que hace lo que quiere en un trazador de líneas. En su lenguaje, deberá modificar o ajustar ComputeDTask para implementar Callable<> , lo que le puede dar un poco más de flexibilidad. Probablemente en su aplicación haya una implementación significativa de Callable.call() , pero esta es una forma de Callable.call() si no se usa Executors.callable() .

 ExecutorService es = Executors.newFixedThreadPool(2); List> todo = new ArrayList>(singleTable.size()); for (DataTable singleTable: uniquePhrases) { todo.add(Executors.callable(new ComputeDTask(singleTable))); } List> answers = es.invokeAll(todo); 

Como han señalado otros, puede usar la versión de tiempo de espera de invokeAll() si corresponde. En este ejemplo, las answers van a contener un grupo de Future s que devolverán valores nulos (consulte la definición de Executors.callable() . Probablemente lo que desea hacer es una ligera refactorización para que pueda obtener una respuesta útil de nuevo, o una referencia al ComputeDTask subyacente, pero no puedo decirlo por su ejemplo.

Si no está claro, tenga en cuenta que invokeAll() no volverá hasta que se completen todas las tareas. (es decir, todos los Future s en su colección de answers informarán .isDone() si se le solicita). Esto evita todo el apagado manual, espera la Terminación, etc. … y le permite reutilizar este ExecutorService prolijamente para ciclos múltiples, si lo desea.

Hay algunas preguntas relacionadas en SO:

  • Cómo esperar a que terminen todos los hilos

  • Devuelve valores de subprocesos de java

  • invokeAll () no está dispuesto a aceptar una Colección >

  • ¿Necesito sincronizar?

Ninguno de estos es estrictamente en el punto de su pregunta, pero dan un poco de color acerca de cómo la gente piensa Executor / ExecutorService debe ser utilizado.

Si desea esperar a que se completen todas las tareas, use el método de shutdown lugar de wait . Luego awaitTermination con la awaitTermination .

Además, puede utilizar Runtime.availableProcessors para obtener la cantidad de subprocesos de hardware para que pueda inicializar su subproceso correctamente.

Si no es su objective esperar a que finalicen todas las tareas en el ExecutorService , sino esperar hasta que se complete un lote específico de tareas , puede utilizar un CompletionService , específicamente, un ExecutorCompletionService .

La idea es crear un ExecutorCompletionService envuelva su Executor , envíe un número conocido de tareas a través del CompletionService , luego dibuje el mismo número de resultados de la cola de finalización usando take() (which blocks) o poll() (which does not) . Una vez que haya dibujado todos los resultados esperados correspondientes a las tareas que envió, sabrá que todo está hecho.

Permítanme exponer esto una vez más, porque no es obvio desde la interfaz: debe saber cuántas cosas pone en CompletionService para saber cuántas cosas intentar dibujar. Esto es especialmente importante con el método take() : llámalo una vez más y bloqueará tu cadena de llamada hasta que otro subproceso envíe otro trabajo al mismo CompletionService .

Hay algunos ejemplos que muestran cómo usar CompletionService en el libro Java Concurrency in Practice .

Si desea esperar a que el servicio del ejecutor finalice la ejecución, llame al shutdown() y luego, aguarde la terminación (units, unitType) , por ejemplo, awaitTermination(1, MINUTE) . El ExecutorService no bloquea en su propio monitor, por lo que no puede usar wait etc.

Puede esperar que los trabajos terminen en un determinado intervalo:

 int maxSecondsPerComputeDTask = 20; try { while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) { // consider giving up with a 'break' statement under certain conditions } } catch (InterruptedException e) { throw new RuntimeException(e); } 

O puede usar ExecutorService . submit ( Runnable ) y recoge los objetos Future que devuelve y llama a get () en cada uno de ellos para esperar a que finalicen.

 ExecutorService es = Executors.newFixedThreadPool(2); Collection> futures = new LinkedList< >(); for (DataTable singleTable : uniquePhrases) { futures.add(es.submit(new ComputeDTask(singleTable))); } for (Future< ?> future : futures) { try { future.get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } } 

InterruptedException es extremadamente importante de manejar correctamente. Es lo que permite que usted o los usuarios de su biblioteca finalicen un largo proceso de forma segura.

Solo usa

 latch = new CountDownLatch(noThreads) 

En cada hilo

 latch.countDown(); 

y como barrera

 latch.await(); 

Causa raíz de IllegalMonitorStateException :

Lanzado para indicar que un subproceso ha intentado esperar en el monitor de un objeto o para notificar a otros subprocesos que esperan en el monitor de un objeto sin poseer el monitor especificado.

Desde su código, acaba de llamar a wait () en ExecutorService sin tener el locking.

El siguiente código corrige IllegalMonitorStateException

 try { synchronized(es){ es.wait(); // Add some condition before you call wait() } } 

Siga uno de los enfoques a continuación para esperar la finalización de todas las tareas, que se han enviado a ExecutorService .

  1. Iterar a través de todas las tareas Future de submit en ExecutorService y verificar el estado con la llamada de locking get() en el objeto Future

  2. Uso de invokeAll en ExecutorService

  3. Usando CountDownLatch

  4. Usando ForkJoinPool o newWorkStealingPool of Executors (desde java 8)

  5. Cierre el grupo como se recomienda en la página de documentación de Oracle

     void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } 

    Si desea aguardar con gracia todas las tareas para completar cuando está utilizando la opción 5 en lugar de las opciones 1 a 4, cambie

     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 

    a

    un while(condition) que verifica por cada 1 minuto.

También tengo la situación de que tengo un conjunto de documentos que deben rastrearse. Empiezo con un documento inicial de “semilla” que debe procesarse, ese documento contiene enlaces a otros documentos que también deben procesarse, y así sucesivamente.

En mi progtwig principal, solo quiero escribir algo como lo siguiente, donde Crawler controla un montón de hilos.

 Crawler c = new Crawler(); c.schedule(seedDocument); c.waitUntilCompletion() 

La misma situación sucedería si quisiera navegar por un árbol; Aparecería en el nodo raíz, el procesador de cada nodo agregaría elementos secundarios a la cola según fuera necesario, y un grupo de hilos procesaría todos los nodos en el árbol, hasta que no hubiera más.

No pude encontrar nada en la JVM, que pensé que era un poco sorprendente. Así que escribí una clase AutoStopThreadPool que se puede usar directamente o subclase para agregar métodos adecuados para el dominio, por ejemplo, schedule(Document) . ¡Espero eso ayude!

AutoStopThreadPool Javadoc | Descargar

Agregue todos los hilos en la colección y invokeAll usando invokeAll . Si puede usar el método invokeAll de ExecutorService , JVM no pasará a la siguiente línea hasta que todos los hilos estén completos.

Aquí hay un buen ejemplo: invokeAll via ExecutorService

Envíe sus tareas al Runner y luego espere llamar al método waitTillDone () de esta manera:

 Runner runner = Runner.runner(2); for (DataTable singleTable : uniquePhrases) { runner.run(new ComputeDTask(singleTable)); } // blocks until all tasks are finished (or failed) runner.waitTillDone(); runner.shutdown(); 

Para usarlo, agrega esta dependencia gradle / maven: 'com.github.matejtymes:javafixes:1.0'

Para obtener más detalles, consulte aquí: https://github.com/MatejTymes/JavaFixes o aquí: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

Puede utilizar el método ExecutorService.invokeAll , ejecutará todas las tareas y esperará a que todos los hilos terminen su tarea.

Aquí está completo javadoc

También puede utilizar la versión sobrecargada de este método para especificar el tiempo de espera.

Aquí hay un código de muestra con ExecutorService.invokeAll

 public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newFixedThreadPool(3); List> taskList = new ArrayList<>(); taskList.add(new Task1()); taskList.add(new Task2()); List> results = service.invokeAll(taskList); for (Future f : results) { System.out.println(f.get()); } } } class Task1 implements Callable { @Override public String call() throws Exception { try { Thread.sleep(2000); return "Task 1 done"; } catch (Exception e) { e.printStackTrace(); return " error in task1"; } } } class Task2 implements Callable { @Override public String call() throws Exception { try { Thread.sleep(3000); return "Task 2 done"; } catch (Exception e) { e.printStackTrace(); return " error in task2"; } } } 

Una alternativa simple a esto es usar hilos junto con join. Referirse: unir hilos

Esperaré a que el ejecutor finalice con un tiempo de espera especificado que considere adecuado para completar las tareas.

  try { //do stuff here exe.execute(thread); } finally { exe.shutdown(); } boolean result = exe.awaitTermination(4, TimeUnit.HOURS); if (!result) { LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour."); } 

Parece que necesita ForkJoinPool y usa el grupo global para ejecutar tareas.

 public static void main(String[] args) { // the default `commonPool` should be sufficient for many cases. ForkJoinPool pool = ForkJoinPool.commonPool(); // The root of your task that may spawn other tasks. // Make sure it submits the additional tasks to the same executor that it is in. Runnable rootTask = new YourTask(pool); pool.execute(rootTask); pool.awaitQuiescence(...); // that's it. } 

La belleza está en pool.awaitQuiescence donde el método bloqueará utilizar el hilo de la persona que llama para ejecutar sus tareas y luego regresar cuando esté realmente vacío.

Puede usar un ThreadPoolExecutor con un tamaño de grupo establecido en Runtime.getRuntime (). .execute() () y .execute() todas las tareas que desee ejecutar, luego llame a tpe.shutdown() y luego espere un while(!tpe.terminated()) { /* waiting for all tasks to complete */} que bloquea para que se completen todas las tareas enviadas. donde tpe es una referencia a su instancia de ThreadPoolExecutor .

O, si es más apropiado, utilice un ExecutionCompletionService Un CompletionService que utiliza un Executor suministrado para ejecutar tareas. Esta clase organiza que las tareas enviadas sean, una vez completadas, colocadas en una cola accesible usando take. La clase es lo suficientemente liviana para ser adecuada para uso transitorio al procesar grupos de tareas.

NOTA: Esta solución no se bloquea como lo hace ExecutorService.invokeAll() mientras espera que finalicen todos los trabajos. Entonces, aunque el más simple , también es el más limitado ya que bloqueará el hilo principal de la aplicación. No es bueno si desea conocer el estado de lo que está sucediendo, o hacer otras cosas mientras se están ejecutando estos trabajos, como el procesamiento posterior de los resultados o la producción y el resultado agregado incremental.