Manejo de excepciones de las tareas de Java ExecutorService

ThreadPoolExecutor usar la clase ThreadPoolExecutor de Java para ejecutar una gran cantidad de tareas pesadas con un número fijo de hilos. Cada una de las tareas tiene muchos lugares durante los cuales puede fallar debido a excepciones.

He subclasificado ThreadPoolExecutor y he anulado el método afterExecute , que se supone que proporciona las excepciones no detectadas encontradas al ejecutar una tarea. Sin embargo, parece que no puedo hacer que funcione.

Por ejemplo:

 public class ThreadPoolErrors extends ThreadPoolExecutor { public ThreadPoolErrors() { super( 1, // core threads 1, // max threads 1, // timeout TimeUnit.MINUTES, // timeout units new LinkedBlockingQueue() // work queue ); } protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if(t != null) { System.out.println("Got an error: " + t); } else { System.out.println("Everything's fine--situation normal!"); } } public static void main( String [] args) { ThreadPoolErrors threadPool = new ThreadPoolErrors(); threadPool.submit( new Runnable() { public void run() { throw new RuntimeException("Ouch! Got an error."); } } ); threadPool.shutdown(); } } 

La salida de este progtwig es “Todo está bien – ¡situación normal!” aunque el único Runnable enviado al grupo de subprocesos arroja una excepción. ¿Alguna pista de lo que está pasando aquí?

¡Gracias!

De los documentos :

Nota: Cuando las acciones se envuelven en tareas (como FutureTask) de forma explícita o mediante métodos como enviar, estos objetos de tareas capturan y mantienen excepciones computacionales, por lo que no causan una terminación abrupta, y las excepciones internas no se pasan a este método .

Cuando envía un Runnable, queda envuelto en un futuro.

Su AfterExecute debería ser algo como esto:

 public final class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future) { try { Future future = (Future) r; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } if (t != null) { System.out.println(t); } } } 

ADVERTENCIA : se debe tener en cuenta que esta solución bloqueará el hilo de llamada.


Si desea procesar las excepciones lanzadas por la tarea, generalmente es mejor usar Callable lugar de Runnable .

Se permite que Callable.call() arroje excepciones comprobadas, y éstas se propagan nuevamente a la Callable.call() llamada:

 Callable task = ... Future future = executor.submit(task); try { future.get(); } catch (ExecutionException ex) { ex.getCause().printStackTrace(); } 

Si Callable.call() arroja una excepción, esto será envuelto en una ExecutionException y lanzado por Future.get() .

Es probable que esto sea mucho más preferible a la subclasificación de ThreadPoolExecutor . También le da la oportunidad de volver a enviar la tarea si la excepción es recuperable.

La explicación de este comportamiento está en el javadoc para afterExecute :

Nota: Cuando las acciones se envuelven en tareas (como FutureTask) de forma explícita o mediante métodos como enviar, estos objetos de tareas capturan y mantienen excepciones computacionales, por lo que no causan una terminación abrupta, y las excepciones internas no se pasan a este método .

Lo solucioné envolviendo el ejecutable suministrado enviado al ejecutor.

 CompletableFuture.runAsync( () -> { try { runnable.run(); } catch (Throwable e) { Log.info(Concurrency.class, "runAsync", e); } }, executorService ); 

Estoy usando la clase VerboseRunnable de jcabi-log , que se traga todas las excepciones y las registra. Muy conveniente, por ejemplo:

 import com.jcabi.log.VerboseRunnable; scheduler.scheduleWithFixedDelay( new VerboseRunnable( Runnable() { public void run() { // the code, which may throw } }, true // it means that all exceptions will be swallowed and logged ), 1, 1, TimeUnit.MILLISECONDS ); 

Otra solución sería usar ManagedTask y ManagedTaskListener .

Necesita un Callable o Runnable que implemente la interfaz ManagedTask .

El método getManagedTaskListener devuelve la instancia que desea.

 public ManagedTaskListener getManagedTaskListener() { 

Y implementa en ManagedTaskListener el método taskDone :

 @Override public void taskDone(Future future, ManagedExecutorService executor, Object task, Throwable exception) { if (exception != null) { LOGGER.log(Level.SEVERE, exception.getMessage()); } } 

Más detalles sobre el ciclo de vida de la tarea administrada y el oyente .

Si desea supervisar la ejecución de la tarea, puede girar 1 o 2 hilos (quizás más dependiendo de la carga) y usarlos para tomar tareas de un contenedor ExecutionCompletionService.

Si su ExecutorService proviene de una fuente externa (es decir, no es posible subclase ThreadPoolExecutor y anular afterExecute() ), puede usar un proxy dynamic para lograr el comportamiento deseado:

 public static ExecutorService errorAware(final ExecutorService executor) { return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {ExecutorService.class}, (proxy, method, args) -> { if (method.getName().equals("submit")) { final Object arg0 = args[0]; if (arg0 instanceof Runnable) { args[0] = new Runnable() { @Override public void run() { final Runnable task = (Runnable) arg0; try { task.run(); if (task instanceof Future) { final Future future = (Future) task; if (future.isDone()) { try { future.get(); } catch (final CancellationException ce) { // Your error-handling code here ce.printStackTrace(); } catch (final ExecutionException ee) { // Your error-handling code here ee.getCause().printStackTrace(); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); } } } } catch (final RuntimeException re) { // Your error-handling code here re.printStackTrace(); throw re; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } else if (arg0 instanceof Callable) { args[0] = new Callable() { @Override public Object call() throws Exception { final Callable task = (Callable) arg0; try { return task.call(); } catch (final Exception e) { // Your error-handling code here e.printStackTrace(); throw e; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } } return method.invoke(executor, args); }); } 

Esto se debe a AbstractExecutorService :: submit está envolviendo su runnable en RunnableFuture (nada más que FutureTask ) como a continuación

 AbstractExecutorService.java public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); /////////HERE//////// execute(ftask); return ftask; } 

Luego, execute lo pasará a Worker y Worker.run() llamará al siguiente.

 ThreadPoolExecutor.java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /////////HERE//////// } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 

Finalmente task.run(); en la llamada de código anterior llamaremos a FutureTask.run() . Aquí está el código del controlador de excepción, debido a esto NO está obteniendo la excepción esperada.

 class FutureTask implements RunnableFuture public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { /////////HERE//////// result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 

Esto funciona

  • Se deriva de SingleThreadExecutor, pero puedes adaptarlo fácilmente
  • Código de Java 8 lamdas, pero fácil de arreglar

Creará un Ejecutor con un solo hilo, que puede obtener muchas tareas; y esperará a que la ejecución actual finalice para comenzar con la siguiente

En caso de error o excepción de uncaugth, el uncaughtExceptionHandler lo atrapará

 clase final pública SingleThreadExecutorWithExceptions {

     public static ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

         ThreadFactory factory = (Runnable runnable) -> {
             final Thread newThread = new Thread (ejecutable, "SingleThreadExecutorWithExceptions");
             newThread.setUncaughtExceptionHandler ((final Thread caugthThread, Throwable throwable throwable) -> {
                 uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
             });
             devolver newThread;
         };
         devolver nuevo FinalizableDelegatedExecutorService
                 (nuevo ThreadPoolExecutor (1, 1,
                         0L, TimeUnit.MILLISECONDS,
                         nueva LinkedBlockingQueue (),
                         fábrica){


                     vacío protegido afterExecute (Runnable runnable, Throwable throwable) {
                         super.afterExecute (ejecutable, arrojable);
                         if (throwable == null && instance ejecutable de Future) {
                             tratar {
                                 Futuro futuro = (futuro) ejecutable;
                                 if (future.isDone ()) {
                                     future.get ();
                                 }
                             } catch (CancellationException ce) {
                                 throwable = ce;
                             } catch (ExecutionException ee) {
                                 throwable = ee.getCause ();
                             } catch (InterruptedException ie) {
                                 Thread.currentThread (). Interrupt ();  // ignorar / restablecer
                             }
                         }
                         if (throwable! = null) {
                             uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), throwable);
                         }
                     }
                 });
     }



     clase estática privada FinalizableDelegatedExecutorService
             extiende DelegatedExecutorService {
         FinalizableDelegatedExecutorService (ExecutorService executor) {
             super (ejecutor);
         }
         protected void finalize () {
             super.shutdown ();
         }
     }

     / **
      * Una clase contenedora que expone solo los métodos ExecutorService
      * de una implementación ExecutorService.
      * /
     clase estática privada DelegatedExecutorService extends AbstractExecutorService {
         privado final ExecutorService e;
         DelegatedExecutorService (ExecutorService executor) {e = ejecutor;  }
         public void execute (comando Runnable) {e.execute (comando);  }
         cierre de vacío público () {e.shutdown ();  }
         public List shutdownNow () {return e.shutdownNow ();  }
         public boolean isShutdown () {return e.isShutdown ();  }
         public boolean isTerminated () {return e.isTerminated ();  }
         público booleano awaitTermination (tiempo de espera prolongado, unidad TimeUnit)
                 throws InterruptedException {
             return e.awaitTermination (tiempo de espera, unidad);
         }
         público Future submit (tarea ejecutable) {
             return e.submit (tarea);
         }
         público Presentación futura (tarea invocable) {
             return e.submit (tarea);
         }
         público Presentación futura (tarea ejecutable, resultado T) {
             return e.submit (tarea, resultado);
         }
         public List> invokeAll (Collection> tasks)
                 throws InterruptedException {
             return e.invokeAll (tareas);
         }
         public List> invokeAll (Collection> tareas,
                                              tiempo de espera largo, unidad TimeUnit)
                 throws InterruptedException {
             return e.invokeAll (tareas, tiempo de espera, unidad);
         }
         public T invokeAny (Colección> tareas)
                 throws InterruptedException, ExecutionException {
             devolver e.invokeAny (tareas);
         }
         public T invokeAny (Colección> tareas,
                                tiempo de espera largo, unidad TimeUnit)
                 throws InterruptedException, ExecutionException, TimeoutException {
             return e.invokeAny (tareas, tiempo de espera, unidad);
         }
     }



     private SingleThreadExecutorWithExceptions () {}
 }

En lugar de subclasificar ThreadPoolExecutor, le proporcionaría una instancia ThreadFactory que crea nuevos subprocesos y les proporciona un UncaughtExceptionHandler