¿Cómo leer múltiples archivos de texto en un solo RDD?

Quiero leer un montón de archivos de texto de una ubicación hdfs y realizar un mapeo en él en una iteración usando chispa.

JavaRDD records = ctx.textFile(args[1], 1); es capaz de leer solo un archivo a la vez.

Quiero leer más de un archivo y procesarlos como un solo RDD. ¿Cómo?

Puede especificar directorios completos, usar comodines e incluso CSV de directorios y comodines. P.ej:

 sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file") 

Como Nick Chammas señala, esta es una exposición de FileInputFormat de Hadoop y, por lo tanto, también funciona con Hadoop (y Scalding).

Use la union siguiente manera:

 val sc = new SparkContext(...) val r1 = sc.textFile("xxx1") val r2 = sc.textFile("xxx2") ... val rdds = Seq(r1, r2, ...) val bigRdd = sc.union(rdds) 

Entonces el bigRdd es el RDD con todos los archivos.

Puede usar una sola invocación de archivo de texto para leer múltiples archivos. Scala:

 sc.textFile(','.join(files)) 

Puedes usar esto

Primero, puede obtener un búfer / lista de rutas S3:

 import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala } 

Ahora pase este objeto List a la siguiente pieza de código, nota: sc es un objeto de SQLContext

 var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } } 

Ahora tienes un RDD Unificado final, es decir, df

Opcional, y también puede reparticionarlo en un solo BigRDD

 val files = sc.textFile(filename, 1).repartition(1) 

Reparticionar siempre funciona: D

En PySpark, he encontrado una forma adicional útil de analizar archivos. Tal vez haya un equivalente en Scala, pero no me siento lo suficientemente cómodo para hacer una traducción funcional. Es, en efecto, una llamada de archivo de texto con la adición de tags (en el siguiente ejemplo la clave = nombre de archivo, valor = 1 línea del archivo).

Texto “etiquetado”

entrada:

 import glob from pyspark import SparkContext SparkContext.stop(sc) sc = SparkContext("local","example") # if running locally sqlContext = SQLContext(sc) for filename in glob.glob(Data_File + "/*"): Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) 

output: array con cada entrada que contiene una tupla usando filename-as-key y con value = cada línea de archivo. (Técnicamente, con este método también puede usar una clave diferente además del nombre de ruta de archivo real, tal vez una representación de hash para guardar en la memoria). es decir.

 [('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'), ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'), ...] 

También puede recombinarse como una lista de líneas:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

 [('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']), ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])] 

O vuelva a combinar los archivos completos en cadenas únicas (en este ejemplo, el resultado es el mismo que obtiene de wholeTextFiles, pero con la cadena “file:” eliminada del archivo).

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

puede usar – JavaRDD records = sc.wholeTextFiles (“ruta de su directorio”) aquí obtendrá la ruta de su archivo y el contenido de ese archivo. para que pueda realizar cualquier acción de un archivo completo a la vez que ahorra la sobrecarga

Todas las respuestas son correctas con sc.textFile

Me preguntaba por qué no wholeTextFiles en este caso …

 sc.wholeTextFiles(yourfileListFromFolder.mkString(",")) .flatMap{case (path, text) ... 

Una limitación es que tenemos que cargar archivos pequeños, de lo contrario el rendimiento será malo y puede llevar a OOM.

Nota :

  • El archivo completo debe caber en la memoria
  • Bueno para formatos de archivo que NO son divisibles por línea … como archivos XML

Referencia adicional para visitar

Hay una solución limpia directa disponible. Use el método wholeTextFiles (). Esto tomará un directorio y formará un par de valores clave. El RDD devuelto será un par RDD. Encuentra debajo la descripción de Spark docs :

SparkContext.wholeTextFiles le permite leer un directorio que contiene múltiples archivos de texto pequeños, y devuelve cada uno de ellos como pares (nombre de archivo, contenido). Esto está en contraste con textFile, que devolvería un registro por línea en cada archivo

PRUEBA ESTA interfaz utilizada para escribir un DataFrame en sistemas de almacenamiento externos (por ejemplo, sistemas de archivos, tiendas de valores clave, etc.). Use DataFrame.write () para acceder a esto.

Nuevo en la versión 1.4.

csv (path, mode = None, compression = None, sep = None, quote = None, escape = None, header = None, nullValue = None, escapeQuotes = None, quoteAll = None, dateFormat = None, timestampFormat = None) Guarda la contenido del DataFrame en formato CSV en la ruta especificada.

Parámetros: ruta: la ruta en cualquier modo de sistema de archivos compatible con Hadoop: especifica el comportamiento de la operación de guardar cuando los datos ya existen.

append: añada el contenido de este DataFrame a los datos existentes. sobrescribir: sobrescribe datos existentes. ignorar: ignore silenciosamente esta operación si los datos ya existen. error (caso predeterminado): Lanzar una excepción si los datos ya existen. compresión – códec de compresión para usar al guardar en el archivo. Este puede ser uno de los nombres abreviados insensibles a mayúsculas y minúsculas (ninguno, bzip2, gzip, lz4, snappy y deflate). sep: establece el carácter individual como separador para cada campo y valor. Si None está configurado, usa el valor predeterminado,,. quote: establece el único carácter utilizado para el escape de valores cotizados donde el separador puede ser parte del valor. Si None está configurado, usa el valor predeterminado, “. Si desea desactivar las citas, necesita establecer una cadena vacía. Escape – establece el único carácter utilizado para el escape de comillas dentro de un valor ya citado. , usa el valor predeterminado, \ escapeQuotes – Un indicador que indica si los valores que contienen comillas siempre deben estar entre comillas. Si None está establecido, usa el valor predeterminado true, escapando todos los valores que contienen un carácter de comilla. quoteAll – Una bandera que indica si todos los valores siempre deben ir entre comillas. Si no está configurado, usa el valor predeterminado falso, solo escapando valores que contienen un carácter de comilla. encabezado – escribe los nombres de las columnas como la primera línea. Si no está configurado, usa el valor predeterminado value, false. nullValue – establece la representación de cadena de un valor nulo. Si None está establecido, usa el valor predeterminado, cadena vacía. dateFormat – establece la cadena que indica un formato de fecha. Los formatos de fecha personalizados siguen los formatos en java.text .SimpleDate Formato. Esto aplica al tipo de fecha. Si None está configurado, usa el valor de valor predeterminado, aaaa-MM-dd. timestampFormat – establece la cadena que indica un formato de marca de tiempo. Los formatos de fecha personalizados siguen los formatos en java.text.SimpleDateFormat. Esto se aplica al tipo de marca de tiempo. Si None está configurado, usa el valor de valor predeterminado, aaaa-MM-dd’T’HH: mm: ss.SSSZZ.

 rdd = textFile('/data/{1.txt,2.txt}')