Bloque ThreadPoolExecutor cuando la cola está llena?

Estoy tratando de ejecutar muchas tareas usando un ThreadPoolExecutor. A continuación se muestra un ejemplo hipotético:

def workQueue = new ArrayBlockingQueue(3, false) def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue) for(int i = 0; i < 100000; i++) threadPoolExecutor.execute(runnable) 

El problema es que obtengo rápidamente una java.util.concurrent.RejectedExecutionException ya que el número de tareas excede el tamaño de la cola de trabajo. Sin embargo, el comportamiento deseado que estoy buscando es tener el bloque de hilos principal hasta que haya espacio en la cola. Cuál es la mejor manera de lograr esto?

En algunas circunstancias muy limitadas, puede implementar un java.util.concurrent.RejectedExecutionHandler que hace lo que necesita.

 RejectedExecutionHandler block = new RejectedExecutionHandler() { rejectedExecution(Runnable r, ThreadPoolExecutor executor) { executor.getQueue().put( r ); } }; ThreadPoolExecutor pool = new ... pool.setRejectedExecutionHandler(block); 

Ahora. Esta es una muy mala idea por los siguientes motivos

  • Es propenso a un punto muerto porque todos los hilos en el conjunto pueden morir antes de que lo que coloque en la cola sea visible. Mitigue esto estableciendo un tiempo razonable de mantener vivo.
  • La tarea no está envuelta de la manera que su ejecutor puede esperar. Muchas implementaciones de ejecutores envuelven sus tareas en algún tipo de objeto de seguimiento antes de la ejecución. Mira la fuente de los tuyos.
  • La API desaconseja agregar mediante getQueue (), y puede estar prohibido en algún momento.

Una estrategia casi siempre mejor es instalar ThreadPoolExecutor.CallerRunsPolicy que estrangulará tu aplicación ejecutando la tarea en el hilo que llama a execute ().

Sin embargo, a veces una estrategia de locking, con todos sus riesgos inherentes, es realmente lo que desea. Yo diría bajo estas condiciones

  • Solo tiene un hilo que ejecuta execute ()
  • Tienes que (o quieres) tener una longitud de cola muy pequeña
  • Es absolutamente necesario limitar el número de subprocesos que ejecutan este trabajo (generalmente por razones externas), y una estrategia de ejecución de llamada podría romper eso.
  • Sus tareas son de un tamaño impredecible, por lo que las ejecuciones de llamadas pueden provocar inanición si el grupo estuvo momentáneamente ocupado con 4 tareas breves y su ejecución de llamada de un solo hilo se quedó atascada con una grande.

Entonces, como digo. Rara vez se necesita y puede ser peligroso, pero ahí lo tienes.

Buena suerte.

Podría usar un semaphore para bloquear el acceso de los hilos al grupo.

 ExecutorService service = new ThreadPoolExecutor( 3, 3, 1, TimeUnit.HOURS, new ArrayBlockingQueue<>(6, false) ); Semaphore lock = new Semaphore(6); // equal to queue capacity for (int i = 0; i < 100000; i++ ) { try { lock.acquire(); service.submit(() -> { try { task.run(); } finally { lock.release(); } }); } catch (InterruptedException e) { throw new RuntimeException(e); } } 

Algunos errores

  • Solo use este patrón con un grupo de subprocesos fijos. Es poco probable que la cola esté llena con frecuencia, por lo que no se crearán nuevos hilos. Consulte los documentos de Java en ThreadPoolExecutor para obtener más detalles: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html Hay una forma de evitar esto, pero está fuera de scope de esta respuesta.
  • El tamaño de la cola debe ser mayor que el número de hilos centrales. Si tuviéramos que hacer el tamaño de la cola 3, lo que terminaría sucediendo es:

    • T0: los tres hilos están trabajando, la cola está vacía, no hay permisos disponibles.
    • T1: el hilo 1 finaliza, libera un permiso.
    • T2: el hilo 1 sondea la cola para el nuevo trabajo, no encuentra ninguno y espera .
    • T3: el hilo principal envía trabajo al grupo, el hilo 1 comienza a funcionar.

    El ejemplo anterior se traduce para enhebrar el hilo principal que bloquea el hilo 1. Puede parecer un período pequeño, pero ahora multiplica la frecuencia por días y meses. De repente, los cortos períodos de tiempo sumn una gran cantidad de tiempo desperdiciado.

Lo que debe hacer es ajustar su ThreadPoolExecutor en Executor, que limita explícitamente la cantidad de operaciones ejecutadas simultáneamente dentro de él:

  private static class BlockingExecutor implements Executor { final Semaphore semaphore; final Executor delegate; private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) { semaphore = new Semaphore(concurrentTasksLimit); this.delegate = delegate; } @Override public void execute(final Runnable command) { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); return; } final Runnable wrapped = () -> { try { command.run(); } finally { semaphore.release(); } }; delegate.execute(wrapped); } } 

Puede ajustar concurrentTasksLimit a threadPoolSize + queueSize de su delegado ejecutor y resolverá su problema

Aquí está mi fragmento de código en este caso:

 public void executeBlocking( Runnable command ) { if ( threadPool == null ) { logger.error( "Thread pool '{}' not initialized.", threadPoolName ); return; } ThreadPool threadPoolMonitor = this; boolean accepted = false; do { try { threadPool.execute( new Runnable() { @Override public void run() { try { command.run(); } // to make sure that the monitor is freed on exit finally { // Notify all the threads waiting for the resource, if any. synchronized ( threadPoolMonitor ) { threadPoolMonitor.notifyAll(); } } } } ); accepted = true; } catch ( RejectedExecutionException e ) { // Thread pool is full try { // Block until one of the threads finishes its job and exits. synchronized ( threadPoolMonitor ) { threadPoolMonitor.wait(); } } catch ( InterruptedException ignored ) { // return immediately break; } } } while ( !accepted ); } 

threadPool es una instancia local de java.util.concurrent.ExecutorService que ya se ha inicializado.

Resolví este problema usando un RejectedExecutionHandler personalizado, que simplemente bloquea el hilo de llamada durante un momento y luego intenta enviar la tarea de nuevo:

 public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // The pool is full. Wait, then try again. try { long waitMs = 250; Thread.sleep(waitMs); } catch (InterruptedException interruptedException) {} executor.execute(r); } } 

Esta clase solo se puede usar en el ejecutor de grupo de hilos como un RejectedExecutionHandler como cualquier otro. En este ejemplo:

 executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull()) 

El único inconveniente que veo es que el hilo de llamada podría bloquearse un poco más de lo estrictamente necesario (hasta 250 ms). Para muchas tareas de ejecución breve, quizás disminuya el tiempo de espera a 10 ms o menos. Además, dado que este ejecutor se está llamando efectivamente de forma recursiva, esperar mucho tiempo para que un hilo esté disponible (horas) puede provocar un desbordamiento de la stack.

Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien. ¿Me estoy perdiendo algo importante?

Esto es lo que terminé haciendo:

 int NUM_THREADS = 6; Semaphore lock = new Semaphore(NUM_THREADS); ExecutorService pool = Executors.newCachedThreadPool(); for (int i = 0; i < 100000; i++) { try { lock.acquire(); } catch (InterruptedException e) { throw new RuntimeException(e); } pool.execute(() -> { try { // Task logic } finally { lock.release(); } }); }