Escribir un solo archivo CSV usando spark-csv

Estoy usando https://github.com/databricks/spark-csv , estoy intentando escribir un solo archivo CSV, pero no puedo, está creando una carpeta.

Necesita una función de Scala que tome parámetros como la ruta y el nombre del archivo y escriba ese archivo CSV.

Está creando una carpeta con múltiples archivos, porque cada partición se guarda individualmente. Si necesita un único archivo de salida (aún en una carpeta) puede repartition (preferido si los datos de subida son grandes, pero requieren una mezcla):

 df .repartition(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv") 

o se coalesce :

 df .coalesce(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv") 

dataframe antes de guardar:

Todos los datos se escribirán en mydata.csv/part-00000 . Antes de usar esta opción, asegúrese de entender qué está pasando y cuál es el costo de transferir todos los datos a un solo trabajador . Si usa el sistema de archivos distribuido con replicación, los datos se transferirán varias veces, primero se buscarán en un solo trabajador y posteriormente se distribuirán a través de nodos de almacenamiento.

Alternativamente, puede dejar su código tal como está y usar herramientas de uso general como cat o HDFS getmerge para simplemente fusionar todas las partes posteriormente.

Si está ejecutando Spark con HDFS, he estado resolviendo el problema escribiendo archivos csv normalmente y aprovechando HDFS para hacer la fusión. Estoy haciendo eso en Spark (1.6) directamente:

 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ def merge(srcPath: String, dstPath: String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) // the "true" setting deletes the source files once they are merged into the new output } val newData = << create your dataframe >> val outputfile = "/user/feeds/project/outputs/subject" var filename = "myinsights" var outputFileName = outputfile + "/temp_" + filename var mergedFileName = outputfile + "/merged_" + filename var mergeFindGlob = outputFileName newData.write .format("com.databricks.spark.csv") .option("header", "false") .mode("overwrite") .save(outputFileName) merge(mergeFindGlob, mergedFileName ) newData.unpersist() 

No recuerdo dónde aprendí este truco, pero podría funcionar para ti.

Puede que llegue un poco tarde al juego aquí, pero usar coalesce(1) o repartition(1) puede funcionar para pequeños conjuntos de datos, pero los grandes conjuntos de datos se lanzarán a una partición en un nodo. Es probable que arroje errores OOM, o en el mejor de los casos, procese lentamente.

Le sugiero que use la función FileUtil.copyMerge() de Hadoop API. Esto fusionará las salidas en un solo archivo.

EDITAR – Esto efectivamente trae los datos al controlador en lugar de un nodo ejecutor. Coalesce() estaría bien si un solo ejecutor tiene más RAM para usar que el controlador.

EDIT 2: copyMerge() se elimina en Hadoop 3.0. Consulte el siguiente artículo de desbordamiento de stack para obtener más información sobre cómo trabajar con la versión más reciente: Hadoop cómo hacer CopyMerge en Hadoop 3.0

Si está utilizando Databricks y puede acomodar todos los datos en RAM en un trabajador (y así puede usar .coalesce(1) ), puede usar dbfs para buscar y mover el archivo CSV resultante:

 val fileprefix= "/mnt/aws/path/file-prefix" dataset .coalesce(1) .write //.mode("overwrite") // I usually don't use this, but you may want to. .option("header", "true") .option("delimiter","\t") .csv(fileprefix+".tmp") val partition_path = dbutils.fs.ls(fileprefix+".tmp/") .filter(file=>file.name.endsWith(".csv"))(0).path dbutils.fs.cp(partition_path,fileprefix+".tab") dbutils.fs.rm(fileprefix+".tmp",recurse=true) 

Si su archivo no cabe en la memoria RAM del trabajador, le recomendamos que considere la sugerencia de chaotic3quilibrium para usar FileUtils.copyMerge () . No he hecho esto, y todavía no sé si es posible o no, por ejemplo, en S3.

Esta respuesta se basa en las respuestas anteriores a esta pregunta, así como en mis propias pruebas del fragmento de código proporcionado. Originalmente lo publiqué en Databricks y lo vuelvo a publicar aquí.

La mejor documentación para la opción recursiva de dbfs’s rm que he encontrado está en un foro de Databricks .

repartición / fusión a 1 partición antes de guardar (aún obtendría una carpeta pero tendría un archivo de parte)

puede usar rdd.coalesce(1, true).saveAsTextFile(path)

almacenará los datos como archivo individual en path / part-00000

Hay una forma más de usar Java

 import java.io._ def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) { val p = new java.io.PrintWriter(f); try { op(p) } finally { p.close() } } printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}