Spark-Obteniendo nombre de archivo en RDDs

Estoy tratando de procesar 4 directorios de archivos de texto que siguen creciendo cada día. Lo que tengo que hacer es, si alguien está tratando de buscar un número de factura, debería darles la lista de archivos que lo tienen.

Pude mapear y reducir los valores en archivos de texto al cargarlos como RDD. Pero, ¿cómo puedo obtener el nombre del archivo y otros atributos del archivo?

Desde Spark 1.6 puede combinar la fuente de datos de text y la función de nombre de input_file_name siguiente manera:

Scala :

 import org.apache.spark.sql.functions.input_file_name val inputPath: String = ??? spark.read.text(inputPath) .select(input_file_name, $"value") .as[(String, String)] // Optionally convert to Dataset .rdd // or RDD 

Python :

( Las versiones anteriores a 2.x son defectuosas y es posible que no conserven nombres cuando se convierten a RDD ):

 from pyspark.sql.functions import input_file_name (spark.read.text(input_path) .select(input_file_name(), "value")) .rdd) 

Esto también se puede usar con otros formatos de entrada.

Si sus archivos de texto son lo suficientemente pequeños, puede usar SparkContext.wholeTextFiles que devuelve un RDD de (filename,content) de (filename,content) .

Si tus archivos de texto son demasiado grandes para SparkContext.wholeTextFiles , deberías usar un InputFormat personalizado (simple) y luego llamar a SparkContext.hadoopRDD

El InputFormat necesitaría devolver una tupla (nombre de archivo, línea) en lugar de línea, entonces usted podría filtrar utilizando un predicado que mira el contenido de la línea, luego lo singularice y recoja los nombres de los archivos.

Desde Spark, el código sería algo así como:

 val ft = classOf[FileNamerInputFormat] val kt = classOf[String] val vt = classOf[String] val hadoopConfig = new Configuration(sc.hadoopConfiguration) sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig) .filter { case (f, l) => isInteresting(l) } .map { case (f, _) => f } .distinct() .collect() 

Puedes probar esto si estás en pyspark:

  test = sc.wholeTextFiles("pathtofile") 

Obtendrás un RDD resultante con el primer elemento = filepath y el segundo elemento = content

Puede usar WholeTextFile() para lograr esto. Sin embargo, si los archivos de entrada son grandes, sería contraproducente utilizar WholeTextFile() ya que pone todo el contenido del archivo en un solo registro.

La mejor forma de recuperar nombres de archivos en dicho escenario es usar mapPartitionsWithInputSplit() . Puede encontrar un ejemplo de trabajo usando este escenario en mi blog .

Parece excesivo usar Spark directamente … Si estos datos van a ser ‘recostackdos’ para el controlador, ¿por qué no usar la API HDFS? A menudo Hadoop se incluye con Spark. Aquí hay un ejemplo:

 import org.apache.hadoop.fs._ import org.apache.hadoop.conf._ val fileSpec = "/data/Invoices/20171123/21" val conf = new Configuration() val fs = org.apache.hadoop.fs.FileSystem.get(new URI("hdfs://nameNodeEneteredHere"),conf) val path = new Path(fileSpec) // if(fs.exists(path) && fs.isDirectory(path) == true) ... val fileList = fs.listStatus(path) 

Luego, con println(fileList(0)) , la información (formateada) como este primer elemento (como ejemplo) puede verse como org.apache.hadoop.fs.FileStatus :

 FileStatus { path=hdfs://nameNodeEneteredHere/Invoices-0001.avro; isDirectory=false; length=29665563; replication=3; blocksize=134217728; modification_time=1511810355666; access_time=1511838291440; owner=codeaperature; group=supergroup; permission=rw-r--r--; isSymlink=false } 

Donde fileList(0).getPath dará hdfs://nameNodeEneteredHere/Invoices-0001.avro .

Supongo que este medio de lectura de archivos sería principalmente con el namenode HDFS y no dentro de cada ejecutor. TLDR; Apuesto a que Spark probablemente sondearía el namenode para obtener RDD. Si la llamada Spark subyacente sondea el namenode para administrar los RDD, quizás lo anterior sea una solución eficiente. Aún así, los comentarios contributivos que sugieran cualquier dirección serían bienvenidos.