Spark 2.0 Dataset vs DataFrame

comenzando con spark 2.0.1 recibí algunas preguntas. Leí mucha documentación, pero hasta ahora no he podido encontrar suficientes respuestas:

  • Cuál es la diferencia entre
    • df.select("foo")
    • df.select($"foo")
  • entiendo correctamente que
    • myDataSet.map(foo.someVal) es seguro y no se convertirá en RDD sino que permanecerá en la representación del conjunto de datos / sin gastos adicionales (en cuanto al rendimiento para 2.0.0)
  • todos los demás comandos, por ej. select, … son solo azúcar sintáctico. No son seguros y se puede usar un mapa en su lugar. ¿Cómo podría df.select("foo") type-safe sin una statement del mapa?
    • ¿Por qué debería usar un UDF / UADF en lugar de un mapa (suponiendo que el mapa permanezca en la representación del conjunto de datos)?

  1. La diferencia entre df.select("foo") y df.select($"foo") es la firma. El primero toma al menos un String , el último cero o más Columns . No hay diferencia práctica más allá de eso.
  2. myDataSet.map(foo.someVal) verifica, pero como cualquier operación de Dataset usa RDD de objetos, y en comparación con DataFrame operaciones de DataFrame , hay una sobrecarga significativa. Echemos un vistazo a un ejemplo simple:

     case class FooBar(foo: Int, bar: String) val ds = Seq(FooBar(1, "x")).toDS ds.map(_.foo).explain 
     == Physical Plan == *SerializeFromObject [input[0, int, true] AS value#123] +- *MapElements , obj#122: int +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar +- LocalTableScan [foo#117, bar#118] 

    Como puede ver, este plan de ejecución requiere acceso a todos los campos y tiene que DeserializeToObject .

  3. No. En general, otros métodos no son azúcares sintácticos y generan un plan de ejecución significativamente diferente. Por ejemplo:

     ds.select($"foo").explain 
     == Physical Plan == LocalTableScan [foo#117] 

    En comparación con el plan que se muestra antes, puede acceder directamente a la columna. No es tanto una limitación de la API sino el resultado de una diferencia en la semántica operacional.

  4. ¿Cómo podría df.select (“foo”) type-safe sin una statement del mapa?

    No hay tal opción. Mientras que las columnas mecanografiadas le permiten transformar estáticamente Dataset en otro Dataset tipado estáticamente:

     ds.select($"bar".as[Int]) 

    no hay tipo seguro. Hay algunos otros bashs de incluir operaciones optimizadas de tipo seguro, como agregaciones tipadas , pero esta API experimental.

  5. ¿Por qué debería usar un UDF / UADF en lugar de un mapa?

    Depende completamente de ti. Cada estructura de datos distribuida en Spark ofrece sus propias ventajas y desventajas (véase, por ejemplo, Spark UDAF con ArrayType como problemas de rendimiento de bufferSchema ).

Personalmente, considero que Dataset estáticamente tipado es el menos útil:

  • No proporcione el mismo rango de optimizaciones que Dataset[Row] (aunque comparten el formato de almacenamiento y algunas optimizaciones del plan de ejecución, no se beneficia completamente de la generación de código o almacenamiento fuera de almacenamiento) ni acceso a todas las capacidades analíticas del DataFrame .

  • Las transformaciones tipadas son recuadros negros y crean efectivamente una barrera de análisis para el optimizador. Por ejemplo, las selecciones (filtros) no pueden ser presionadas sobre la transformación tipada:

     ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain 
     == Physical Plan == *Filter (foo#133 = 1) +- *Filter .apply +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- LocalTableScan [foo#133, bar#134] 

    Comparado con:

     ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain 
     == Physical Plan == *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- *Filter (foo#133 = 1) +- LocalTableScan [foo#133, bar#134] 

    Esto afecta funciones como pushdown de predicado o pushdown de proyección.

  • No son tan flexibles como los RDDs con solo un pequeño subconjunto de tipos admitidos de forma nativa.

  • La “Seguridad de tipo” con Encoders es discutible cuando el Dataset se convierte utilizando el método. Debido a que la forma de los datos no está codificada con una firma, un comstackdor solo puede verificar la existencia de un Encoder .

Preguntas relacionadas:

  • Realice una unión escrita a máquina en Scala con Spark Datasets
  • Spark 2.0 DataSets groupByKey y dividir el funcionamiento y tipo de seguridad

Spark Dataset es mucho más poderoso que Spark Dataframe . Pequeño ejemplo: solo puede crear Dataframe de Row , Tuple o cualquier tipo de datos primitivo, pero Dataset le da poder para crear Dataset de cualquier tipo no primitivo también. es decir, puede crear literalmente Dataset de tipo de objeto.

Ex:

 case class Employee(id:Int,name:String) Dataset[Employee] // is valid Dataframe[Employee] // is invalid