¿Cómo puedo detectar si un dataframe de chispa tiene una columna

Cuando creo un DataFrame a partir de un archivo json en spark sql, ¿cómo puedo saber si existe una columna antes de llamar a .select

ejemplo, por ejemplo, el esquema json

 { "a": { "b": 1, "c": 2 } } 

Esto es lo que quiero hacer:

 potential_columns = Seq("b", "c", "d") df = sqlContext.read.json(filename) potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column")) 

pero no puedo encontrar una buena función para hasColumn . Lo más cerca que he estado es probar si la columna está en este conjunto algo incómodo:

 scala> df.select("a.*").columns res17: Array[String] = Array(b, c) 

Solo asume que existe y deja que falle con Try . Simple y simple y admite un anidamiento arbitrario:

 import scala.util.Try import org.apache.spark.sql.DataFrame def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess val df = sqlContext.read.json(sc.parallelize( """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil)) hasColumn(df, "foobar") // Boolean = false hasColumn(df, "foo") // Boolean = true hasColumn(df, "foo.bar") // Boolean = true hasColumn(df, "foo.bar.foobar") // Boolean = true hasColumn(df, "foo.bar.foobaz") // Boolean = false 

O incluso más simple:

 val columns = Seq( "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz") columns.flatMap(c => Try(df(c)).toOption) // Seq[org.apache.spark.sql.Column] = List( // foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13) 

Python equivalente:

 from pyspark.sql.utils import AnalysisException from pyspark.sql import Row def has_column(df, col): try: df[col] return True except AnalysisException: return False df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF() has_column(df, "foobar") ## False has_column(df, "foo") ## True has_column(df, "foo.bar") ## True has_column(df, "foo.bar.foobar") ## True has_column(df, "foo.bar.foobaz") ## False 

Otra opción que normalmente uso es

 df.columns.contains("column-name-to-check") 

Esto devuelve un valor booleano

En realidad, ni siquiera necesita llamar a seleccionar para usar columnas, solo puede llamarlo en el dataframe mismo

 // define test data case class Test(a: Int, b: Int) val testList = List(Test(1,2), Test(3,4)) val testDF = sqlContext.createDataFrame(testList) // define the hasColumn function def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName) // then you can just use it on the DF with a given column name hasColumn(testDF, "a") // <-- true hasColumn(testDF, "c") // <-- false 

De forma alternativa, puede definir una clase implícita utilizando el patrón pimp my library para que el método hasColumn esté disponible en sus marcos de datos directamente

 implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) { def hasColumn(colName: String) = df.columns.contains(colName) } 

Entonces puedes usarlo como:

 testDF.hasColumn("a") // <-- true testDF.hasColumn("c") // <-- false 

Su otra opción para esto sería hacer alguna manipulación de matriz (en este caso, una intersect ) en las df.columns y sus df.columns .

 // Loading some data (so you can just copy & paste right into spark-shell) case class Document( a: String, b: String, c: String) val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF // The columns we want to extract val potential_columns = Seq("b", "c", "d") // Get the intersect of the potential columns and the actual columns, // we turn the array of strings into column objects // Finally turn the result into a vararg (: _*) df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show 

Por desgracia, esto no funcionará para su escenario de objeto interno anterior. Tendrá que mirar el esquema para eso.

Voy a cambiar tus potential_columns por nombres de columna totalmente calificados

 val potential_columns = Seq("ab", "ac", "ad") // Our object model case class Document( a: String, b: String, c: String) case class Document2( a: Document, b: String, c: String) // And some data... val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF // We go through each of the fields in the schema. // For StructTypes we return an array of parentName.fieldName // For everything else we return an array containing just the field name // We then flatten the complete list of field names // Then we intersect that with our potential_columns leaving us just a list of column we want // we turn the array of strings into column objects // Finally turn the result into a vararg (: _*) df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show 

Esto solo tiene un nivel de profundidad, por lo que para hacerlo genérico tendrías que hacer más trabajo.

Try no es óptimo, ya que evaluará la expresión dentro de Try antes de tomar la decisión.

Para grandes conjuntos de datos, use lo siguiente en Scala :

 df.schema.fieldNames.contains("column_name")