Stackoverflow debido al largo linaje RDD

Tengo miles de archivos pequeños en HDFS. Necesita procesar un subconjunto de archivos un poco más pequeño (que de nuevo es miles), fileList contiene una lista de rutas de archivos que deben procesarse.

// fileList == list of filepaths in HDFS var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD for (i  line.startsWith("#####")).map(line => (filePath, line)) masterRDD = masterRDD.union(sampleRDD) } masterRDD.first() 

// Una vez fuera del ciclo, realizar cualquier acción da como resultado un error de stackoverflow debido al largo linaje de RDD

 Exception in thread "main" java.lang.StackOverflowError at scala.runtime.AbstractFunction1.(AbstractFunction1.scala:12) at org.apache.spark.rdd.UnionRDD$$anonfun$1.(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) ===================================================================== ===================================================================== at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 

En general, puede usar puntos de control para romper linajes largos. Algunos más o menos similares a esto deberían funcionar:

 import org.apache.spark.rdd.RDD import scala.reflect.ClassTag val checkpointInterval: Int = ??? def loadAndFilter(path: String) = sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _)) def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) (acc: RDD[T], xi: (RDD[T], Int)) = { if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint else xi._1.union(acc) } val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] fileList.map(loadAndFilter).zipWithIndex .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval)) 

En esta situación particular, una solución mucho más simple debería ser usar el método SparkContext.union :

 val masterRDD = sc.union( fileList.map(path => sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _))) ) 

Una diferencia entre estos métodos debería ser obvia cuando eche un vistazo al DAG generado por loop / reduce :

enter image description here

y una sola union :

enter image description here

Por supuesto, si los archivos son pequeños, puede combinar wholeTextFiles con flatMap y leer todos los archivos a la vez:

 sc.wholeTextFiles(fileList.mkString(",")) .flatMap{case (path, text) => text.split("\n").filter(_.startsWith("#####")).map((path, _))}