Java ExecutorService: awaitTermination de todas las tareas creadas recursivamente

Uso un ExecutorService para ejecutar una tarea. Esta tarea puede crear recursivamente otras tareas que se envían al mismo ExecutorService y esas tareas secundarias también pueden hacer eso.

Ahora tengo el problema de que quiero esperar hasta que se hayan completado todas las tareas (es decir, todas las tareas hayan finalizado y no hayan enviado nuevas) antes de continuar.

No puedo llamar a ExecutorService.shutdown() en el hilo principal porque esto impide que el ExecutorService acepte nuevas tareas.

Y Calling ExecutorService.awaitTermination() parece no hacer nada si no se ha llamado al shutdown .

Así que estoy un poco atrapado aquí. No puede ser tan difícil para ExecutorService ver que todos los trabajadores están inactivos, ¿o sí? La única solución poco elegante que pude encontrar es usar directamente un ThreadPoolExecutor y consultar su getPoolSize() vez en cuando. ¿Realmente no hay una mejor manera de hacerlo?

Si inicialmente se desconoce la cantidad de tareas en el árbol de tareas recursivas, quizás la forma más fácil sea implementar su propia primitiva de sincronización, una especie de “semáforo inverso”, y compartirla entre sus tareas. Antes de enviar cada tarea, incrementa un valor, cuando la tarea se completa, disminuye ese valor y espera hasta que el valor sea 0.

Implementarlo como una primitiva separada explícitamente llamada de tareas desacopla esta lógica de la implementación del grupo de subprocesos y le permite enviar varios árboles independientes de tareas recursivas en el mismo grupo.

Algo como esto:

 public class InverseSemaphore { private int value = 0; private Object lock = new Object(); public void beforeSubmit() { synchronized(lock) { value++; } } public void taskCompleted() { synchronized(lock) { value--; if (value == 0) lock.notifyAll(); } } public void awaitCompletion() throws InterruptedException { synchronized(lock) { while (value > 0) lock.wait(); } } } 

Tenga en cuenta que se debe llamar a taskCompleted() dentro de un bloque finally , para hacerlo inmune a posibles excepciones.

También tenga en cuenta que beforeSubmit() debe ser llamado por el hilo de envío antes de enviar la tarea, no por la tarea en sí, para evitar la posible “falsa finalización” cuando se completan tareas antiguas y aún no se inician nuevas.

EDITAR: Problema importante con el patrón de uso reparado.

Este es realmente un candidato ideal para un Phaser. Java 7 está saliendo con esta nueva clase. Es un CountdonwLatch / CyclicBarrier flexible. Puede obtener una versión estable en el sitio de interés de JSR 166 .

La forma en que es un CountdownLatch / CyclicBarrier más flexible es porque no solo es compatible con un número desconocido de partes (hilos) sino que también es reutilizable (es ahí donde entra la parte de fase)

Para cada tarea que envíes, te registrarás; cuando la hayas completado, llegarás. Esto puede hacerse recursivamente.

 Phaser phaser = new Phaser(); ExecutorService e = // Runnable recursiveRunnable = new Runnable(){ public void run(){ //do work recursively if you have to if(shouldBeRecursive){ phaser.register(); e.submit(recursiveRunnable); } phaser.arrive(); } } public void doWork(){ int phase = phaser.getPhase(); phaser.register(); e.submit(recursiveRunnable); phaser.awaitAdvance(phase); } 

Editar: Gracias @depthofreality por señalar la condición de carrera en mi ejemplo anterior. Lo estoy actualizando para que el hilo de ejecución solo espere el avance de la fase actual, ya que bloquea para que se complete la función recursiva.

El número de fase no se disparará hasta que el número arrive s == register s. Dado que antes de cada invocación recursiva invoca register , se producirá un incremento de fase cuando se completen todas las invocaciones.

Wow, ustedes son rápidos 🙂

Gracias por todas las sugerencias. Los futuros no se integran fácilmente con mi modelo porque no sé cuántos runnables están progtwigdos de antemano. Entonces, si mantengo viva una tarea parental solo para esperar a que terminen las tareas secundarias recursivas, tengo mucha basura por ahí.

Resolví mi problema usando la sugerencia de AtomicInteger. Básicamente, he subclasificado ThreadPoolExecutor e incremento el contador en llamadas a execute () y decremento en llamadas a afterExecute (). Cuando el contador obtiene 0, llamo shutdown (). Esto parece funcionar para mis problemas, no estoy seguro de si esa es una buena manera de hacerlo. Especialmente, supongo que solo usas execute () para agregar Runnables.

Como nodo lateral: primero traté de registrar después de Ejecutar () el número de Runnables en la cola y el número de trabajadores que están activos y el apagado cuando esos son 0; pero eso no funcionó porque no todos los Runnables aparecieron en la cola y getActiveCount () tampoco hizo lo que yo esperaba.

De todos modos, aquí está mi solución: (si alguien encuentra serios problemas con esto, por favor hágamelo saber 🙂

 public class MyThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicInteger executing = new AtomicInteger(0); public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime, TimeUnit seconds, BlockingQueue queue) { super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue); } @Override public void execute(Runnable command) { //intercepting beforeExecute is too late! //execute() is called in the parent thread before it terminates executing.incrementAndGet(); super.execute(command); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); int count = executing.decrementAndGet(); if(count == 0) { this.shutdown(); } } } 

Puede crear su propio grupo de subprocesos que amplíe ThreadPoolExecutor . Desea saber cuándo se ha enviado una tarea y cuándo se completa.

 public class MyThreadPoolExecutor extends ThreadPoolExecutor { private int counter = 0; public MyThreadPoolExecutor() { super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue()); } @Override public synchronized void execute(Runnable command) { counter++; super.execute(command); } @Override protected synchronized void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); counter--; notifyAll(); } public synchronized void waitForExecuted() throws InterruptedException { while (counter == 0) wait(); } } 

Use un futuro para sus tareas (en lugar de enviar Runnable ), una callback actualiza su estado cuando se completa, por lo que puede usar Future.isDone para rastrear el estado de todas sus tareas.

(Mea culpa: es un ‘bit’ pasado mi hora de acostarse;) pero aquí hay un primer bash de un latch dynamic):

 package oss.alphazero.sto4958330; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class DynamicCountDownLatch { @SuppressWarnings("serial") private static final class Sync extends AbstractQueuedSynchronizer { private final CountDownLatch toplatch; public Sync() { setState(0); this.toplatch = new CountDownLatch(1); } @Override protected int tryAcquireShared(int acquires){ try { toplatch.await(); } catch (InterruptedException e) { throw new RuntimeException("Interrupted", e); } return getState() == 0 ? 1 : -1; } public boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } public boolean tryExtendState(int acquires) { for (;;) { int s = getState(); int exts = s+1; if (compareAndSetState(s, exts)) { toplatch.countDown(); return exts > 0; } } } } private final Sync sync; public DynamicCountDownLatch(){ this.sync = new Sync(); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public void join() { sync.tryExtendState(1); } } 

Este pestillo introduce un nuevo método join () para la API CountDownLatch existente (clonada), que las tareas utilizan para señalar su entrada en el grupo de tareas más grande.

El pestillo pasa de la tarea principal a la tarea secundaria. Cada tarea, según el patrón de Suraj, primero ‘une ()’ el pestillo, hace su tarea (), y luego countDown ().

Para abordar situaciones donde el hilo principal inicia el grupo de tareas y luego espera () inmediatamente antes de que cualquiera de los subprocesos de la tarea haya tenido la oportunidad de unirse (), el topLatch se usa en la clase de Sync interna. Este es un pestillo que contará hacia abajo en cada unión (); solo la primera cuenta regresiva es por supuesto significativa, ya que todas las subsecuentes son nops.

La implementación inicial anterior introduce una especie de arruga semántica, ya que se supone que tryAcquiredShared (int) no está lanzando una excepción InterruptedException, pero sí tenemos que lidiar con la interrupción de la espera en el topLatch.

¿Es esto una mejora sobre la propia solución de OP usando contadores atómicos? Yo diría que probablemente no IFF es insistente sobre el uso de ejecutores, pero creo que es un enfoque alternativo igualmente válido que usa el AQS en ese caso, y también se puede utilizar con hilos generics.

Crit beckers compañeros.

Si desea utilizar las clases JSR166y, por ejemplo, Phaser o Fork / Join, cualquiera de las cuales podría funcionar para usted, siempre puede descargar el soporte de Java 6 desde: http://gee.cs.oswego.edu/dl/concurrency -interest / y usar eso como base en lugar de escribir una solución completamente casera. Luego, cuando sale 7, puede soltar la dependencia en el backport y cambiar algunos nombres de paquetes.

(Divulgación completa: hemos estado utilizando el LinkedTransferQueue en prod desde hace un tiempo. Sin problemas)

Debo decir que las soluciones descritas anteriormente del problema con la tarea de llamada recursiva y esperar para las tareas de suborden final no me satisfacen. Mi solución está inspirada en la documentación original de Oracle allí: CountDownLatch y ejemplo allí: recursos humanos CountDownLatch .

El primer hilo común en proceso en el caso de la clase HRManagerCompact tiene el pestillo de espera para dos subprocesos de hija, que tiene pestillos de espera para los subsiguientes subprocesos de 2 hijas … etc.

Por supuesto, el locking se puede establecer en un valor diferente de 2 (en el constructor de CountDownLatch), así como también se puede establecer el número de objetos ejecutables en iteración, es decir, ArrayList, pero debe corresponderse (el número de conteos debe ser igual al parámetro en el constructor CountDownLatch).

Tenga cuidado, el número de pestillos aumenta exponencialmente según la condición de restricción: ‘level.get () <2', así como el número de objetos. 1, 2, 4, 8, 16 ... y pestillos 0, 1, 2, 4 ... Como puede ver, para cuatro niveles (level.get () <4) habrá 15 hilos de espera y 7 pestillos en el tiempo, cuando se están ejecutando los 16 subprocesos máximos.

 package processes.countdownlatch.hr; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** Recursively latching running classes to wait for the peak threads * * @author hariprasad */ public class HRManagerCompact extends Thread { final int N = 2; // number of daughter's tasks for latch CountDownLatch countDownLatch; CountDownLatch originCountDownLatch; AtomicInteger level = new AtomicInteger(0); AtomicLong order = new AtomicLong(0); // id latched thread waiting for HRManagerCompact techLead1 = null; HRManagerCompact techLead2 = null; HRManagerCompact techLead3 = null; // constructor public HRManagerCompact(CountDownLatch countDownLatch, String name, AtomicInteger level, AtomicLong order){ super(name); this.originCountDownLatch=countDownLatch; this.level = level; this.order = order; } private void doIt() { countDownLatch = new CountDownLatch(N); AtomicInteger leveli = new AtomicInteger(level.get() + 1); AtomicLong orderi = new AtomicLong(Thread.currentThread().getId()); techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi); techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi); //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli); techLead1.start(); techLead2.start(); //techLead3.start(); try { synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi); countDownLatch.await(); // wait actual thread } System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId()); Thread.sleep(10*level.intValue()); if (level.get() < 2) doIt(); Thread.yield(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } /*catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }*/ // TODO Auto-generated method stub System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId()); originCountDownLatch.countDown(); // count down } public static void main(String args[]){ AtomicInteger levelzero = new AtomicInteger(0); HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue())); hr.doIt(); } } 

Posible producto comentado (con alguna probabilidad):

 first: working... 1, 1, 10 // thread 1, first daughter's task (10) second: working... 1, 1, 11 // thread 1, second daughter's task (11) first: working... 2, 10, 12 // thread 10, first daughter's task (12) first: working... 2, 11, 14 // thread 11, first daughter's task (14) second: working... 2, 11, 15 // thread 11, second daughter's task (15) second: working... 2, 10, 13 // thread 10, second daughter's task (13) --- first: recruted 2, 10, 12 // finished 12 --- first: recruted 2, 11, 14 // finished 14 --- second: recruted 2, 10, 13 // finished 13 (now can be opened latch 10) --- second: recruted 2, 11, 15 // finished 15 (now can be opened latch 11) *** HR Manager waiting for recruitment to complete... 0, 0, 1 *** HR Manager waiting for recruitment to complete... 1, 1, 10 *** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened --- first: recruted 1, 1, 10 // finished 10 *** HR Manager waiting for recruitment to complete... 1, 1, 11 *** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened --- second: recruted 1, 1, 11 // finished 11 (now can be opened latch 1) *** Distribute Offer Letter, it means finished. 0, 0, 1 // latch on 1 opened 

Use CountDownLatch . Pase el objeto CountDownLatch a cada una de sus tareas y codifique sus tareas como a continuación.

 public void doTask() { // do your task latch.countDown(); } 

Mientras que el hilo que necesita esperar debería ejecutar el siguiente código:

 public void doWait() { latch.await(); } 

Pero, por supuesto, esto supone que ya conoce la cantidad de tareas secundarias para que pueda inicializar el recuento de la retención.

La única solución poco elegante que pude encontrar es usar directamente un ThreadPoolExecutor y consultar su getPoolSize () de vez en cuando. ¿Realmente no hay una mejor manera de hacerlo?

Tienes que usar los métodos shutdown() , awaitTermination () and shutdownNow() en una secuencia adecuada.

shutdown() : inicia un cierre ordenado en el que se ejecutan las tareas enviadas previamente, pero no se aceptarán nuevas tareas.

awaitTermination() : Bloquea hasta que todas las tareas hayan completado la ejecución después de una solicitud de cierre, o se agote el tiempo de espera, o se interrumpa el hilo actual, lo que ocurra primero.

shutdownNow() : intenta detener todas las tareas que se ejecutan activamente, detiene el procesamiento de las tareas en espera y devuelve una lista de las tareas que estaban esperando su ejecución.

Forma recomendada desde la página de documentación de Oracle de ExecutorService :

  void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } 

Puede reemplazar si la condición con la condición de tiempo en caso de larga duración en la finalización de las tareas de la siguiente manera:

Cambio

 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 

A

  while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { Thread.sleep(60000); } 

Puede consultar otras alternativas (excepto join() , que se puede usar con un hilo independiente) en:

espere hasta que todos los hilos terminen su trabajo en java