Flujos paralelos, colectores y seguridad de hilos

Vea el siguiente ejemplo simple que cuenta el número de ocurrencias de cada palabra en una lista:

Stream words = Stream.of("a", "b", "a", "c"); Map wordsCount = words.collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

Al final, wordsCount es {a=2, b=1, c=1} .

Pero mi transmisión es muy grande y quiero paralelizar el trabajo, entonces escribo:

 Map wordsCount = words.parallel() .collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

Sin embargo, he notado que wordsCount es un HashMap simple, por lo que me pregunto si necesito pedir explícitamente un mapa simultáneo para garantizar la seguridad del hilo:

 Map wordsCount = words.parallel() .collect(toConcurrentMap(s -> s, s -> 1, (i, j) -> i + j)); 

¿Se pueden usar de forma segura colectores no concurrentes con una transmisión en paralelo o solo debo usar las versiones concurrentes cuando recopilo de una transmisión paralela?

¿Se pueden usar de forma segura colectores no concurrentes con una transmisión en paralelo o solo debo usar las versiones concurrentes cuando recopilo de una transmisión paralela?

Es seguro utilizar un recostackdor no simultáneo en una operación de collect de una secuencia paralela.

En la especificación de la interfaz de Collector , en la sección con media docena de puntos de viñeta, está esto:

Para los recostackdores no concurrentes, cualquier resultado devuelto por el proveedor de resultados, el acumulador o las funciones del combinador deben estar confinados en serie. Esto permite que la recostackción se realice en paralelo sin que el recostackdor tenga que implementar ninguna sincronización adicional. La implementación de la reducción debe gestionar que la entrada esté adecuadamente particionada, que las particiones se procesen de forma aislada y que la combinación solo suceda una vez que se complete la acumulación.

Esto significa que las diversas implementaciones proporcionadas por la clase Collectors se pueden usar con transmisiones paralelas, aunque algunas de esas implementaciones podrían no ser recostackdores concurrentes. Esto también se aplica a cualquiera de sus propios recostackdores no concurrentes que pueda implementar. Se pueden usar de forma segura con transmisiones paralelas, siempre que los colectores no interfieran con la fuente de la stream, no tengan efectos secundarios, sean independientes de la orden, etc.

También recomiendo leer la sección de Reducción de Mutaciones de la documentación del paquete java.util.stream. En el medio de esta sección hay un ejemplo que se dice que es paralelizable, pero que recoge los resultados en una ArrayList , que no es segura para subprocesos.

La forma en que esto funciona es que una secuencia paralela que termina en un recostackdor no simultáneo asegura que diferentes subprocesos siempre estén operando en diferentes instancias de las colecciones intermedias de resultados. Es por eso que un recostackdor tiene una función de Supplier , para crear tantas colecciones intermedias como hilos, de modo que cada hilo se puede acumular por sí mismo. Cuando se fusionan resultados intermedios, se transfieren de manera segura entre hilos, y en cualquier momento dado, solo un hilo fusiona cualquier par de resultados intermedios.

Todos los recolectores, si siguen las reglas en la especificación, se pueden ejecutar en paralelo o secuencialmente. La preparación paralímpica es una parte clave del diseño aquí.

La distinción entre colectores concurrentes y no concurrentes tiene que ver con el enfoque de paralelización.

Un recostackdor ordinario (no simultáneo) opera fusionando subresultados. Por lo tanto, la fuente se divide en un grupo de fragmentos, cada fragmento se recostack en un contenedor de resultados (como una lista o un mapa) y luego los sub-resultados se fusionan en un contenedor de resultados más grande. Esto es seguro y preservador de orden, pero para algunos tipos de contenedores, especialmente los mapas, puede ser costoso, ya que la combinación de dos mapas por clave suele ser costosa.

Un recostackdor simultáneo crea, en su lugar, un contenedor de resultados, cuyas operaciones de inserción garantizan la seguridad de los subprocesos y los elementos de varios subprocesos. Con un contenedor de resultados altamente concurrente como ConcurrentHashMap, este enfoque puede funcionar mejor que la fusión de HashMaps ordinarios.

Por lo tanto, los recostackdores simultáneos son estrictamente optimizaciones sobre sus homólogos ordinarios. Y no vienen sin un costo; Debido a que los elementos están siendo expulsados ​​de muchos hilos, los colectores concurrentes generalmente no pueden preservar el orden de encuentro. (Pero, a menudo no te importa, cuando creas un histogtwig de conteo de palabras, no importa qué instancia de “foo” hayas contado primero).

Es seguro usar colecciones no concurrentes y contadores no atómicos con flujos paralelos.

Si echas un vistazo a la documentación de Stream :: collect , encuentras el siguiente párrafo:

Al igual que reduce(Object, BinaryOperator) , las operaciones de recostackción se pueden paralelizar sin requerir sincronización adicional.

Y para el método Stream :: reduce :

Si bien esto puede parecer una forma más indirecta de realizar una agregación en comparación con simplemente mutar un total acumulado en un bucle, las operaciones de reducción se paralelizan más elegantemente, sin necesidad de sincronización adicional y con un riesgo muy reducido de carreras de datos.

Esto podría ser un poco sorprendente. Sin embargo, tenga en cuenta que las transmisiones paralelas se basan en un modelo de unión por horquilla . Eso significa que la ejecución concurrente funciona de la siguiente manera:

  • dividir la secuencia en dos partes con aproximadamente el mismo tamaño
  • procesar cada parte individualmente
  • recoger los resultados de ambas partes y combinarlas en un resultado

En el segundo paso, los tres pasos se aplican recursivamente a las subsecuencias.

Un ejemplo debería dejarlo claro. los

 IntStream.range(0, 4) .parallel() .collect(Trace::new, Trace::accumulate, Trace::combine); 

El único propósito de la clase Trace es registrar el constructor y las llamadas al método. Si ejecuta esta statement, imprime las siguientes líneas:

 thread: 9 / operation: new thread: 10 / operation: new thread: 10 / operation: accumulate thread: 1 / operation: new thread: 1 / operation: accumulate thread: 1 / operation: combine thread: 11 / operation: new thread: 11 / operation: accumulate thread: 9 / operation: accumulate thread: 9 / operation: combine thread: 9 / operation: combine 

Puede ver que se han creado cuatro objetos Trace , que se ha llamado accumulate una vez en cada objeto, y que se ha usado tres veces para combinar los cuatro objetos en uno. Cada objeto solo puede tener acceso por un hilo a la vez. Eso hace que el código sea seguro para subprocesos, y lo mismo se aplica al método Collectors :: toMap .