Cómo transportar un RDD en Spark

Tengo un RDD como este:

1 2 3 4 5 6 7 8 9 

Es una matriz. Ahora quiero transponer el RDD de esta manera:

 1 4 7 2 5 8 3 6 9 

¿Cómo puedo hacer esto?

Digamos que tienes una matriz N × M.

Si tanto N como M son tan pequeños que puede mantener elementos N × M en la memoria, no tiene mucho sentido usar un RDD. Pero transponerlo es fácil:

 val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) val transposed = sc.parallelize(rdd.collect.toSeq.transpose) 

Si N o M son tan grandes que no puede mantener N o M entradas en la memoria, entonces no puede tener una línea RDD de este tamaño. O bien la matriz original o la transpuesta es imposible de representar en este caso.

N y M pueden ser de un tamaño mediano: puede mantener N o M entradas en la memoria, pero no puede contener N × M entradas. En este caso, debe hacer estallar la matriz y volver a armarla:

 val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) // Split the matrix into one number per line. val byColumnAndRow = rdd.zipWithIndex.flatMap { case (row, rowIndex) => row.zipWithIndex.map { case (number, columnIndex) => columnIndex -> (rowIndex, number) } } // Build up the transposed matrix. Group and sort by column index first. val byColumn = byColumnAndRow.groupByKey.sortByKey().values // Then sort by row index. val transposed = byColumn.map { indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2) } 

Un primer borrador sin usar collect (), por lo que todo funciona del lado del trabajador y no se hace nada en el controlador:

 val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position .map(v => (v._2, v._1)) // key by column position .groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row .map(_._2) // discard the key, keep only value 

El problema con esta solución es que las columnas en la matriz transpuesta terminarán barajadas si la operación se realiza en un sistema distribuido. Pensará en una versión mejorada

Mi idea es que además de adjuntar el ‘número de columna’ a cada elemento de la matriz, también adjuntamos el ‘número de fila’. Por lo tanto, podríamos realizar una clave por posición de columna y reagrupar por clave como en el ejemplo, pero luego podríamos reordenar cada fila en el número de fila y luego quitar los números de fila / columna del resultado. Simplemente no tengo forma de saber el número de fila cuando importo un archivo en un RDD.

Puede pensar que es difícil adjuntar una columna y un número de fila a cada elemento de la matriz, pero supongo que ese es el precio a pagar para tener la posibilidad de procesar su entrada en forma distribuida y así manejar enormes matrices.

Actualizaré la respuesta cuando encuentre una solución al problema de ordenar.

A partir de Spark 1.6 puede usar la operación de pivote en DataFrames, dependiendo de la forma real de sus datos, si lo coloca en un DF podría pivotar columnas en filas, el siguiente blog de databricks es muy útil ya que describe en detalle un número de casos de uso pivotantes con ejemplos de código