Java: ExecutorService que bloquea el envío después de un determinado tamaño de cola

Estoy tratando de codificar una solución en la que una única hebra produce tareas intensivas de E / S que se pueden realizar en paralelo. Cada tarea tiene importantes datos en memoria. Así que quiero poder limitar el número de tareas pendientes en un momento.

Si creo ThreadPoolExecutor de esta manera:

ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(maxQueue)); 

Luego el executor.submit(callable) lanza RejectedExecutionException cuando la cola se llena y todos los hilos ya están ocupados.

¿Qué puedo hacer para bloquear executor.submit(callable) cuando la cola está llena y todos los hilos están ocupados?

EDITAR : Intenté esto :

 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 

Y de alguna manera logra el efecto que quiero lograr, pero de una manera poco elegante (básicamente, los hilos rechazados se ejecutan en el hilo de llamada, por lo que esto impide que el hilo de llamada envíe más).

EDITAR: (5 años después de hacer la pregunta)

Para cualquiera que lea esta pregunta y sus respuestas, no tome la respuesta aceptada como una solución correcta. Por favor, lea todas las respuestas y comentarios.

He hecho lo mismo. El truco es crear un BlockingQueue donde el método offer () es realmente un put (). (puedes usar cualquier base de BlockingQueue impl que quieras).

 public class LimitedQueue extends LinkedBlockingQueue { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } } 

Tenga en cuenta que esto solo funciona para el grupo de subprocesos donde corePoolSize==maxPoolSize así que tenga cuidado allí (ver comentarios).

Aquí es cómo resolví esto en mi extremo:

(Nota: esta solución bloquea el hilo que envía el invocable, por lo que evita que se genere RejectedExecutionException)

 public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public  Future submitButBlockIfFull(final Callable task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } } 

La respuesta actualmente aceptada tiene un problema potencialmente importante: cambia el comportamiento de ThreadPoolExecutor.execute de modo que si tiene un corePoolSize < maxPoolSize , la lógica ThreadPoolExecutor nunca agregará trabajadores adicionales más allá del núcleo.

Desde ThreadPoolExecutor .execute (Runnable):

  if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); 

Específicamente, ese último bloque 'else' nunca será golpeado.

Una mejor alternativa es hacer algo similar a lo que ya está haciendo OP: use un RejectedExecutionHandler para hacer la misma lógica de presentación:

 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e); } } 

Hay algunas cosas a tener en cuenta con este enfoque, como se señala en los comentarios (en referencia a esta respuesta ):

  1. Si corePoolSize==0 , entonces hay una condición de carrera donde todos los hilos en el grupo pueden morir antes de que la tarea sea visible
  2. Usar una implementación que envuelva las tareas de cola (no aplicable a ThreadPoolExecutor ) dará como resultado problemas a menos que el manejador también lo envuelva de la misma manera.

Teniendo en cuenta estos inconvenientes, esta solución funcionará para la mayoría de los ThreadPoolExecutors típicos, y manejará adecuadamente el caso donde corePoolSize < maxPoolSize .

Creo que es tan simple como usar un ArrayBlockingQueue lugar de un LinkedBlockingQueue .

Ignorame … eso es totalmente incorrecto. ThreadPoolExecutor llama a Queue#offer no put que tendrá el efecto que necesita.

Puede extender ThreadPoolExecutor y proporcionar una implementación de execute(Runnable) que las llamadas put en lugar de la offer .

Eso no parece una respuesta completamente satisfactoria, me temo.

Tuve el problema similar y lo implementé usando los beforeExecute/afterExecute desde ThreadPoolExecutor :

 import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Blocks current task execution if there is not enough resources for it. * Maximum task count usage controlled by maxTaskCount property. */ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, int maxTaskCount) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.maxTaskCount = maxTaskCount; } /** * Executes task if there is enough system resources for it. Otherwise * waits. */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // Spin while we will not have enough capacity for this job while (maxTaskCount < currentTaskCount) { try { unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; } finally { taskLock.unlock(); } } /** * Signalling that one more task is welcome */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { currentTaskCount--; unpaused.signalAll(); } finally { taskLock.unlock(); } } } 

Esto debería ser lo suficientemente bueno para ti. Por cierto, la implementación original se basó en el tamaño de la tarea porque una tarea podía ser más grande 100 veces que otra y enviar dos tareas enormes estaba matando la caja, pero ejecutar una grande y bastante pequeña estaba bien. Si sus tareas intensivas de E / S son más o menos del mismo tamaño, podría utilizar esta clase; de ​​lo contrario, hágamelo saber y publicaré la implementación basada en el tamaño.

PD. Desea comprobar ThreadPoolExecutor javadoc. Es una guía muy útil para el usuario de Doug Lea sobre cómo se puede personalizar fácilmente.

Sé que esta es una pregunta antigua, pero tenía un problema similar: crear nuevas tareas era muy rápido y si había demasiadas OutOfMemoryError porque las tareas existentes no se completaban lo suficientemente rápido.

En mi caso, Callables se envían y necesito el resultado, por lo tanto, necesito almacenar todos los Futures devueltos por executor.submit() . Mi solución fue poner los Futures en una BlockingQueue con un tamaño máximo. Una vez que la cola está llena, no se generan más tareas hasta que se completen algunas (elementos eliminados de la cola). En pseudo-código:

 final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads); final LinkedBlockingQueue futures = new LinkedBlockingQueue<>(maxQueueSize); try { Thread taskGenerator = new Thread() { @Override public void run() { while (reader.hasNext) { Callable task = generateTask(reader.next()); Future future = executor.submit(task); try { // if queue is full blocks until a task // is completed and hence no future tasks are submitted. futures.put(compoundFuture); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } executor.shutdown(); } } taskGenerator.start(); // read from queue as long as task are being generated // or while Queue has elements in it while (taskGenerator.isAlive() || !futures.isEmpty()) { Future compoundFuture = futures.take(); // do something } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new MyException(ex); } finally { executor.shutdownNow(); } 

Implementé una solución siguiendo el patrón del decorador y usando un semáforo para controlar el número de tareas ejecutadas. Puedes usarlo con cualquier Executor y:

  • Especifique el máximo de tareas en curso
  • Especifique el tiempo máximo de espera para esperar un permiso de ejecución de tareas (si se supera el tiempo de espera y no se adquiere ningún permiso, se lanza una RejectedExecutionException )
 import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public class BlockingOnFullQueueExecutorDecorator implements Executor { private static final class PermitReleasingDecorator implements Runnable { @Nonnull private final Runnable delegate; @Nonnull private final Semaphore semaphore; private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; this.semaphore = semaphoreToRelease; } @Override public void run() { try { this.delegate.run(); } finally { // however execution goes, release permit for next task this.semaphore.release(); } } @Override public final String toString() { return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); } } @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; @Nonnull private final Executor delegate; public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); if (maximumTaskNumber < 1) { throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); } this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); if (this.timeout.isNegative()) { throw new IllegalArgumentException("'maximumTimeout' must not be negative"); } this.taskLimit = new Semaphore(maximumTaskNumber); } @Override public final void execute(final Runnable command) { Objects.requireNonNull(command, "'command' must not be null"); try { // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); } } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); throw new IllegalStateException(e); } this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit)); } @Override public final String toString() { return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), this.timeout, this.delegate); } }