¿Hay una buena manera de extraer trozos de datos de una secuencia Java 8?

I un proceso de ETL Estoy recuperando muchas entidades de un Repositorio de datos de spring. Entonces estoy usando un flujo paralelo para mapear las entidades a diferentes. Puedo usar un consumidor para almacenar esas entidades nuevas en otro repository, una por una, o recostackrlas en una lista y almacenarlas en una única operación masiva. El primero es costoso, mientras que el posterior puede exceder la memoria disponible.

¿Hay una buena manera de recolectar cierta cantidad de elementos en la secuencia (como lo hace el límite), consumir ese trozo y continuar en paralelo hasta que se procesen todos los elementos?

Mi enfoque para las operaciones masivas con fragmentación es usar un envoltorio divisor del particionador y otro envoltorio que anula la política de división predeterminada (progresión aritmética de tamaños de lotes en incrementos de 1024) a la división simple de lotes fijos. Úselo así:

Stream existingStream = ...; Stream> partitioned = partition(existingStream, 100, 1); partitioned.forEach(chunk -> ... process the chunk ...); 

Aquí está el código completo:

 import java.util.ArrayList; import java.util.List; import java.util.Spliterator; import java.util.Spliterators.AbstractSpliterator; import java.util.function.Consumer; import java.util.stream.Stream; import java.util.stream.StreamSupport; public class PartitioningSpliterator extends AbstractSpliterator> { private final Spliterator spliterator; private final int partitionSize; public PartitioningSpliterator(Spliterator toWrap, int partitionSize) { super(toWrap.estimateSize(), toWrap.characteristics() | Spliterator.NONNULL); if (partitionSize <= 0) throw new IllegalArgumentException( "Partition size must be positive, but was " + partitionSize); this.spliterator = toWrap; this.partitionSize = partitionSize; } public static  Stream> partition(Stream in, int size) { return StreamSupport.stream(new PartitioningSpliterator(in.spliterator(), size), false); } public static  Stream> partition(Stream in, int size, int batchSize) { return StreamSupport.stream( new FixedBatchSpliterator<>(new PartitioningSpliterator<>(in.spliterator(), size), batchSize), false); } @Override public boolean tryAdvance(Consumer> action) { final ArrayList partition = new ArrayList<>(partitionSize); while (spliterator.tryAdvance(partition::add) && partition.size() < partitionSize); if (partition.isEmpty()) return false; action.accept(partition); return true; } @Override public long estimateSize() { final long est = spliterator.estimateSize(); return est == Long.MAX_VALUE? est : est / partitionSize + (est % partitionSize > 0? 1 : 0); } } 

 import static java.util.Spliterators.spliterator; import java.util.Comparator; import java.util.Spliterator; import java.util.function.Consumer; public abstract class FixedBatchSpliteratorBase implements Spliterator { private final int batchSize; private final int characteristics; private long est; public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) { characteristics |= ORDERED; if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED; this.characteristics = characteristics; this.batchSize = batchSize; this.est = est; } public FixedBatchSpliteratorBase(int characteristics, int batchSize) { this(characteristics, batchSize, Long.MAX_VALUE); } public FixedBatchSpliteratorBase(int characteristics) { this(characteristics, 64, Long.MAX_VALUE); } @Override public Spliterator trySplit() { final HoldingConsumer holder = new HoldingConsumer<>(); if (!tryAdvance(holder)) return null; final Object[] a = new Object[batchSize]; int j = 0; do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder)); if (est != Long.MAX_VALUE) est -= j; return spliterator(a, 0, j, characteristics()); } @Override public Comparator getComparator() { if (hasCharacteristics(SORTED)) return null; throw new IllegalStateException(); } @Override public long estimateSize() { return est; } @Override public int characteristics() { return characteristics; } static final class HoldingConsumer implements Consumer { Object value; @Override public void accept(T value) { this.value = value; } } } 

 import static java.util.stream.StreamSupport.stream; import java.util.Spliterator; import java.util.function.Consumer; import java.util.stream.Stream; public class FixedBatchSpliterator extends FixedBatchSpliteratorBase { private final Spliterator spliterator; public FixedBatchSpliterator(Spliterator toWrap, int batchSize, long est) { super(toWrap.characteristics(), batchSize, est); this.spliterator = toWrap; } public FixedBatchSpliterator(Spliterator toWrap, int batchSize) { this(toWrap, batchSize, toWrap.estimateSize()); } public FixedBatchSpliterator(Spliterator toWrap) { this(toWrap, 64, toWrap.estimateSize()); } public static  Stream withBatchSize(Stream in, int batchSize) { return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true); } public static  FixedBatchSpliterator batchedSpliterator(Spliterator toWrap, int batchSize) { return new FixedBatchSpliterator<>(toWrap, batchSize); } @Override public boolean tryAdvance(Consumer action) { return spliterator.tryAdvance(action); } @Override public void forEachRemaining(Consumer action) { spliterator.forEachRemaining(action); } } 

Es posible que pueda escribir su propio Collector que acumula entidades y luego realiza actualizaciones masivas.

El método Collector.accumulator() puede agregar las entidades a un caché temporal interno hasta que el caché crezca demasiado. Cuando la memoria caché es lo suficientemente grande, puede hacer una tienda masiva en su otro repository.

Collector.merge() necesita combinar cachés de recostackdor de 2 hilos en un solo caché (y posiblemente combinar)

Finalmente, se llama al método Collector.finisher() cuando se realiza el flujo, así que guarde aquí todo lo que quede en el caché.

Como ya está usando una transmisión paralela y parece estar bien haciendo múltiples cargas al mismo tiempo, supongo que ya ha manejado la seguridad de la hebra.

ACTUALIZAR

Mi comentario sobre la seguridad de la hebra y las transmisiones paralelas se refería al almacenamiento / almacenamiento real en el repository, no a la concurrencia en la colección temporal.

Cada recostackdor debería (creo) ejecutarse en su propio hilo. Una secuencia paralela debería crear múltiples instancias de recostackdor llamando al supplier() varias veces. De modo que puede tratar una instancia de recostackdor como un único subproceso y debería funcionar bien.

Por ejemplo, en el Javadoc para java.util.IntSummaryStatistics dice:

Esta implementación no es segura para subprocesos. Sin embargo, es seguro utilizar Collectors.toIntStatistics () en una secuencia paralela, porque la implementación paralela de Stream.collect () proporciona la partición necesaria, el aislamiento y la fusión de resultados para una ejecución en paralelo segura y eficiente.

Puede usar un colector personalizado para hacerlo elegantemente.

Por favor vea mi respuesta a una pregunta similar aquí:

Colector de procesamiento por lotes personalizado

Luego, puede procesar por lotes el flujo en paralelo utilizando el recostackdor anterior para almacenar los registros en su repository, ejemplo de uso:

 List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); int batchSize = 3; Consumer> batchProcessor = xs -> repository.save(xs); input.parallelStream() .map(i -> i + 1) .collect(StreamUtils.batchCollector(batchSize, batchProcessor)); 
  @Test public void streamTest(){ Stream data = Stream.generate(() -> { //Block on IO return blockOnIO(); }); AtomicInteger countDown = new AtomicInteger(1000); final ArrayList[] buffer = new ArrayList[]{new ArrayList()}; Object syncO = new Object(); data.parallel().unordered().map(i -> i * 1000).forEach(i->{ System.out.println(String.format("FE %s %d",Thread.currentThread().getName(), buffer[0].size())); int c; ArrayList export=null; synchronized (syncO) { c = countDown.addAndGet(-1); buffer[0].add(i); if (c == 0) { export=buffer[0]; buffer[0] = new ArrayList(); countDown.set(1000); } } if(export !=null){ sendBatch(export); } }); //export any remaining sendBatch(buffer[0]); } Integer blockOnIO(){ try { Thread.sleep(50); return Integer.valueOf((int)Math.random()*1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } void sendBatch(ArrayList al){ assert al.size() == 1000; System.out.println(String.format("LOAD %s %d",Thread.currentThread().getName(), al.size())); } 

Esto es quizás algo anticuado, pero debe lograr el procesamiento por lotes con un mínimo de locking.

Producirá resultados como

 FE ForkJoinPool.commonPool-worker-2 996 FE ForkJoinPool.commonPool-worker-5 996 FE ForkJoinPool.commonPool-worker-4 998 FE ForkJoinPool.commonPool-worker-3 999 LOAD ForkJoinPool.commonPool-worker-3 1000 FE ForkJoinPool.commonPool-worker-6 0 FE ForkJoinPool.commonPool-worker-1 2 FE ForkJoinPool.commonPool-worker-7 2 FE ForkJoinPool.commonPool-worker-2 4 

Aquí hay una solución de My Library: AbacusUtil :

 stream.split(batchSize).parallel(threadNum).map(yourBatchProcessFunction);