¿Cómo puedo iterar RDD en apache spark (scala)?

Uso el siguiente comando para completar un RDD con un conjunto de matrices que contienen 2 cadenas [“nombre de archivo”, “contenido”].

Ahora quiero iterar sobre cada una de esas ocurrencias para hacer algo con cada nombre de archivo y contenido.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*") 

Parece que no puedo encontrar ninguna documentación sobre cómo hacer esto.

Entonces, lo que quiero es esto:

 foreach occurrence-in-the-rdd{ //do stuff with the array found on loccation n of the RDD } 

Las operaciones fundamentales en Spark son map y filter .

 val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") } 

el txtRDD ahora solo contendrá archivos que tengan la extensión “.txt”

Y si quieres contar esas palabras, puedes decir

 //split the documents into words in one long list val words = txtRDD flatMap { case (id,text) => text.split("\\s+") } // give each word a count of 1 val wordT = words map (x => (x,1)) //sum up the counts for each word val wordCount = wordsT reduceByKey((a, b) => a + b) 

Desea utilizar mapPartitions cuando tenga que realizar una costosa inicialización, por ejemplo, si desea hacer Reconocimiento de entidad con nombre con una biblioteca como Stanford coreNLP tools.

Master map , filter , flatMap y reduce , y estás en camino de dominar Spark.

Llama a varios métodos en el RDD que aceptan funciones como parámetros.

 // set up an example -- an RDD of arrays val sparkConf = new SparkConf().setMaster("local").setAppName("Example") val sc = new SparkContext(sparkConf) val testData = Array(Array(1,2,3), Array(4,5,6,7,8)) val testRDD = sc.parallelize(testData, 2) // Print the RDD of arrays. testRDD.collect().foreach(a => println(a.size)) // Use map() to create an RDD with the array sizes. val countRDD = testRDD.map(a => a.size) // Print the elements of this new RDD. countRDD.collect().foreach(a => println(a)) // Use filter() to create an RDD with just the longer arrays. val bigRDD = testRDD.filter(a => a.size > 3) // Print each remaining array. bigRDD.collect().foreach(a => { a.foreach(e => print(e + " ")) println() }) } 

Tenga en cuenta que las funciones que escribe aceptan un solo elemento RDD como entrada y devuelven datos de algún tipo uniforme, por lo que crea un RDD del último tipo. Por ejemplo, countRDD es un RDD[Int] , mientras que bigRDD sigue siendo un RDD[Array[Int]] .

Probablemente sea tentador en algún momento escribir un foreach que modifique algunos otros datos, pero debe resistir por los motivos que se describen en esta pregunta y respuesta .

Editar: No intente imprimir RDD grandes s

Varios lectores han preguntado sobre el uso de collect() e println() para ver sus resultados, como en el ejemplo anterior. Por supuesto, esto solo funciona si se está ejecutando en un modo interactivo como Spark REPL (read-eval-print-loop). Lo mejor es llamar a collect() en el RDD para obtener una matriz secuencial para la impresión ordenada. Pero collect() puede traer demasiados datos y, en cualquier caso, puede imprimirse demasiado. Aquí hay algunas formas alternativas de obtener información sobre sus RDD , si son grandes:

  1. RDD.take() : Esto le da un control fino sobre la cantidad de elementos que obtiene, pero no de dónde vienen, definidos como los “primeros”, que es un concepto tratado por varias otras preguntas y respuestas aquí.

     // take() returns an Array so no need to collect() myHugeRDD.take(20).foreach(a => println(a)) 
  2. RDD.sample() : Esto le permite (aproximadamente) controlar la fracción de resultados que obtiene, si el muestreo usa reemplazo, e incluso opcionalmente el número aleatorio de semillas.

     // sample() does return an RDD so you may still want to collect() myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a)) 
  3. RDD.takeSample() : este es un híbrido: usa muestreo aleatorio que puede controlar, pero ambos le permiten especificar el número exacto de resultados y devolver una Array .

     // takeSample() returns an Array so no need to collect() myHugeRDD.takeSample(true, 20).foreach(a => println(a)) 
  4. RDD.count() : A veces, la mejor idea proviene de la cantidad de elementos con los que terminaste: a menudo hago esto primero.

     println(myHugeRDD.count()) 

Intentaría hacer uso de una función de mapeo de particiones. El siguiente código muestra cómo se puede procesar un conjunto de datos RDD en un bucle para que cada entrada pase por la misma función. Me temo que no tengo ningún conocimiento sobre Scala, así que todo lo que tengo para ofrecer es código Java . Sin embargo, no debería ser muy difícil traducirlo a Scala.

 JavaRDD res = file.mapPartitions(new FlatMapFunction  ,String>(){ @Override public Iterable call(Iterator  t) throws Exception { ArrayList tmpRes = new ArrayList <>(); String[] fillData = new String[2]; fillData[0] = "filename"; fillData[1] = "content"; while(t.hasNext()){ tmpRes.add(fillData); } return Arrays.asList(tmpRes); } }).cache(); 

lo que devuelve wholeTextFiles es un par RDD:

def wholeTextFiles (ruta: String, minPartitions: Int): RDD [(String, String)]

Lea un directorio de archivos de texto de HDFS, un sistema de archivos local (disponible en todos los nodos) o cualquier URI del sistema de archivos soportado por Hadoop. Cada archivo se lee como un registro único y se devuelve en un par clave-valor, donde la clave es la ruta de cada archivo, el valor es el contenido de cada archivo.

Aquí hay un ejemplo de cómo leer los archivos en una ruta local y luego imprimir cada nombre de archivo y contenido.

 val conf = new SparkConf().setAppName("scala-test").setMaster("local") val sc = new SparkContext(conf) sc.wholeTextFiles("file:///Users/leon/Documents/test/") .collect .foreach(t => println(t._1 + ":" + t._2)); 

el resultado:

 file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12} file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22} file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18} 

o convirtiendo el par RDD a un RDD primero

 sc.wholeTextFiles("file:///Users/leon/Documents/test/") .map(t => t._2) .collect .foreach { x => println(x)} 

el resultado:

 {"name":"tom","age":12} {"name":"john","age":22} {"name":"leon","age":18} 

Y creo que wholeTextFiles es más compatible con archivos pequeños.

 for (element <- YourRDD) { // do what you want with element in each iteration, and if you want the index of element, simply use a counter variable in this loop begining from 0 

Println (elemento._1) // esto imprimirá todos los nombres de archivo}