Cómo sobrescribir el directorio de salida en chispa

Tengo una aplicación de transmisión de chispa que produce un conjunto de datos por cada minuto. Necesito guardar / sobrescribir los resultados de los datos procesados.

Cuando traté de sobrescribir el conjunto de datos org.apache.hadoop.mapred.FileAlreadyExistsException detiene la ejecución.

Establecí el conjunto de propiedades Spark set("spark.files.overwrite","true") , pero no hay suerte.

¿Cómo sobrescribir o eliminar archivos de la chispa?

ACTUALIZACIÓN: Sugerir usar Dataframes , más algo como ... .write.mode(SaveMode.Overwrite) ...

Para versiones anteriores prueba

 yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false") val sc = SparkContext(yourSparkConf) 

En 1.1.0 puede establecer configuraciones de conf mediante el script spark-submit con el indicador –conf.

ADVERTENCIA: Según @piggybox, hay un error en Spark, donde solo sobrescribirá los archivos que necesita para escribir sus archivos part- , y los demás archivos no se eliminarán.

La documentación para el parámetro spark.files.overwrite dice esto: “Si sobrescribir los archivos agregados a través de SparkContext.addFile() cuando el archivo de destino existe y sus contenidos no coinciden con los de la fuente”. Por lo tanto, no tiene ningún efecto en el método saveAsTextFiles.

Puede hacer esto antes de guardar el archivo:

 val hadoopConf = new org.apache.hadoop.conf.Configuration() val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf) try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } } 

Aas explicado aquí: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

Desde la documentación de pyspark.sql.DataFrame.save (actualmente en 1.3.1), puede especificar mode='overwrite' al guardar un DataFrame:

 myDataFrame.save(path='myPath', source='parquet', mode='overwrite') 

He verificado que esto incluso eliminará los archivos de la partición que quedaron. Entonces, si tuviera 10 particiones / archivos originalmente, pero luego sobrescribió la carpeta con un DataFrame que solo tenía 6 particiones, la carpeta resultante tendrá las 6 particiones / archivos.

Consulte la documentación de Spark SQL para obtener más información sobre las opciones de modo.

ya que df.save(path, source, mode) está en desuso, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )

use df.write.format(source).mode("overwrite").save(path)
donde df.write es DataFrameWriter

‘fuente’ puede ser (“com.databricks.spark.avro” | “parquet” | “json”)

df.write.mode (‘overwrite’). parquet (“/ output / folder / path”) funciona si desea sobrescribir un archivo de parquet usando python. Esto está en la chispa 1.6.2. API puede ser diferente en versiones posteriores

  val jobName = "WordCount"; //overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false") val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false"); val sc = new SparkContext(conf) 

Esta versión sobrecargada de la función de guardar funciona para mí:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf (“Sobrescribir”))

El ejemplo anterior sobrescribiría una carpeta existente. El modo de guardar puede tomar estos parámetros también ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Agregar : el modo Agregar significa que al guardar un DataFrame en una fuente de datos, si ya existen datos / tabla, se espera que los contenidos del DataFrame se anexen a los datos existentes.

ErrorIfExists : el modo ErrorIfExists significa que al guardar un DataFrame en una fuente de datos, si ya existen datos, se espera que se genere una excepción.

Ignorar : el modo Ignorar significa que al guardar un DataFrame en una fuente de datos, si ya existen datos, se espera que la operación de guardar no guarde los contenidos del DataFrame y no modifique los datos existentes.

Si está dispuesto a usar su propio formato de salida personalizado, también podrá obtener el comportamiento deseado con RDD.

Eche un vistazo a las siguientes clases: FileOutputFormat , FileOutputCommitter

En formato de salida de archivo tiene un método llamado checkOutputSpecs, que está verificando si existe el directorio de salida. En FileOutputCommitter tiene el commitJob que generalmente transfiere datos del directorio temporal a su lugar final.

No pude verificarlo aún (lo haría, tan pronto como tengo pocos minutos libres) pero teóricamente: si extiendo FileOutputFormat y anulo checkOutputSpecs a un método que no arroja excepción en el directorio ya existe, y ajusto el El método commitJob de mi committer de salida personalizado para realizar la lógica que yo quiera (por ejemplo, anular algunos de los archivos, anexar otros) de lo que podría ser capaz de lograr el comportamiento deseado con los RDD también.

El formato de salida se pasa a: saveAsNewAPIHadoopFile (que es el método saveAsTextFile llamado también para guardar realmente los archivos). Y el compromiso de Salida está configurado en el nivel de la aplicación.