Estoy tratando de ejecutar muchas tareas usando un ThreadPoolExecutor. A continuación se muestra un ejemplo hipotético:
def workQueue = new ArrayBlockingQueue(3, false) def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue) for(int i = 0; i < 100000; i++) threadPoolExecutor.execute(runnable)
El problema es que obtengo rápidamente una java.util.concurrent.RejectedExecutionException ya que el número de tareas excede el tamaño de la cola de trabajo. Sin embargo, el comportamiento deseado que estoy buscando es tener el bloque de hilos principal hasta que haya espacio en la cola. Cuál es la mejor manera de lograr esto?
En algunas circunstancias muy limitadas, puede implementar un java.util.concurrent.RejectedExecutionHandler que hace lo que necesita.
RejectedExecutionHandler block = new RejectedExecutionHandler() { rejectedExecution(Runnable r, ThreadPoolExecutor executor) { executor.getQueue().put( r ); } }; ThreadPoolExecutor pool = new ... pool.setRejectedExecutionHandler(block);
Ahora. Esta es una muy mala idea por los siguientes motivos
Una estrategia casi siempre mejor es instalar ThreadPoolExecutor.CallerRunsPolicy que estrangulará tu aplicación ejecutando la tarea en el hilo que llama a execute ().
Sin embargo, a veces una estrategia de locking, con todos sus riesgos inherentes, es realmente lo que desea. Yo diría bajo estas condiciones
Entonces, como digo. Rara vez se necesita y puede ser peligroso, pero ahí lo tienes.
Buena suerte.
Podría usar un semaphore
para bloquear el acceso de los hilos al grupo.
ExecutorService service = new ThreadPoolExecutor( 3, 3, 1, TimeUnit.HOURS, new ArrayBlockingQueue<>(6, false) ); Semaphore lock = new Semaphore(6); // equal to queue capacity for (int i = 0; i < 100000; i++ ) { try { lock.acquire(); service.submit(() -> { try { task.run(); } finally { lock.release(); } }); } catch (InterruptedException e) { throw new RuntimeException(e); } }
Algunos errores
El tamaño de la cola debe ser mayor que el número de hilos centrales. Si tuviéramos que hacer el tamaño de la cola 3, lo que terminaría sucediendo es:
El ejemplo anterior se traduce para enhebrar el hilo principal que bloquea el hilo 1. Puede parecer un período pequeño, pero ahora multiplica la frecuencia por días y meses. De repente, los cortos períodos de tiempo sumn una gran cantidad de tiempo desperdiciado.
Lo que debe hacer es ajustar su ThreadPoolExecutor en Executor, que limita explícitamente la cantidad de operaciones ejecutadas simultáneamente dentro de él:
private static class BlockingExecutor implements Executor { final Semaphore semaphore; final Executor delegate; private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) { semaphore = new Semaphore(concurrentTasksLimit); this.delegate = delegate; } @Override public void execute(final Runnable command) { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); return; } final Runnable wrapped = () -> { try { command.run(); } finally { semaphore.release(); } }; delegate.execute(wrapped); } }
Puede ajustar concurrentTasksLimit a threadPoolSize + queueSize de su delegado ejecutor y resolverá su problema
Aquí está mi fragmento de código en este caso:
public void executeBlocking( Runnable command ) { if ( threadPool == null ) { logger.error( "Thread pool '{}' not initialized.", threadPoolName ); return; } ThreadPool threadPoolMonitor = this; boolean accepted = false; do { try { threadPool.execute( new Runnable() { @Override public void run() { try { command.run(); } // to make sure that the monitor is freed on exit finally { // Notify all the threads waiting for the resource, if any. synchronized ( threadPoolMonitor ) { threadPoolMonitor.notifyAll(); } } } } ); accepted = true; } catch ( RejectedExecutionException e ) { // Thread pool is full try { // Block until one of the threads finishes its job and exits. synchronized ( threadPoolMonitor ) { threadPoolMonitor.wait(); } } catch ( InterruptedException ignored ) { // return immediately break; } } } while ( !accepted ); }
threadPool es una instancia local de java.util.concurrent.ExecutorService que ya se ha inicializado.
Resolví este problema usando un RejectedExecutionHandler personalizado, que simplemente bloquea el hilo de llamada durante un momento y luego intenta enviar la tarea de nuevo:
public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // The pool is full. Wait, then try again. try { long waitMs = 250; Thread.sleep(waitMs); } catch (InterruptedException interruptedException) {} executor.execute(r); } }
Esta clase solo se puede usar en el ejecutor de grupo de hilos como un RejectedExecutionHandler como cualquier otro. En este ejemplo:
executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())
El único inconveniente que veo es que el hilo de llamada podría bloquearse un poco más de lo estrictamente necesario (hasta 250 ms). Para muchas tareas de ejecución breve, quizás disminuya el tiempo de espera a 10 ms o menos. Además, dado que este ejecutor se está llamando efectivamente de forma recursiva, esperar mucho tiempo para que un hilo esté disponible (horas) puede provocar un desbordamiento de la stack.
Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien. ¿Me estoy perdiendo algo importante?
Esto es lo que terminé haciendo:
int NUM_THREADS = 6; Semaphore lock = new Semaphore(NUM_THREADS); ExecutorService pool = Executors.newCachedThreadPool(); for (int i = 0; i < 100000; i++) { try { lock.acquire(); } catch (InterruptedException e) { throw new RuntimeException(e); } pool.execute(() -> { try { // Task logic } finally { lock.release(); } }); }