¿Cómo bloquear el método submit () de ThreadPoolExecutor si está saturado?

Quiero crear un ThreadPoolExecutor modo que cuando haya alcanzado su tamaño máximo y la cola esté llena, el método submit() se bloqueará cuando intente agregar nuevas tareas. ¿Debo implementar un RejectedExecutionHandler personalizado para eso o hay una forma existente de hacerlo utilizando una biblioteca Java estándar?

Una de las posibles soluciones que acabo de encontrar:

 public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore.release(); throw e; } } } 

¿Hay alguna otra solución? Prefiero algo basado en RejectedExecutionHandler ya que parece una forma estándar de manejar tales situaciones.

Puedes usar ThreadPoolExecutor y un bloqueQueue:

 public class ImageManager { BlockingQueue blockingQueue = new ArrayBlockingQueue(blockQueueSize); RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread, 0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler); private int downloadThumbnail(String fileListPath){ executorService.submit(new yourRunnable()); } } 

Vea cuatro alternativas para hacer esto: Crear un NotifyingBlockingThreadPoolExecutor

Debería usar CallerRunsPolicy , que ejecuta la tarea rechazada en el hilo de llamada. De esta forma, no puede enviar nuevas tareas al ejecutor hasta que se realice esa tarea, en ese momento habrá algunos hilos de grupo libres o el proceso se repetirá.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

De los documentos:

Tareas rechazadas

Las tareas nuevas enviadas en ejecución de método (java.lang.Runnable) se rechazarán cuando se haya cerrado el Ejecutor, y también cuando el Ejecutor use límites finitos para los hilos máximos y la capacidad de la cola de trabajo, y esté saturado. En cualquier caso, el método de ejecución invoca el método RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) de RejectedExecutionHandler. Se proporcionan cuatro políticas de controlador predefinidas:

  1. En el valor predeterminado ThreadPoolExecutor.AbortPolicy, el controlador arroja un tiempo de ejecución RejectedExecutionException en el momento del rechazo.
  2. En ThreadPoolExecutor.CallerRunsPolicy, el hilo que invoca ejecutar se ejecuta la tarea. Esto proporciona un mecanismo de control de retroalimentación simple que ralentizará la velocidad con la que se envían las tareas nuevas.
  3. En ThreadPoolExecutor.DiscardPolicy, una tarea que no se puede ejecutar simplemente se descarta.
  4. En ThreadPoolExecutor.DiscardOldestPolicy, si el ejecutor no se cierra, la tarea en la cabecera de la cola de trabajo se elimina y luego se vuelve a intentar la ejecución (que puede fallar nuevamente, haciendo que esto se repita).

Además, asegúrese de utilizar una cola limitada, como ArrayBlockingQueue, cuando llame al constructor ThreadPoolExecutor . De lo contrario, nada será rechazado.

Editar: en respuesta a su comentario, configure el tamaño de ArrayBlockingQueue para que sea igual al tamaño máximo del grupo de subprocesos y use AbortPolicy.

Editar 2: Ok, veo a lo que te refieres. ¿Qué beforeExecute() con esto: anula el método beforeExecute() para verificar que getActiveCount() no exceda getMaximumPoolSize() y, si lo hace, duerma e intente de nuevo?

Hibernate tiene una BlockPolicy que es simple y puede hacer lo que quiera:

Ver: ejecutores.java

 /** * A handler for rejected tasks that will have the caller block until * space is available. */ public static class BlockPolicy implements RejectedExecutionHandler { /** * Creates a BlockPolicy. */ public BlockPolicy() { } /** * Puts the Runnable to the blocking queue, effectively blocking * the delegating thread until space is available. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { try { e.getQueue().put( r ); } catch (InterruptedException e1) { log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r ); } } } 

La respuesta BoundedExecutor citada anteriormente de Java Concurrency in Practice solo funciona correctamente si utiliza una cola ilimitada para el Ejecutor, o el límite del semáforo no es mayor que el tamaño de la cola. El semáforo es estado compartido entre el subproceso que envía y los subprocesos del grupo, lo que permite saturar el ejecutor incluso si el tamaño de cola

El uso de CallerRunsPolicy solo es válido si sus tareas no se ejecutan para siempre, en cuyo caso su hilo de envío permanecerá en rejectedExecution para siempre, y una mala idea si sus tareas tardan mucho tiempo en ejecutarse, porque el hilo que envía no puede enviar ninguna nueva tareas o hacer cualquier otra cosa si está ejecutando una tarea en sí.

Si eso no es aceptable, sugiero verificar el tamaño de la cola acotada del ejecutor antes de enviar una tarea. Si la cola está llena, espere un momento antes de volver a intentar enviarla. El rendimiento sufrirá, pero sugiero que es una solución más simple que muchas de las otras soluciones propuestas y tiene la garantía de que ninguna tarea será rechazada.

La siguiente clase rodea un ThreadPoolExecutor y usa un semáforo para bloquear y luego la cola de trabajo está llena:

 public final class BlockingExecutor { private final Executor executor; private final Semaphore semaphore; public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) { BlockingQueue queue = new LinkedBlockingQueue(); this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory); this.semaphore = new Semaphore(queueSize + maxPoolSize); } private void execImpl (final Runnable command) throws InterruptedException { semaphore.acquire(); try { executor.execute(new Runnable() { @Override public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { // will never be thrown with an unbounded buffer (LinkedBlockingQueue) semaphore.release(); throw e; } } public void execute (Runnable command) throws InterruptedException { execImpl(command); } } 

Esta clase contenedora se basa en una solución dada en el libro Java Concurrency in Practice por Brian Goetz. La solución en el libro solo toma dos parámetros de constructor: un Executor y un límite utilizado para el semáforo. Esto se muestra en la respuesta dada por Fixpoint. Hay un problema con ese enfoque: puede ponerse en un estado donde los hilos de la agrupación están ocupados, la cola está llena, pero el semáforo acaba de liberar un permiso. ( semaphore.release() en el bloque finally). En este estado, una nueva tarea puede tomar el permiso recién lanzado, pero se rechaza porque la cola de tareas está llena. Por supuesto, esto no es algo que quieras; quieres bloquear en este caso.

Para resolver esto, debemos usar una cola ilimitada , como JCiP menciona claramente. El semáforo actúa como guardia, dando el efecto de un tamaño de cola virtual. Esto tiene el efecto secundario de que es posible que la unidad pueda contener maxPoolSize + virtualQueueSize + maxPoolSize . ¿Porqué es eso? Debido al semaphore.release() en el bloque finally. Si todos los subprocesos de grupo invocan esta instrucción al mismo tiempo, se maxPoolSize permisos de maxPoolSize , lo que permite el mismo número de tareas para ingresar a la unidad. Si estuviéramos utilizando una cola limitada, aún estaría llena, lo que daría como resultado una tarea rechazada. Ahora, porque sabemos que esto solo ocurre cuando un hilo de grupo está casi terminado, esto no es un problema. Sabemos que el hilo de la agrupación no se bloqueará, por lo que pronto se eliminará una tarea de la cola.

Sin embargo, puedes usar una cola limitada. Solo asegúrese de que su tamaño sea igual a virtualQueueSize + maxPoolSize . Los tamaños mayores son inútiles, el semáforo evitará que entren más elementos. Los tamaños más pequeños darán lugar a tareas rechazadas. La probabilidad de que las tareas sean rechazadas aumenta a medida que disminuye el tamaño. Por ejemplo, supongamos que quiere un ejecutor vinculado con maxPoolSize = 2 y virtualQueueSize = 5. Luego tome un semáforo con 5 + 2 = 7 permisos y un tamaño de cola real de 5 + 2 = 7. El número real de tareas que pueden estar en la unidad es entonces 2 + 5 + 2 = 9. Cuando el ejecutor está lleno (5 tareas en cola, 2 en el grupo de subprocesos, por lo que 0 permite disponibles) y TODOS los subprocesos de grupo liberan sus permisos, se pueden tomar exactamente 2 permisos para las tareas que entran.

Ahora la solución de JCiP es algo engorrosa de usar, ya que no impone todas estas restricciones (cola ilimitada, o limitada con esas restricciones matemáticas, etc.). Creo que esto solo sirve como un buen ejemplo para demostrar cómo se pueden crear nuevas clases de seguridad de subprocesos basadas en las partes que ya están disponibles, pero no como una clase completamente reutilizable y reutilizable. No creo que esta sea la intención del autor.

Lo sé, es un truco, pero en mi opinión el truco más limpio entre los que se ofrecen aquí 😉

Debido a que ThreadPoolExecutor utiliza la cola de locking “oferta” en lugar de “poner”, permite anular el comportamiento de “oferta” de la cola de locking:

 class BlockingQueueHack extends ArrayBlockingQueue { BlockingQueueHack(int size) { super(size); } public boolean offer(T task) { try { this.put(task); } catch (InterruptedException e) { throw new RuntimeException(e); } return true; } } ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5)); 

Lo probé y parece funcionar. La implementación de algunas políticas de tiempo de espera se deja como un ejercicio de lectura.

puedes usar un RejectedExecutionHandler personalizado como este

 ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size max_handlers, // max size timeout_in_seconds, // idle timeout TimeUnit.SECONDS, queue, new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // This will block if the queue is full try { executor.getQueue().put(r); } catch (InterruptedException e) { System.err.println(e.getMessage()); } } }); 

Cree su propia cola de locking para ser utilizada por el Ejecutor, con el comportamiento de locking que está buscando, siempre devolviendo la capacidad restante disponible (asegurándose de que el ejecutor no intente crear más hilos que su grupo central, o desencadenar el controlador de rechazo).

Creo que esto te dará el comportamiento de locking que estás buscando. Un controlador de rechazo nunca se ajustará a la factura, ya que eso indica que el ejecutor no puede realizar la tarea. Lo que podría imaginar es que obtienes algún tipo de “espera ocupada” en el controlador. Eso no es lo que quieres, quieres una cola para el ejecutor que bloquea a la persona que llama …

Deberías echar un vistazo a este enlace (notifying-blocking-thread-pool) que resume varias soluciones y finalmente da una elegante con notificación.

Para evitar problemas con la solución @FixPoint. Se podría usar ListeningExecutorService y liberar el semáforo onSuccess y onFailure dentro de FutureCallback.

Recientemente encontré que esta pregunta tiene el mismo problema. El OP no lo dice explícitamente, pero no queremos usar el RejectedExecutionHandler que ejecuta una tarea en el hilo del remitente, porque esto subutilizará los hilos de trabajo si esta tarea es de larga ejecución.

Leyendo todas las respuestas y comentarios, en particular la solución defectuosa con el semáforo o usando afterExecute . afterExecute más de cerca el código del ThreadPoolExecutor para ver si hay alguna salida. Me sorprendió ver que hay más de 2000 líneas de código (comentadas), algunas de las cuales me hacen sentir mareado . Dado el requisito bastante simple que realmente tengo — un productor, varios consumidores, dejar que el productor bloquee cuando ningún consumidor puede tomar el trabajo — decidí lanzar mi propia solución. No es un ExecutorService sino solo un Executor . Y no adapta la cantidad de subprocesos a la carga de trabajo, pero solo contiene un número fijo de subprocesos, que también se ajusta a mis requisitos. Aquí está el código. Siéntase libre de despotricar al respecto 🙂

 package x; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; /** * distributes {@code Runnable}s to a fixed number of threads. To keep the * code lean, this is not an {@code ExecutorService}. In particular there is * only very simple support to shut this executor down. */ public class ParallelExecutor implements Executor { // other bounded queues work as well and are useful to buffer peak loads private final BlockingQueue workQueue = new SynchronousQueue(); private final Thread[] threads; /*+**********************************************************************/ /** * creates the requested number of threads and starts them to wait for * incoming work */ public ParallelExecutor(int numThreads) { this.threads = new Thread[numThreads]; for(int i=0; i 

Creo que hay una forma bastante elegante de resolver este problema utilizando java.util.concurrent.Semaphore y delegando el comportamiento de Executor.newFixedThreadPool . El nuevo servicio ejecutor solo ejecutará una nueva tarea cuando haya un hilo para hacerlo. El locking es gestionado por Semáforo con número de permisos igual al número de subprocesos. Cuando una tarea finaliza, devuelve un permiso.

 public class FixedThreadBlockingExecutorService extends AbstractExecutorService { private final ExecutorService executor; private final Semaphore blockExecution; public FixedThreadBlockingExecutorService(int nTreads) { this.executor = Executors.newFixedThreadPool(nTreads); blockExecution = new Semaphore(nTreads); } @Override public void shutdown() { executor.shutdown(); } @Override public List shutdownNow() { return executor.shutdownNow(); } @Override public boolean isShutdown() { return executor.isShutdown(); } @Override public boolean isTerminated() { return executor.isTerminated(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return executor.awaitTermination(timeout, unit); } @Override public void execute(Runnable command) { blockExecution.acquireUninterruptibly(); executor.execute(() -> { try { command.run(); } finally { blockExecution.release(); } }); } 

Tuve la misma necesidad en el pasado: un tipo de cola de locking con un tamaño fijo para cada cliente respaldado por un grupo de subprocesos compartidos. Terminé escribiendo mi propio tipo de ThreadPoolExecutor:

UserThreadPoolExecutor (cola de locking (por cliente) + threadpool (compartido entre todos los clientes))

Ver: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Cada UserThreadPoolExecutor recibe un número máximo de hilos de un ThreadPoolExecutor compartido

Cada UserThreadPoolExecutor puede:

  • enviar una tarea al ejecutor del grupo de subprocesos compartido si no se alcanza su cuota. Si se alcanza su cuota, el trabajo está en cola (locking no consuntivo esperando CPU). Una vez que se completa una de las tareas enviadas, la cuota se reduce, permitiendo que otra tarea en espera sea enviada al ThreadPoolExecutor.
  • Espere a que se completen las tareas restantes

Encontré esta política de rechazo en el cliente de búsqueda elástica. Bloquea el hilo de la persona que llama en la cola de locking. Código debajo-

  static class ForceQueuePolicy implements XRejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { //should never happen since we never wait throw new EsRejectedExecutionException(e); } } @Override public long rejected() { return 0; } } 

Recientemente tuve la necesidad de lograr algo similar, pero en un ScheduledExecutorService .

También tuve que asegurarme de que manejaba la demora que se pasaba en el método y me aseguraba de que la tarea se enviara para ejecutarse en el momento en que la persona que llama espera o simplemente falla, lanzando así una RejectedExecutionException .

Otros métodos de ScheduledThreadPoolExecutor para ejecutar o enviar una tarea internamente llaman a #schedule que a su vez invocará los métodos anulados.

 import java.util.concurrent.*; public class BlockingScheduler extends ScheduledThreadPoolExecutor { private final Semaphore maxQueueSize; public BlockingScheduler(int corePoolSize, ThreadFactory threadFactory, int maxQueueSize) { super(corePoolSize, threadFactory, new AbortPolicy()); this.maxQueueSize = new Semaphore(maxQueueSize); } @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay)); return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS); } @Override public  ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay)); return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS); } @Override public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay)); return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS); } @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay)); return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS); } @Override protected void afterExecute(Runnable runnable, Throwable t) { super.afterExecute(runnable, t); try { if (t == null && runnable instanceof Future) { try { ((Future) runnable).get(); } catch (CancellationException | ExecutionException e) { t = e; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) { System.err.println(t); } } finally { releaseQueueUsage(); } } private long beforeSchedule(Runnable runnable, long delay) { try { return getQueuePermitAndModifiedDelay(delay); } catch (InterruptedException e) { getRejectedExecutionHandler().rejectedExecution(runnable, this); return 0; } } private long beforeSchedule(Callable callable, long delay) { try { return getQueuePermitAndModifiedDelay(delay); } catch (InterruptedException e) { getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this); return 0; } } private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException { final long beforeAcquireTimeStamp = System.currentTimeMillis(); maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS); final long afterAcquireTimeStamp = System.currentTimeMillis(); return afterAcquireTimeStamp - beforeAcquireTimeStamp; } private void releaseQueueUsage() { maxQueueSize.release(); } } 

Tengo el código aquí, agradeceré cualquier comentario. https://github.com/AmitabhAwasthi/BlockingScheduler

Aquí está la solución que parece funcionar muy bien. Se llama NotifyingBlockingThreadPoolExecutor .

Progtwig de demostración.

Editar: hay un problema con este código, el método await () tiene errores. Llamar a shutdown () + awaitTermination () parece funcionar bien.

No siempre me gusta el CallerRunsPolicy, especialmente porque permite que la tarea rechazada ‘omita la cola’ y se ejecute antes de las tareas que se enviaron anteriormente. Además, ejecutar la tarea en el hilo de llamada puede llevar mucho más tiempo que esperar a que esté disponible la primera ranura.

Resolví este problema usando un RejectedExecutionHandler personalizado, que simplemente bloquea el hilo de llamada durante un momento y luego intenta enviar la tarea de nuevo:

 public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // The pool is full. Wait, then try again. try { long waitMs = 250; Thread.sleep(waitMs); } catch (InterruptedException interruptedException) {} executor.execute(r); } } 

Esta clase solo se puede usar en el ejecutor de grupo de hilos como un RejectedExecutinHandler como cualquier otro, por ejemplo:

 executorPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new SynchronousQueue(), new BlockWhenQueueFull()); 

El único inconveniente que veo es que el hilo de llamada podría bloquearse un poco más de lo estrictamente necesario (hasta 250 ms). Además, dado que este ejecutor se está llamando efectivamente de forma recursiva, esperar mucho tiempo para que un hilo esté disponible (horas) puede provocar un desbordamiento de la stack.

Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien.