cómo hacer saveAsTextFile NO divide la salida en múltiples archivos?

Cuando uso Scala en Spark, cada vez que vuelco los resultados usando saveAsTextFile , parece dividir el resultado en varias partes. Solo le paso un parámetro (ruta).

 val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) year.saveAsTextFile("year") 
  1. ¿El número de salidas corresponde a la cantidad de reductores que usa?
  2. ¿Esto significa que la salida está comprimida?
  3. Sé que puedo combinar la salida usando bash, pero ¿hay una opción para almacenar la salida en un solo archivo de texto, sin dividir? Miré los documentos de la API, pero no dice mucho sobre esto.

La razón por la que se guarda como archivos múltiples es porque el cálculo se distribuye. Si la salida es lo suficientemente pequeña como para pensar que puede caber en una máquina, entonces puede finalizar su progtwig con

 val arr = year.collect() 

Y luego guarde la matriz resultante como un archivo. Otra forma sería usar un particionador personalizado, partitionBy , y hacerlo de modo que todo vaya a una partición, aunque eso no es aconsejable porque no obtendrá paralelización.

Si necesita que el archivo se guarde con saveAsTextFile , puede usar coalesce(1,true).saveAsTextFile() . Esto básicamente significa que los cálculos se unen a una partición. También puede usar repartition(1) que es solo un contenedor para coalesce con el argumento de mezcla establecido en verdadero. Mirando a través de la fuente de RDD.scala es como me di cuenta de la mayoría de estas cosas, deberías echarle un vistazo.

Podría llamar a coalesce(1) y luego saveAsTextFile() , pero podría ser una mala idea si tiene muchos datos. Se generan archivos separados por división como en Hadoop para permitir que los mapeadores y los reductores por separado escriban en diferentes archivos. Tener un solo archivo de salida es solo una buena idea si tiene muy pocos datos, en cuyo caso también podría recostackr (), como dijo @aaronman.

Para aquellos que trabajan con un conjunto de datos más grande :

  • rdd.collect() no debe usarse en este caso, ya que recostackrá todos los datos como una Array en el controlador, que es la forma más fácil de obtener memoria insuficiente.

  • rdd.coalesce(1).saveAsTextFile() tampoco debe utilizarse ya que se perderá el paralelismo de las fases iniciales en un solo nodo, donde se almacenarán los datos.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() es la mejor opción simple ya que mantendrá el procesamiento de las tareas de subida paralelas y luego solo realizará la reproducción aleatoria en un nodo ( rdd.repartition(1).saveAsTextFile() es un sinónimo exacto).

  • rdd.saveAsSingleTextFile() como se rdd.saveAsSingleTextFile() continuación, permite almacenar el rdd en un solo archivo con un nombre específico, manteniendo las propiedades de paralelismo de rdd.coalesce(1, shuffle = true).saveAsTextFile() .

Lo que puede ser inconveniente con rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") es que en realidad produce un archivo cuya ruta es path/to/file.txt/part-00000 y no path/to/file.txt .

La siguiente solución rdd.saveAsSingleTextFile("path/to/file.txt") realmente producirá un archivo cuya ruta es path/to/file.txt :

 package com.whatever.package import org.apache.spark.rdd.RDD import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.io.compress.CompressionCodec object SparkHelper { // This is an implicit class so that saveAsSingleTextFile can be attached to // SparkContext and be called like this: sc.saveAsSingleTextFile implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal { def saveAsSingleTextFile(path: String): Unit = saveAsSingleTextFileInternal(path, None) def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = saveAsSingleTextFileInternal(path, Some(codec)) private def saveAsSingleTextFileInternal( path: String, codec: Option[Class[_ <: CompressionCodec]] ): Unit = { // The interface with hdfs: val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration) // Classic saveAsTextFile in a temporary folder: hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already codec match { case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec) case None => rdd.saveAsTextFile(s"$path.tmp") } // Merge the folder of resulting part-xxxxx into one file: hdfs.delete(new Path(path), true) // to make sure it's not there already FileUtil.copyMerge( hdfs, new Path(s"$path.tmp"), hdfs, new Path(path), true, rdd.sparkContext.hadoopConfiguration, null ) hdfs.delete(new Path(s"$path.tmp"), true) } } } 

que se puede usar de esta manera:

 import com.whatever.package.SparkHelper.RDDExtensions rdd.saveAsSingleTextFile("path/to/file.txt") // Or if the produced file is to be compressed: import org.apache.hadoop.io.compress.GzipCodec rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec]) 

Este fragmento primero almacena el rdd con rdd.saveAsTextFile("path/to/file.txt") en una path/to/file.txt.tmp carpeta temporal path/to/file.txt.tmp como si no quisiéramos almacenar datos en un archivo (lo que mantiene el procesamiento de tareas upstream paralelo).

Y solo entonces, utilizando la API del sistema de archivos hadoop , procedemos con la fusión ( FileUtil.copyMerge() ) de los diferentes archivos de salida para crear nuestra path/to/file.txt final de salida de archivo único path/to/file.txt .

Como han mencionado otros, puede recostackr o fusionar su conjunto de datos para forzar a Spark a producir un único archivo. Pero esto también limita el número de tareas de Spark que pueden funcionar en su conjunto de datos en paralelo. Prefiero dejar que cree cien archivos en el directorio HDFS de salida, luego use hadoop fs -getmerge /hdfs/dir /local/file.txt para extraer los resultados en un único archivo en el sistema de archivos local. Esto tiene más sentido cuando su resultado es un informe relativamente pequeño, por supuesto.

Podrás hacerlo en la próxima versión de Spark, en la versión actual 1.0.0 no es posible a menos que lo hagas manualmente de alguna manera, por ejemplo, como mencionaste, con una llamada de script bash.

También quiero mencionar que la documentación establece claramente que los usuarios deben tener cuidado cuando las llamadas se fusionan con un número realmente pequeño de particiones. esto puede provocar que las particiones en sentido ascendente hereden este número de particiones.

No recomendaría usar coalesce (1) a menos que realmente se requiera.

En Spark 1.6.1, el formato es el siguiente. Crea un solo archivo de salida. Es una buena práctica usarlo si la salida es lo suficientemente pequeña como para manejarlo. Básicamente lo que hace es devolver un nuevo RDD que se reduce a particiones numPartitions. Si estás haciendo una unión drástica, por ejemplo, para numPartitions = 1, esto puede hacer que su computación tenga lugar en menos nodos de los que desea (por ejemplo, un nodo en el caso de numPartitions = 1)

 pair_result.coalesce(1).saveAsTextFile("/app/data/") 

Puede llamar a repartition() y seguir de esta manera:

 val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) var repartitioned = year.repartition(1) repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00") 

enter image description here

Aquí está mi respuesta para generar un solo archivo. Acabo de agregar coalesce(1)

 val year = sc.textFile("apat63_99.txt") .map(_.split(",")(1)) .flatMap(_.split(",")) .map((_,1)) .reduceByKey((_+_)).map(_.swap) year.saveAsTextFile("year") 

Código:

 year.coalesce(1).saveAsTextFile("year")