Grupo de subprocesos personalizados en la secuencia paralela de Java 8

¿Es posible especificar un grupo de subprocesos personalizado para la secuencia paralela de Java 8? No puedo encontrarlo en ningún lado.

Imagine que tengo una aplicación de servidor y me gustaría usar transmisiones paralelas. Pero la aplicación es grande y tiene varios subprocesos, así que quiero dividirla en compartimentos. No quiero una tarea de ejecución lenta en un módulo de las tareas de applicationblock de otro módulo.

Si no puedo usar diferentes grupos de subprocesos para diferentes módulos, significa que no puedo usar transmisiones paralelas de forma segura en la mayoría de las situaciones del mundo real.

Prueba el siguiente ejemplo. Hay algunas tareas intensivas de CPU ejecutadas en hilos separados. Las tareas aprovechan las transmisiones paralelas. La primera tarea se rompe, por lo que cada paso lleva 1 segundo (simulado por el hilo de reposo). El problema es que otros hilos se atascan y esperan a que la tarea rota termine. Este es un ejemplo inventado, pero imagina una aplicación de servlet y alguien que envía una tarea de larga ejecución al conjunto compartido de fork tenedor.

public class ParallelTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); es.execute(() -> runTask(1000)); //incorrect task es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.shutdown(); es.awaitTermination(60, TimeUnit.SECONDS); } private static void runTask(int delay) { range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max() .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max)); } public static boolean isPrime(long n) { return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0); } } 

En realidad, hay un truco sobre cómo ejecutar una operación paralela en un grupo de unión de horquilla específico. Si lo ejecuta como una tarea en un grupo de unión de horquilla, permanece allí y no utiliza el común.

 ForkJoinPool forkJoinPool = new ForkJoinPool(2); forkJoinPool.submit(() -> //parallel task here, for example IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()) ).get(); 

El truco se basa en ForkJoinTask.fork, que especifica: “Organiza la ejecución de esta tarea de forma asíncrona en el grupo en el que se está ejecutando la tarea actual, si corresponde, o utilizando el ForkJoinPool.commonPool () si no está enForkJoinPool ()”

Las transmisiones paralelas usan el valor predeterminado de ForkJoinPool.commonPool que de forma predeterminada tiene un subproceso menos como procesador , como lo devuelve Runtime.getRuntime().availableProcessors() (Esto significa que las transmisiones paralelas utilizan todos los procesadores porque también usan el hilo principal )

Para las aplicaciones que requieren pools separados o personalizados, se puede construir un ForkJoinPool con un nivel de paralelismo objective dado; por defecto, igual a la cantidad de procesadores disponibles.

Esto también significa que si ha nested transmisiones paralelas o múltiples secuencias paralelas iniciadas simultáneamente, todas compartirán el mismo grupo. Ventaja: nunca usará más que el predeterminado (cantidad de procesadores disponibles). Desventaja: es posible que no obtenga “todos los procesadores” asignados a cada flujo paralelo que inicie (si tiene más de uno). (Al parecer, puede usar un ManagedBlocker para eludir eso).

Para cambiar la forma en que se ejecutan las secuencias paralelas, puede

  • envíe la ejecución de secuencia paralela a su propia ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); o
  • puede cambiar el tamaño del grupo común utilizando las propiedades del sistema: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") para un paralelismo de destino de 20 subprocesos.

Ejemplo de este último en mi máquina que tiene 8 procesadores. Si ejecuto el siguiente progtwig:

 long start = System.currentTimeMillis(); IntStream s = IntStream.range(0, 20); //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); s.parallel().forEach(i -> { try { Thread.sleep(100); } catch (Exception ignore) {} System.out.print((System.currentTimeMillis() - start) + " "); }); 

El resultado es:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Entonces puede ver que la secuencia paralela procesa 8 elementos a la vez, es decir, usa 8 hilos. Sin embargo, si elimino el comentario de la línea comentada, la salida es:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Esta vez, la secuencia paralela ha utilizado 20 subprocesos y los 20 elementos en la secuencia se han procesado al mismo tiempo.

Alternativamente al truco de activar el cálculo paralelo dentro de su propia forkJoinPool, también puede pasar ese grupo al método CompletableFuture.supplyAsync como en:

 ForkJoinPool forkJoinPool = new ForkJoinPool(2); CompletableFuture> primes = CompletableFuture.supplyAsync(() -> //parallel task here, for example range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), forkJoinPool ); 

El uso de ForkJoinPool y enviar para una transmisión paralela no utiliza de manera confiable todos los hilos. Si observa esto (la secuencia paralela de un HashSet no se ejecuta en paralelo ) y esto ( ¿por qué la secuencia paralela no utiliza todos los hilos de ForkJoinPool? ), Verá el razonamiento.

Versión corta: si ForkJoinPool / submit no funciona para usted, use

 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); 

Hasta ahora, utilicé las soluciones descritas en las respuestas de esta pregunta. Ahora, se me ocurrió una pequeña biblioteca llamada Parallel Stream Support para eso:

 ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS); ParallelIntStreamSupport.range(1, 1_000_000, pool) .filter(PrimesPrint::isPrime) .collect(toList()) 

Pero como señaló @PabloMatiasGomez en los comentarios, existen inconvenientes con respecto al mecanismo de división de flujos paralelos que depende en gran medida del tamaño del conjunto común. Ver secuencia paralela desde un HashSet no se ejecuta en paralelo .

Estoy utilizando esta solución solo para tener pools separados para diferentes tipos de trabajo, pero no puedo establecer el tamaño del pool común en 1 incluso si no lo uso.

Para medir la cantidad real de hilos usados, puede verificar Thread.activeCount() :

  Runnable r = () -> IntStream .range(-42, +42) .parallel() .map(i -> Thread.activeCount()) .max() .ifPresent(System.out::println); ForkJoinPool.commonPool().submit(r).join(); new ForkJoinPool(42).submit(r).join(); 

Esto puede producir en una CPU de 4 núcleos una salida como:

 5 // common pool 23 // custom pool 

Sin .parallel() da:

 3 // common pool 4 // custom pool 

Ve a buscar AbacusUtil . El número de subproceso puede especificarse para transmisión en paralelo. Aquí está el código de ejemplo:

 LongStream.range(4, 1_000_000).parallel(threadNum)... 

Divulgación: soy el desarrollador de AbacusUtil.

Si no te molesta usar una biblioteca de terceros, con cyclops-reaction puedes mezclar secuencias secuenciales y paralelas dentro de la misma tubería y proporcionar ForkJoinPools personalizadas. Por ejemplo

  ReactiveSeq.range(1, 1_000_000) .foldParallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId())) .max(Comparator.naturalOrder())); 

O si quisiéramos continuar procesando dentro de una secuencia secuencial

  ReactiveSeq.range(1, 1_000_000) .parallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))) .map(this::processSequentially) .forEach(System.out::println); 

[Divulgación Soy el desarrollador principal de cyclops-react]

Probé la ForkJoinPool personalizada de la siguiente manera para ajustar el tamaño de la agrupación:

 private static Set ThreadNameSet = new HashSet<>(); private static Callable getSum() { List aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList()); return () -> aList.parallelStream() .peek((i) -> { String threadName = Thread.currentThread().getName(); ThreadNameSet.add(threadName); }) .reduce(0L, Long::sum); } private static void testForkJoinPool() { final int parallelism = 10; ForkJoinPool forkJoinPool = null; Long result = 0L; try { forkJoinPool = new ForkJoinPool(parallelism); result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { if (forkJoinPool != null) { forkJoinPool.shutdown(); //always remember to shutdown the pool } } out.println(result); out.println(ThreadNameSet); } 

Aquí está el resultado que dice que el grupo usa más hilos que el predeterminado 4 .

 50000005000000 [ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2] 

Pero en realidad hay un raro , cuando traté de lograr el mismo resultado usando ThreadPoolExecutor siguiente manera:

 BlockingDeque blockingDeque = new LinkedBlockingDeque(1000); ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread")); 

pero fallé.

Solo iniciará parallelStream en un nuevo hilo y luego todo lo demás será igual, lo que prueba nuevamente que parallelStream usará ForkJoinPool para iniciar sus hilos secundarios.

Nota: Parece haber una solución implementada en JDK 10 que garantiza que el grupo de subprocesos personalizados utiliza el número esperado de subprocesos.

La ejecución de secuencias paralelas dentro de una ForkJoinPool personalizada debe obedecer al paralelismo https://bugs.openjdk.java.net/browse/JDK-8190974