Chispa extrayendo valores de una fila

Tengo el siguiente dataframe

val transactions_with_counts = sqlContext.sql( """SELECT user_id AS user_id, category_id AS category_id, COUNT(category_id) FROM transactions GROUP BY user_id, category_id""") 

Estoy tratando de convertir las filas en objetos de Calificación, pero como x (0) devuelve una matriz, esto falla

 val ratings = transactions_with_counts .map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt)) 

error: value toInt no es miembro de Any

Comencemos con algunos datos ficticios:

 val transactions = Seq((1, 2), (1, 4), (2, 3)).toDF("user_id", "category_id") val transactions_with_counts = transactions .groupBy($"user_id", $"category_id") .count transactions_with_counts.printSchema // root // |-- user_id: integer (nullable = false) // |-- category_id: integer (nullable = false) // |-- count: long (nullable = false) 

Hay algunas maneras de acceder a los valores de Row y mantener los tipos esperados:

  1. La coincidencia de patrones

     import org.apache.spark.sql.Row transactions_with_counts.map{ case Row(user_id: Int, category_id: Int, rating: Long) => Rating(user_id, category_id, rating) } 
  2. Mecanografiar get* métodos como getInt , getLong :

     transactions_with_counts.map( r => Rating(r.getInt(0), r.getInt(1), r.getLong(2)) ) 
  3. Método getAs que puede usar nombres e índices:

     transactions_with_counts.map(r => Rating( r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2) )) 

    Se puede usar para extraer correctamente los tipos definidos por el usuario, incluido mllib.linalg.Vector . Obviamente, acceder por nombre requiere un esquema.

  4. Conversión a Dataset tipado estáticamente (Spark 1.6+ / 2.0+):

     transactions_with_counts.as[(Int, Int, Long)] 

Usando Datasets puede definir Ratings de la siguiente manera:

 case class Rating(user_id: Int, category_id:Int, count:Long) 

La clase de Clasificación aquí tiene un nombre de columna ‘conteo’ en lugar de ‘calificación’ como cero sugirió. Por lo tanto, la variable de calificación se asigna de la siguiente manera:

 val transactions_with_counts = transactions.groupBy($"user_id", $"category_id").count val rating = transactions_with_counts.as[Rating] 

De esta forma, no se encontrará con errores de tiempo de ejecución en Spark porque su nombre de columna de la clase de evaluación es idéntico al nombre de columna de ‘recuento’ generado por Spark en tiempo de ejecución.

Para acceder a un valor de una fila de Dataframe , necesita usar rdd.collect of Dataframe with for loop.

Considere que su Dataframe se ve a continuación.

 val df = Seq( (1,"James"), (2,"Albert"), (3,"Pete")).toDF("user_id","name") 

Use rdd.collect en la parte superior de su Dataframe . La variable de row contendrá cada fila de Dataframe del tipo de fila rdd . Para obtener cada elemento de una fila, use row.mkString(",") que contendrá el valor de cada fila en valores separados por comas. Usando la función split (función incorporada) puede acceder al valor de cada columna de la fila rdd con índice.

 for (row <- df.rdd.collect) { var user_id = row.mkString(",").split(",")(0) var category_id = row.mkString(",").split(",")(1) } 

El código anterior parece un poco más grande en comparación con los bucles de dataframe.foreach , pero obtendrá más control sobre su lógica utilizando el código anterior.