Cómo forzar la evaluación de DataFrame en Spark

Algunas veces (p. Ej., Para pruebas y marcación) deseo forzar la ejecución de las transformaciones definidas en un DataFrame. AFAIK llamando a una acción como count no garantiza que todas las Columns sean realmente calculadas, show solo puede calcular un subconjunto de todas las Rows (ver ejemplos a continuación)

Mi solución es escribir el DataFrame en HDFS usando df.write.saveAsTable , pero esto “desordena” mi sistema con tablas que no quiero guardar más.

Entonces, ¿cuál es la mejor manera de activar la evaluación de un DataFrame ?

Editar:

Tenga en cuenta que también hay una discusión reciente sobre la lista de desarrolladores de chispa: http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each- row-td21018.html

Hice un pequeño ejemplo que muestra que count con DataFrame no evalúa todo (probado con Spark 1.6.3 y spark-master = local[2] ):

 val df = sc.parallelize(Seq(1)).toDF("id") val myUDF = udf((i:Int) => {throw new RuntimeException;i}) df.withColumn("test",myUDF($"id")).count // runs fine df.withColumn("test",myUDF($"id")).show() // gives Exception 

Usando la misma lógica, aquí un ejemplo que show no evalúa todas las filas:

 val df = sc.parallelize(1 to 10).toDF("id") val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i}) df.withColumn("test",myUDF($"id")).show(5) // runs fine df.withColumn("test",myUDF($"id")).show(10) // gives Exception 

Edit 2: Para Eliasah: La excepción dice esto:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(:68) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:68) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:68) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) . . . . Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:74) . . . . 

Supongo que simplemente obtener un rdd subyacente de DataFrame y desencadenar una acción sobre él debería lograr lo que estás buscando.

 df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions 

Es un poco tarde, pero esta es la razón fundamental: count no actúa igual en RDD y DataFrame .

En DataFrame s hay una optimización, ya que en algunos casos no es necesario cargar datos para saber realmente la cantidad de elementos que tiene (especialmente en el caso del suyo donde no se barajan los datos). Por lo tanto, el DataFrame materializado cuando se llama al count no cargará ningún dato y no pasará al lanzamiento de excepción. Puede realizar fácilmente el experimento definiendo su propia DefaultSource y Relation y ver que el count llamadas en un DataFrame siempre terminará en el método buildScan sin columnas requiredColumns sin importar cuántas columnas haya seleccionado (cf. org.apache.spark.sql.sources.interfaces para entender más). En realidad es una optimización muy eficiente 😉

Sin embargo, en RDD no existen tales optimizaciones (por eso siempre se debe tratar de usar DataFrame s cuando sea posible). Por lo tanto, el count de RDD ejecuta todo el linaje y devuelve la sum de todos los tamaños de los iteradores que componen cualquier partición.

Llamar dataframe.count entra en la primera explicación, pero al llamar dataframe.rdd.count pasa al segundo cuando construiste un RDD fuera de tu DataFrame . Tenga en cuenta que la invocación de dataframe.cache().count dataframe fuerza a que el dataframe de datos se materialice ya que requiere que Spark dataframe en caché los resultados (por lo tanto, necesita cargar todos los datos y transformarlos). Pero tiene el efecto secundario de almacenar sus datos en caché …

Parece que df.cache.count es el camino a seguir:

 scala> val myUDF = udf((i:Int) => {if(i==1000) throw new RuntimeException;i}) myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,IntegerType,Some(List(IntegerType))) scala> val df = sc.parallelize(1 to 1000).toDF("id") df: org.apache.spark.sql.DataFrame = [id: int] scala> df.withColumn("test",myUDF($"id")).show(10) [rdd_51_0] +---+----+ | id|test| +---+----+ | 1| 1| | 2| 2| | 3| 3| | 4| 4| | 5| 5| | 6| 6| | 7| 7| | 8| 8| | 9| 9| | 10| 10| +---+----+ only showing top 10 rows scala> df.withColumn("test",myUDF($"id")).count res13: Long = 1000 scala> df.withColumn("test",myUDF($"id")).cache.count org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => int) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) . . . Caused by: java.lang.RuntimeException 

Fuente