Esperando en una lista de Future

Tengo un método que devuelve una List de futuros

 List<Future> futures = getFutures(); 

Ahora quiero esperar hasta que todos los futuros finalicen el procesamiento correctamente o cualquiera de las tareas cuyo resultado sea devuelto por un futuro genere una excepción. Incluso si una tarea arroja una excepción, no tiene sentido esperar a los otros futuros.

El enfoque simple sería

 wait() { For(Future f : futures) { try { f.get(); } catch(Exception e) { //TODO catch specific exception // this future threw exception , means somone could not do its task return; } } } 

Pero el problema aquí es si, por ejemplo, el cuarto futuro arroja una excepción, entonces esperaré innecesariamente para que los primeros 3 futuros estén disponibles.

¿Cómo resolver esto? ¿Contestará la ayuda del pestillo de alguna manera? No puedo usar Future isDone porque el documento de Java dice

 boolean isDone() Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true. 

Puede utilizar un CompletionService para recibir los futuros tan pronto como estén listos y si uno de ellos lanza una excepción, cancele el procesamiento. Algo como esto:

 Executor executor = Executors.newFixedThreadPool(4); CompletionService completionService = new ExecutorCompletionService(executor); //4 tasks for(int i = 0; i < 4; i++) { completionService.submit(new Callable() { public SomeResult call() { ... return result; } }); } int received = 0; boolean erros = false; while(received < 4 && !errors) { Future resultFuture = completionService.take(); //blocks if none available try { SomeResult result = resultFuture.get(); received ++; ... // do something with the result } catch(Exception e) { //log errors = true; } } 

Creo que puede mejorar aún más para cancelar cualquier tarea que aún se esté ejecutando si uno de ellos arroja un error.

EDITAR: He encontrado un ejemplo más completo aquí: http://blog.teamlazerbeez.com/2009/04/29/java-completionservice/

Si está utilizando Java 8 , puede hacerlo de manera más fácil con CompletableFuture y CompletableFuture.allOf , que aplica la callback solo después de que se hayan completado todos los CompletaFuturos proporcionados.

 // Waits for all futures to complete and returns a list of results. // If a future completes exceptionally then the resulting future will too. public static  CompletableFuture> all(List> futures) { CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]); return CompletableFuture.allOf(cfs) .thenApply(() -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); } 

Puede usar un ExecutorCompletionService . La documentación incluso tiene un ejemplo para su caso de uso exacto:

Supongamos, en cambio, que desea utilizar el primer resultado no nulo del conjunto de tareas, ignorando las excepciones y cancelando todas las demás tareas cuando la primera esté lista:

 void solve(Executor e, Collection> solvers) throws InterruptedException { CompletionService ecs = new ExecutorCompletionService(e); int n = solvers.size(); List> futures = new ArrayList>(n); Result result = null; try { for (Callable s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) { } } } finally { for (Future f : futures) f.cancel(true); } if (result != null) use(result); } 

Lo importante a notar aquí es que ecs.take () obtendrá la primera tarea completada , no solo la primera enviada. Por lo tanto, debe obtenerlos en el orden de finalización de la ejecución (o lanzar una excepción).

Utilice CompletableFuture en Java 8

  // Kick of multiple, asynchronous lookups CompletableFuture page1 = gitHubLookupService.findUser("Test1"); CompletableFuture page2 = gitHubLookupService.findUser("Test2"); CompletableFuture page3 = gitHubLookupService.findUser("Test3"); // Wait until they are all done CompletableFuture.allOf(page1,page2,page3).join(); logger.info("--> " + page1.get()); 

tal vez esto ayude (nada se reemplazaría con hilo sin procesar, ¡sí!) Sugiero ejecutar cada tipo Future con un hilo separado (van en paralelo), luego, cuando uno de los errores tiene, simplemente señala al administrador (clase Handler ).

 class Handler{ //... private Thread thisThread; private boolean failed=false; private Thread[] trds; public void waitFor(){ thisThread=Thread.currentThread(); List> futures = getFutures(); trds=new Thread[futures.size()]; for (int i = 0; i < trds.length; i++) { RunTask rt=new RunTask(futures.get(i), this); trds[i]=new Thread(rt); } synchronized (this) { for(Thread tx:trds){ tx.start(); } } for(Thread tx:trds){ try {tx.join(); } catch (InterruptedException e) { System.out.println("Job failed!");break; } }if(!failed){System.out.println("Job Done");} } private List> getFutures() { return null; } public synchronized void cancelOther(){if(failed){return;} failed=true; for(Thread tx:trds){ tx.stop();//Deprecated but works here like a boss }thisThread.interrupt(); } //... } class RunTask implements Runnable{ private Future f;private Handler h; public RunTask(Future f,Handler h){this.f=f;this.h=h;} public void run(){ try{ f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation) }catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();} catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");} } } 

Debo decir que el código anterior sería un error (no se verificó), pero espero poder explicar la solución. por favor, inténtalo.

Si está utilizando Java 8 y no desea manipular CompletableFuture s, he escrito una herramienta para recuperar los resultados de una List> utilizando la transmisión. La clave es que tienes prohibido map(Future::get) mientras tira.

 public final class Futures { private Futures() {} public static  Collector, Collection, List> present() { return new FutureCollector<>(); } private static class FutureCollector implements Collector, Collection, List> { private final List exceptions = new LinkedList<>(); @Override public Supplier> supplier() { return LinkedList::new; } @Override public BiConsumer, Future> accumulator() { return (r, f) -> { try { r.add(f.get()); } catch (InterruptedException e) {} catch (ExecutionException e) { exceptions.add(e.getCause()); } }; } @Override public BinaryOperator> combiner() { return (l1, l2) -> { l1.addAll(l2); return l1; }; } @Override public Function, List> finisher() { return l -> { List ret = new ArrayList<>(l); if (!exceptions.isEmpty()) throw new AggregateException(exceptions, ret); return ret; }; } @Override public Set characteristics() { return java.util.Collections.emptySet(); } } 

Esto necesita una AggregateException que funcione como C # ‘s

 public class AggregateException extends RuntimeException { /** * */ private static final long serialVersionUID = -4477649337710077094L; private final List causes; private List< ?> successfulElements; public AggregateException(List causes, List< ?> l) { this.causes = causes; successfulElements = l; } public AggregateException(List causes) { this.causes = causes; } @Override public synchronized Throwable getCause() { return this; } public List getCauses() { return causes; } public List< ?> getSuccessfulElements() { return successfulElements; } public void setSuccessfulElements(List< ?> successfulElements) { this.successfulElements = successfulElements; } } 

Este componente actúa exactamente como la tarea de C #. Espere todo . Estoy trabajando en una variante que hace lo mismo que CompletableFuture.allOf (equivalente a Task.WhenAll )

La razón por la que hice esto es que estoy usando Spring’s ListenableFuture y no quiero CompletableFuture a CompletableFuture pesar de que es una forma más estándar

  /** * execute suppliers as future tasks then wait / join for getting results * @param functors a supplier(s) to execute * @return a list of results */ private List getResultsInFuture(Supplier< ?>... functors) { CompletableFuture[] futures = stream(functors) .map(CompletableFuture::supplyAsync) .collect(Collectors.toList()) .toArray(new CompletableFuture[functors.length]); CompletableFuture.allOf(futures).join(); return stream(futures).map(a-> { try { return a.get(); } catch (InterruptedException | ExecutionException e) { //logger.error("an error occurred during runtime execution a function",e); return null; } }).collect(Collectors.toList()); }; 

CompletionService tomará Callables con el método .submit () y podrá recuperar los futuros calculados con el método .take ().

Una cosa que no debes olvidar es terminar el ExecutorService llamando al método .shutdown (). Además, solo puede llamar a este método cuando haya guardado una referencia al servicio del ejecutor, así que asegúrese de guardar uno.

Código de ejemplo: para un número fijo de elementos de trabajo para trabajar en paralelo:

 ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService completionService = new ExecutorCompletionService(service); ArrayList> futures = new ArrayList>(); for (String computeMe : elementsToCompute) { futures.add(completionService.submit(new YourCallableImplementor(computeMe))); } //now retrieve the futures after computation (auto wait for it) int received = 0; while(received < elementsToCompute.size()) { Future resultFuture = completionService.take(); YourCallableImplementor result = resultFuture.get(); received ++; } //important: shutdown your ExecutorService service.shutdown(); 

Código de ejemplo: para un número dynamic de elementos de trabajo para trabajar en paralelo:

 public void runIt(){ ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService completionService = new ExecutorCompletionService(service); ArrayList> futures = new ArrayList>(); //Initial workload is 8 threads for (int i = 0; i < 9; i++) { futures.add(completionService.submit(write.new CallableImplementor())); } boolean finished = false; while (!finished) { try { Future resultFuture; resultFuture = completionService.take(); CallableImplementor result = resultFuture.get(); finished = doSomethingWith(result.getResult()); result.setResult(null); result = null; resultFuture = null; //After work package has been finished create new work package and add it to futures futures.add(completionService.submit(write.new CallableImplementor())); } catch (InterruptedException | ExecutionException e) { //handle interrupted and assert correct thread / work packet count } } //important: shutdown your ExecutorService service.shutdown(); } public class CallableImplementor implements Callable{ boolean result; @Override public CallableImplementor call() throws Exception { //business logic goes here return this; } public boolean getResult() { return result; } public void setResult(boolean result) { this.result = result; } } 

**

obtener una colección de tareas y ejecutarlas en hilos y luego unir resultados

**

  public final Stream< ?> getResultsInFuture(Supplier< ?>... functors) { CompletableFuture[] promises = Stream.of(functors) .map(CompletableFuture::supplyAsync) .collect(Collectors.toList())//serialize | projection operation .toArray(new CompletableFuture[functors.length]); try { return CompletableFuture.allOf(promises) .thenApply(p -> stream(promises) .map(CompletableFuture::join) .collect(Collectors.toList())) .handle((results, ex) -> { if (ex != null) { //logger.error("an error occurred during getting a future results", ex); return new ArrayList<>(); } return results; }) .get() .stream(); } catch (InterruptedException | ExecutionException e) { //logger.error("an error occurred during executing a function", e); return Stream.of(null); } }