Reader # lines () se paraleliza mal debido a la política de tamaño de lote no configurable en su spliterator

No puedo lograr una buena paralelización del procesamiento de flujo cuando la fuente de flujo es un Reader . Al ejecutar el siguiente código en una CPU de cuatro núcleos, observo 3 núcleos que se usan al principio, luego una caída repentina a solo dos, luego un núcleo. La utilización general de la CPU ronda el 50%.

Tenga en cuenta las siguientes características del ejemplo:

  • hay solo 6,000 líneas;
  • cada línea tarda aproximadamente 20 ms en procesarse;
  • todo el procedimiento toma alrededor de un minuto.

Eso significa que toda la presión está en la CPU y que las E / S son mínimas. El ejemplo es un pato sentado para la paralelización automática.

 import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; ... class imports elided ... public class Main { static final AtomicLong totalTime = new AtomicLong(); public static void main(String[] args) throws IOException { final long start = System.nanoTime(); final Path inputPath = createInput(); System.out.println("Start processing"); try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) { Files.lines(inputPath).parallel().map(Main::processLine) .forEach(w::println); } final double cpuTime = totalTime.get(), realTime = System.nanoTime()-start; final int cores = Runtime.getRuntime().availableProcessors(); System.out.println(" Cores: " + cores); System.out.format(" CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1)); System.out.format(" Real time: %.2f s\n", realTime/SECONDS.toNanos(1)); System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores); } private static String processLine(String line) { final long localStart = System.nanoTime(); double ret = 0; for (int i = 0; i < line.length(); i++) for (int j = 0; j < line.length(); j++) ret += Math.pow(line.charAt(i), line.charAt(j)/32.0); final long took = System.nanoTime()-localStart; totalTime.getAndAdd(took); return NANOSECONDS.toMillis(took) + " " + ret; } private static Path createInput() throws IOException { final Path inputPath = Paths.get("input.txt"); try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) { for (int i = 0; i < 6_000; i++) { final String text = String.valueOf(System.nanoTime()); for (int j = 0; j < 25; j++) w.print(text); w.println(); } } return inputPath; } } 

Mi salida típica:

  Cores: 4 CPU time: 110.23 s Real time: 53.60 s CPU utilization: 51.41% 

A modo de comparación, si utilizo una variante ligeramente modificada donde primero recopilo en una lista y luego procedo:

 Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine) .forEach(w::println); 

Obtengo esta salida típica:

  Cores: 4 CPU time: 138.43 s Real time: 35.00 s CPU utilization: 98.87% 

¿Qué podría explicar ese efecto, y cómo puedo evitarlo para obtener la plena utilización?

Tenga en cuenta que originalmente he observado esto en un lector de flujo de entrada de servlet, por lo que no es específico de un FileReader .

Aquí está la respuesta, explicada en el código fuente de Spliterators.IteratorSpliterator , el usado por BufferedReader#lines() :

  @Override public Spliterator trySplit() { /* * Split into arrays of arithmetically increasing batch * sizes. This will only improve parallel performance if * per-element Consumer actions are more costly than * transferring them into an array. The use of an * arithmetic progression in split sizes provides overhead * vs parallelism bounds that do not particularly favor or * penalize cases of lightweight vs heavyweight element * operations, across combinations of #elements vs #cores, * whether or not either are known. We generate * O(sqrt(#elements)) splits, allowing O(sqrt(#cores)) * potential speedup. */ Iterator i; long s; if ((i = it) == null) { i = it = collection.iterator(); s = est = (long) collection.size(); } else s = est; if (s > 1 && i.hasNext()) { int n = batch + BATCH_UNIT; if (n > s) n = (int) s; if (n > MAX_BATCH) n = MAX_BATCH; Object[] a = new Object[n]; int j = 0; do { a[j] = i.next(); } while (++j < n && i.hasNext()); batch = j; if (est != Long.MAX_VALUE) est -= j; return new ArraySpliterator<>(a, 0, j, characteristics); } return null; } 

También son dignas de mención las constantes:

 static final int BATCH_UNIT = 1 << 10; // batch array size increment static final int MAX_BATCH = 1 << 25; // max batch array size; 

Entonces en mi ejemplo, donde uso 6.000 elementos, obtengo solo tres lotes porque el paso del tamaño del lote es 1024. Eso explica mi observación de que inicialmente se usan tres núcleos, que caen a dos y luego a uno a medida que se completan los lotes más pequeños. Mientras tanto probé un ejemplo modificado con 60,000 elementos y luego obtuve casi el 100% de la utilización de la CPU.

Para resolver mi problema, he desarrollado el siguiente código que me permite convertir cualquier flujo existente en uno cuyo Spliterator#trySplit en lotes del tamaño especificado. La forma más sencilla de usarlo para el caso de uso de mi pregunta es así:

 toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20) 

En un nivel inferior, la clase a continuación es un envoltorio de spliterator que cambia el comportamiento trySplit del trySplit envuelto y deja otros aspectos sin cambios.


 import static java.util.Spliterators.spliterator; import static java.util.stream.StreamSupport.stream; import java.util.Comparator; import java.util.Spliterator; import java.util.function.Consumer; import java.util.stream.Stream; public class FixedBatchSpliteratorWrapper implements Spliterator { private final Spliterator spliterator; private final int batchSize; private final int characteristics; private long est; public FixedBatchSpliteratorWrapper(Spliterator toWrap, long est, int batchSize) { final int c = toWrap.characteristics(); this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c; this.spliterator = toWrap; this.est = est; this.batchSize = batchSize; } public FixedBatchSpliteratorWrapper(Spliterator toWrap, int batchSize) { this(toWrap, toWrap.estimateSize(), batchSize); } public static  Stream toFixedBatchStream(Stream in, int batchSize) { return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true); } @Override public Spliterator trySplit() { final HoldingConsumer holder = new HoldingConsumer<>(); if (!spliterator.tryAdvance(holder)) return null; final Object[] a = new Object[batchSize]; int j = 0; do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder)); if (est != Long.MAX_VALUE) est -= j; return spliterator(a, 0, j, characteristics()); } @Override public boolean tryAdvance(Consumer action) { return spliterator.tryAdvance(action); } @Override public void forEachRemaining(Consumer action) { spliterator.forEachRemaining(action); } @Override public Comparator getComparator() { if (hasCharacteristics(SORTED)) return null; throw new IllegalStateException(); } @Override public long estimateSize() { return est; } @Override public int characteristics() { return characteristics; } static final class HoldingConsumer implements Consumer { Object value; @Override public void accept(T value) { this.value = value; } } } 

Este problema se ha resuelto en cierta medida en las comstackciones de acceso temprano de Java-9. The Files.lines fue reescrito y ahora, al dividirse, salta al medio del archivo mapeado en memoria. Aquí están los resultados en mi máquina (que tiene 4 núcleos HyperThreading = 8 hilos de hardware):

Java 8u60:

 Start processing Cores: 8 CPU time: 73,50 s Real time: 36,54 s CPU utilization: 25,15% 

Java 9b82:

 Start processing Cores: 8 CPU time: 79,64 s Real time: 10,48 s CPU utilization: 94,95% 

Como puede ver, tanto el uso en tiempo real como la utilización de la CPU han mejorado mucho.

Sin embargo, esta optimización tiene algunas limitaciones. Actualmente solo funciona con varias codificaciones (a saber, UTF-8, ISO_8859_1 y US_ASCII), ya que para la encoding arbitraria no se sabe exactamente cómo se codifica el salto de línea. Está limitado a los archivos de no más de 2Gb de tamaño (debido a las limitaciones de MappedByteBuffer en Java) y, por supuesto, no funciona para algunos archivos no regulares (como dispositivos de caracteres, conductos con nombre que no se pueden mapear en la memoria). En tales casos, la implementación anterior se usa como alternativa.

La ejecución paralela de las transmisiones se basa en un modelo fork-join. Para las transmisiones ordenadas , la ejecución en paralelo solo funciona, si la secuencia se puede dividir en partes, siguiendo estrictamente entre sí. En general, eso no es posible con las secuencias generadas por BufferedReader . Sin embargo, en teoría, la ejecución en paralelo debería ser posible para las secuencias desordenadas:

 BufferedReader reader = ...; reader.lines().unordered().map(...); 

No estoy seguro de si la secuencia devuelta por BufferedReader admite este tipo de ejecución en paralelo. Una alternativa muy simple es crear una lista intermedia:

 BufferedReader reader = ...; reader.lines().collect(toList()).parallelStream().map(...); 

En este caso, la ejecución paralela comienza después de que se hayan leído todas las líneas. Esto podría ser un problema si leer las líneas lleva mucho tiempo. En este caso, recomiendo usar un ExecutorService para ejecución paralela en lugar de transmisiones paralelas :

 ExecutorService executor = ...; BufferedReader reader = ...; reader.lines() .map(line -> executor.submit(() -> ... line ...)) .collect(toList()) .stream() .map(future -> future.get()) .map(...); 

Para encontrar la verdadera causa de esto, necesita profundizar en el origen Files.lines() , que llama a BufferedReader.lines() , que es el siguiente:

 public Stream lines() { Iterator iter = new Iterator() { String nextLine = null; @Override public boolean hasNext() { if (nextLine != null) { return true; } else { try { nextLine = readLine(); return (nextLine != null); } catch (IOException e) { throw new UncheckedIOException(e); } } } @Override public String next() { if (nextLine != null || hasNext()) { String line = nextLine; nextLine = null; return line; } else { throw new NoSuchElementException(); } } }; return StreamSupport.stream(Spliterators.spliteratorUnknownSize( iter, Spliterator.ORDERED | Spliterator.NONNULL), false); } 

Aquí devuelve un Stream que es:

  • De tamaño desconocido
  • Ordenado
  • No nulo
  • No paralelo (el argumento false al final de StreamSupport.stream()

Y por lo tanto, realmente no estoy seguro de si está incluso sujeto a ser paralelizado, esto podría encontrarse profundizando aún más en la fuente.

Lo que sé es que las transmisiones paralelas se proporcionan explícitamente en las API de Java. Tomemos como ejemplo List , tiene un List.stream() y List.parallelStream() .