ExecutorService que interrumpe las tareas después de un tiempo de espera

Estoy buscando una implementación ExecutorService que se puede proporcionar con un tiempo de espera. Las tareas que se envían al ExecutorService se interrumpen si tardan más tiempo que el tiempo de espera en ejecutarse. Implementar tal bestia no es una tarea tan difícil, pero me pregunto si alguien sabe de una implementación existente.

Esto es lo que surgió en función de algunos de los debates a continuación. ¿Algún comentario?

import java.util.List; import java.util.concurrent.*; public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor { private final long timeout; private final TimeUnit timeoutUnit; private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap runningTasks = new ConcurrentHashMap(); public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } @Override public void shutdown() { timeoutExecutor.shutdown(); super.shutdown(); } @Override public List shutdownNow() { timeoutExecutor.shutdownNow(); return super.shutdownNow(); } @Override protected void beforeExecute(Thread t, Runnable r) { if(timeout > 0) { final ScheduledFuture scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit); runningTasks.put(r, scheduled); } } @Override protected void afterExecute(Runnable r, Throwable t) { ScheduledFuture timeoutTask = runningTasks.remove(r); if(timeoutTask != null) { timeoutTask.cancel(false); } } class TimeoutTask implements Runnable { private final Thread thread; public TimeoutTask(Thread thread) { this.thread = thread; } @Override public void run() { thread.interrupt(); } } } 

Puede usar un ScheduledExecutorService para esto. Primero lo enviaría solo una vez para comenzar inmediatamente y retener el futuro que se crea. Después de eso, puede enviar una nueva tarea que cancelaría el futuro retenido después de un período de tiempo.

  ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); final Future handler = executor.submit(new Callable(){ ... }); executor.schedule(new Runnable(){ public void run(){ handler.cancel(); } }, 10000, TimeUnit.MILLISECONDS); 

Esto ejecutará su controlador (la funcionalidad principal será interrumpida) por 10 segundos, luego cancelará (es decir, interrumpirá) esa tarea específica.

Lamentablemente, la solución es defectuosa. Hay un tipo de error con ScheduledThreadPoolExecutor , también se informa en esta pregunta : la cancelación de una tarea enviada no libera por completo los recursos de memoria asociados con la tarea; los recursos se lanzan solo cuando la tarea expira.

Si, por lo tanto, crea un TimeoutThreadPoolExecutor con un tiempo de caducidad bastante largo (un uso típico) y envía tareas lo suficientemente rápido, termina llenando la memoria, incluso si las tareas realmente se completaron con éxito.

Puede ver el problema con el siguiente progtwig de prueba (muy crudo):

 public static void main(String[] args) throws InterruptedException { ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue(), 10, TimeUnit.MINUTES); //ExecutorService service = Executors.newFixedThreadPool(1); try { final AtomicInteger counter = new AtomicInteger(); for (long i = 0; i < 10000000; i++) { service.submit(new Runnable() { @Override public void run() { counter.incrementAndGet(); } }); if (i % 10000 == 0) { System.out.println(i + "/" + counter.get()); while (i > counter.get()) { Thread.sleep(10); } } } } finally { service.shutdown(); } } 

El progtwig agota la memoria disponible, aunque espera a que se complete el Runnable s generado.

Pensé en esto por un tiempo, pero lamentablemente no pude encontrar una buena solución.

EDITAR: descubrí que este problema se informó como error JDK 6602600 , y parece que se ha solucionado recientemente.

Envuelva la tarea en FutureTask y puede especificar el tiempo de espera para FutureTask. Mira el ejemplo en mi respuesta a esta pregunta,

java native Process timeout

¿Qué le parece usar el método ExecutorService.shutDownNow() como se describe en http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Parece ser la solución más simple.

Parece que el problema no está en el error JDK 6602600 (se resolvió en 2010-05-22), sino en una llamada incorrecta de suspensión (10) en círculo. Además, note que el hilo principal debe dar CHANCE directamente a otros hilos para realizar sus tareas por invocación SLEEP (0) en CADA twig del círculo exterior. Creo que es mejor utilizar Thread.yield () en lugar de Thread.sleep (0)

El resultado corregido parte del código de problema anterior es tal como este:

 ....................... ........................ Thread.yield(); if (i % 1000== 0) { System.out.println(i + "/" + counter.get()+ "/"+service.toString()); } // // while (i > counter.get()) { // Thread.sleep(10); // } 

Funciona correctamente con la cantidad de contador externo de hasta 150 000 000 círculos probados.

Después de una tonelada de tiempo para inspeccionar,
Finalmente, invokeAll método invokeAll de ExecutorService para resolver este problema.
Eso interrumpirá estrictamente la tarea mientras se ejecuta la tarea.
Aquí hay un ejemplo

 ExecutorService executorService = Executors.newCachedThreadPool(); try { List> callables = new ArrayList<>(); // Add your long time task (callable) callables.add(new VaryLongTimeTask()); // Assign tasks for specific execution timeout (eg 2 sec) List> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); for (Future future : futures) { // Getting result } } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); 

El profesional es que también puede enviar ListenableFuture al mismo ExecutorService .
Simplemente cambie ligeramente la primera línea de código.

 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 

ListeningExecutorService es la función de Listening de ExecutorService en google guava project ( com.google.guava ))

Usando la respuesta de John W, creé una implementación que iniciaba correctamente el tiempo de espera cuando la tarea inicia su ejecución. Incluso escribo una prueba unitaria para ello 🙂

Sin embargo, no satisface mis necesidades, ya que algunas operaciones IO no interrumpen cuando se llama a Future.cancel() es decir, cuando se Thread.interrupted() ).

De todos modos, si alguien está interesado, creé una esencia: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

¿Qué pasa con esta idea alternativa?

  • dos tienen dos ejecutores:
    • uno para :
      • enviar la tarea, sin preocuparse por el tiempo de espera de la tarea
      • agregando el futuro resultado y el momento en que debería terminar en una estructura interna
    • uno para ejecutar un trabajo interno que verifica la estructura interna si algunas tareas tienen un tiempo de espera y si deben cancelarse.

Pequeña muestra está aquí:

 public class AlternativeExecutorService { private final CopyOnWriteArrayList futureQueue = new CopyOnWriteArrayList(); private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for private ScheduledFuture scheduledFuture; private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; public AlternativeExecutorService() { scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); } public void pushTask(OwnTask task) { ListenableFuture future = threadExecutor.submit(task); // -> create your Callable futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end } public void shutdownInternalScheduledExecutor() { scheduledFuture.cancel(true); scheduledExecutor.shutdownNow(); } long getCurrentMillisecondsTime() { return Calendar.getInstance().get(Calendar.MILLISECOND); } class ListenableFutureTask { private final ListenableFuture future; private final OwnTask task; private final long milliSecEndTime; private ListenableFutureTask(ListenableFuture future, OwnTask task, long milliSecStartTime) { this.future = future; this.task = task; this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); } ListenableFuture getFuture() { return future; } OwnTask getTask() { return task; } long getMilliSecEndTime() { return milliSecEndTime; } } class TimeoutManagerJob implements Runnable { CopyOnWriteArrayList getCopyOnWriteArrayList() { return futureQueue; } @Override public void run() { long currentMileSecValue = getCurrentMillisecondsTime(); for (ListenableFutureTask futureTask : futureQueue) { consumeFuture(futureTask, currentMileSecValue); } } private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) { ListenableFuture future = futureTask.getFuture(); boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; if (isTimeout) { if (!future.isDone()) { future.cancel(true); } futureQueue.remove(futureTask); } } } class OwnTask implements Callable { private long timeoutDuration; private TimeUnit timeUnit; OwnTask(long timeoutDuration, TimeUnit timeUnit) { this.timeoutDuration = timeoutDuration; this.timeUnit = timeUnit; } @Override public Void call() throws Exception { // do logic return null; } public long getTimeoutDuration() { return timeoutDuration; } public TimeUnit getTimeUnit() { return timeUnit; } } }