¿Por qué la secuencia paralela no usa todos los hilos de ForkJoinPool?

Así que sé que si usa parallelStream sin una ForkJoinPool personalizada, utilizará ForkJoinPool por defecto, que tiene un subproceso menos por defecto, ya que tiene procesadores.

Entonces, como se establece aquí (y también en la otra respuesta a esa pregunta) para tener más paralelismo, tienes que:

envíe la ejecución de secuencia paralela a su propia ForkJoinPool: yourFJP.submit (() -> stream.parallel (). forEach (doSomething));

Entonces, hice esto:

 import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; import com.google.common.collect.Sets; public class Main { public static void main(String[] args) throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(1000); IntStream stream = IntStream.range(0, 999999); final Set thNames = Collections.synchronizedSet(new HashSet()); forkJoinPool.submit(() -> { stream.parallel().forEach(n -> { System.out.println("Processing n: " + n); try { Thread.sleep(500); thNames.add(Thread.currentThread().getName()); System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount()); } catch (Exception e) { throw new RuntimeException(e); } }); }).get(); } } 

Hice un conjunto de nombres de subprocesos para ver cuántos subprocesos se están creando, y también registré el número de subprocesos activos que tiene el grupo y ambos números no crecen más de 16, lo que significa que el paralelismo aquí es no siendo más de 16 (¿por qué incluso 16?). Si no uso forkJoinPool, obtengo 4 como paralelismo, que es de acuerdo con la cantidad de procesadores que tengo.

¿Por qué me da 16 y no 1000?

Actualizar

Originalmente, esta respuesta fue una explicación elaborada que afirmaba que ForkJoinPool aplica presión de retroceso y ni siquiera alcanza el nivel de paralelismo prescrito, porque siempre hay trabajadores inactivos disponibles para procesar la transmisión.

Eso es incorrecto

La respuesta real se proporciona en la pregunta original a la que se marcó como duplicada. ForkJoinPool personalizado para el procesamiento de flujo no es oficialmente compatible, y cuando se utiliza forEach , el paralelismo de grupo predeterminado se usa para determinar el comportamiento del spliterator de flujo.

A continuación, se incluye un ejemplo de cómo cuando las tareas se envían manualmente a ForkJoinPool personalizado, el recuento de subprocesos activo del grupo alcanza fácilmente su nivel de paralelismo:

 for (int i = 0; i < 1_000_000; ++i) { forkJoinPool.submit(() -> { try { Thread.sleep(1); thNames.add(Thread.currentThread().getName()); System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism()); } catch (Exception e) { throw new RuntimeException(e); } }); } 

Gracias a Stuart Marks por señalar esto ya Sotirios Delimanolis por argumentar que mi respuesta original es incorrecta 🙂

Me parece que cuando envía una lambda al FJP, lambda usará el grupo común y no el FJP. Sotirios Delimanolis demostró esto con su comentario, arriba. Lo que está enviando es una tarea que se ejecuta en su FJP.

Intente crear un perfil de este código para ver qué hilos se están utilizando en realidad.

No puede nombrar los hilos dentro del FJP.