¿Cómo funciona HashPartitioner?

Leí en la documentación de HashPartitioner . Desafortunadamente, no se explicó mucho, excepto las llamadas API. Estoy bajo la suposición de que HashPartitioner divide el conjunto distribuido en función del hash de las claves. Por ejemplo, si mis datos son como

 (1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 

Entonces, el particionador colocaría esto en diferentes particiones con las mismas claves cayendo en la misma partición. Sin embargo, no entiendo la importancia del argumento constructor

 new HashPartitoner(numPartitions) //What does numPartitions do? 

Para el conjunto de datos anterior, ¿cómo diferirían los resultados si lo hiciera?

 new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 

Entonces, ¿cómo funciona HashPartitioner realidad?

Bueno, hagamos que su conjunto de datos sea marginalmente más interesante:

 val rdd = sc.parallelize(for { x <- 1 to 3 y <- 1 to 2 } yield (x, None), 8) 

Tenemos seis elementos:

 rdd.count 
 Long = 6 

sin particionador

 rdd.partitioner 
 Option[org.apache.spark.Partitioner] = None 

y ocho particiones:

 rdd.partitions.length 
 Int = 8 

Ahora permitamos que el pequeño ayudante cuente el número de elementos por partición:

 import org.apache.spark.rdd.RDD def countByPartition(rdd: RDD[(Int, None.type)]) = { rdd.mapPartitions(iter => Iterator(iter.length)) } 

Como no tenemos el particionador, nuestro conjunto de datos se distribuye uniformemente entre particiones ( Esquema de particionamiento predeterminado en Spark ):

 countByPartition(rdd).collect() 
 Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1) 

distribución inicial

Ahora vamos a reparticionar nuestro conjunto de datos:

 import org.apache.spark.HashPartitioner val rddOneP = rdd.partitionBy(new HashPartitioner(1)) 

Como el parámetro pasado a HashPartitioner define el número de particiones, esperamos una partición:

 rddOneP.partitions.length 
 Int = 1 

Como solo tenemos una partición, contiene todos los elementos:

 countByPartition(rddOneP).collect 
 Array[Int] = Array(6) 

hash-partitioner-1

Tenga en cuenta que el orden de los valores después de la reproducción aleatoria no es determinista.

De la misma manera si usamos HashPartitioner(2)

 val rddTwoP = rdd.partitionBy(new HashPartitioner(2)) 

obtendremos 2 particiones:

 rddTwoP.partitions.length 
 Int = 2 

Dado que rdd está particionado por datos clave, ya no se distribuirá uniformemente:

 countByPartition(rddTwoP).collect() 
 Array[Int] = Array(2, 4) 

Porque con tener tres claves y solo dos valores diferentes de hashCode mod numPartitions no hay nada inesperado aquí:

 (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)) 
 scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1)) 

Solo para confirmar lo anterior:

 rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect() 
 Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3)) 

hash-partitioner-2

Finalmente, con HashPartitioner(7) obtenemos siete particiones, tres no vacías con 2 elementos cada una:

 val rddSevenP = rdd.partitionBy(new HashPartitioner(7)) rddSevenP.partitions.length 
 Int = 7 
 countByPartition(rddTenP).collect() 
 Array[Int] = Array(0, 2, 2, 2, 0, 0, 0) 

hash-partitioner-7

Resumen y notas

  • HashPartitioner toma un solo argumento que define el número de particiones
  • los valores se asignan a particiones usando hash de claves. hash función hash puede variar según el idioma (Scala RDD puede usar hashCode , DataSets usar MurmurHash 3, PySpark, portable_hash ).

    En un caso simple como este, donde key es un número entero pequeño, puede suponer que hash es una identidad ( i = hash(i) ).

    La API de Scala usa nonNegativeMod para determinar la partición en base al hash calculado,

  • si la distribución de claves no es uniforme, puede terminar en situaciones en las que parte de su clúster está inactiva

  • las llaves tienen que ser lavables. Puede consultar mi respuesta a la lista A como clave para reducirByKey de PySpark para leer sobre problemas específicos de PySpark. Otro problema posible se destaca por la documentación de HashPartitioner :

    Los arrays de Java tienen hashCodes basados ​​en las identidades de las matrices en lugar de sus contenidos, por lo que intentar particionar un RDD [Array [ ]] o RDD [(Array [ ], _)] usando un HashPartitioner producirá un resultado inesperado o incorrecto.

  • En Python 3, debes asegurarte de que hashing sea consistente. Consulte ¿Qué significa Excepción: la aleatoriedad del hash de cadena debe deshabilitarse mediante PYTHONHASHSEED mean en pyspark?

  • El particionador hash no es ni inyectivo ni surjectivo. Se pueden asignar múltiples claves a una sola partición y algunas particiones pueden permanecer vacías.

  • Tenga en cuenta que actualmente los métodos basados ​​en hash no funcionan en Scala cuando se combinan con clases de casos definidas REPL ( Igualdad de clase de caso en Apache Spark ).

  • HashPartitioner (o cualquier otro Partitioner ) mezcla los datos. A menos que la partición se reutilice entre varias operaciones, no reduce la cantidad de datos que se barajan.

RDD se distribuye esto significa que está dividido en algunas partes. Cada una de estas particiones es potencialmente en una máquina diferente. El particionador hash con arument numPartitions numPartitions en qué partición colocar el par (key, value) de la siguiente manera:

  1. Crea particiones numPartitions exactamente.
  2. Lugares (key, value) en la partición con el número Hash(key) % numPartitions

El método HashPartitioner.getPartition toma una clave como argumento y devuelve el índice de la partición a la que pertenece la clave. El particionador tiene que saber cuáles son los índices válidos, por lo que devuelve los números en el rango correcto. El número de particiones se especifica a través del argumento del constructor numPartitions .

La implementación devuelve aproximadamente key.hashCode() % numPartitions . Ver Partitioner.scala para más detalles.