La función devuelve una lista vacía en Spark

A continuación se muestra el código para obtener la lista de nombres de archivos en un archivo comprimido

def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = { val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open)) val filesInZip = new ArrayBuffer[String]() var ze : Option[ZipEntry] = None zipInputStream.foreach(stream =>{ do{ ze = Option(stream.getNextEntry); ze.foreach{ze => if(ze.getName.endsWith("java") && !ze.isDirectory()){ var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java")) filesInZip += fileName } } stream.closeEntry() } while(ze.isDefined) println(filesInZip.toList.length) // print 889 (correct) }) println(filesInZip.toList.length) // print 0 (WHY..?) (filesInZip.toList) } 

Ejecuto el código anterior de la siguiente manera:

 scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip") zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at :25 scala> getListOfFilesInRepo(zipRDD) 889 0 res12: List[String] = List() 

¿Por qué no obtengo 889 y, en cambio, obtengo 0?

Sucede porque filesInZip no se comparte entre los trabajadores. foreach opera en una copia local de filesInZip y cuando termina esta copia simplemente se descarta y se recolecta la basura. Si desea mantener los resultados, debe usar la transformación (lo más probable es que sea flatMap ) y devolver los valores agregados recostackdos.

 def listFiles(stream: PortableDataStream): TraversableOnce[String] = ??? zipInputStream.flatMap(listFiles) 

Puede obtener más información de Understanding closures