Spark Row a JSON

Me gustaría crear un JSON a partir de un dataframe de Spark v.1.6 (con scala). Sé que existe la solución simple de hacer df.toJSON .

Sin embargo, mi problema se ve un poco diferente. Considere por ejemplo un dataframe con las siguientes columnas:

 | A | B | C1 | C2 | C3 | ------------------------------------------- | 1 | test | ab | 22 | TRUE | | 2 | mytest | gh | 17 | FALSE | 

Me gustaría tener al final un dataframe con

 | A | B | C | ---------------------------------------------------------------- | 1 | test | { "c1" : "ab", "c2" : 22, "c3" : TRUE } | | 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } | 

donde C es un JSON que contiene C1 , C2 , C3 . Lamentablemente, en el momento de la comstackción no sé cómo se ve el dataframe (excepto las columnas A y B que siempre están “corregidas”).

En cuanto a la razón por la que necesito esto: estoy usando Protobuf para enviar los resultados. Desafortunadamente, mi dataframe a veces tiene más columnas de las esperadas y todavía las enviaría a través de Protobuf, pero no quiero especificar todas las columnas en la definición.

¿Cómo puedo conseguir esto?

Spark 2.1 debe tener soporte nativo para este caso de uso (ver # 15354 ).

 import org.apache.spark.sql.functions.to_json df.select(to_json(struct($"c1", $"c2", $"c3"))) 

Primero permitamos convertir C a una struct :

 val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C")) 

Esta es la estructura que se puede convertir a JSONL usando toJSON como antes:

 dfStruct.toJSON.collect // Array[String] = Array( // {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, // {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}}) 

No conozco ningún método integrado que pueda convertir una sola columna, pero puede convertirlo individualmente y join o usar su analizador JSON favorito en una UDF.

 case class C(C1: String, C2: Int, C3: Boolean) object CJsonizer { import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.write implicit val formats = Serialization.formats(org.json4s.NoTypeHints) def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3)) } val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => CJsonizer.toJSON(c1, c2, c3)) df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3")) 

Aquí, no hay un analizador JSON, y se adapta a su esquema:

 import org.apache.spark.sql.functions.{col, concat, concat_ws, lit} df.select( col(df.columns(0)), col(df.columns(1)), concat( lit("{"), concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => { val c = dt._1; val t = dt._2; concat( lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ), col(c), lit(if(t=="StringType") "\""; else "") ) }):_*), lit("}") ) as "C" ).collect()