fusionar archivos de salida después de la fase de reducción

En mapreduce, cada tarea de reducción escribe su salida en un archivo llamado part-r-nnnnn donde nnnnn es una ID de partición asociada a la tarea de reducción. ¿ Asigna / reduce la fusión de estos archivos? Si es así, ¿cómo?

En lugar de fusionar el archivo por su cuenta, puede delegar toda la fusión de los archivos de salida reducida llamando al:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt 

Nota Esto combina los archivos HDFS localmente. Asegúrese de tener suficiente espacio en disco antes de ejecutar

No, estos archivos no están fusionados por Hadoop. La cantidad de archivos que obtiene es la misma que la cantidad de tareas de reducción.

Si necesita eso como entrada para un próximo trabajo, entonces no se preocupe por tener archivos separados. Simplemente especifique todo el directorio como entrada para el próximo trabajo.

Si necesita los datos fuera del clúster, generalmente los fusiono en el extremo receptor cuando extraigo los datos del clúster.

Es decir algo como esto:

 hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt 

Esa es la función que puede usar para fusionar archivos en HDFS

 public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException { FileSystem fs = FileSystem.get(config); Path srcPath = new Path(src); Path dstPath = new Path(dest); // Check if the path already exists if (!(fs.exists(srcPath))) { logger.info("Path " + src + " does not exists!"); return false; } if (!(fs.exists(dstPath))) { logger.info("Path " + dest + " does not exists!"); return false; } return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null); } 

Para archivos de texto solamente y HDFS como fuente y destino, use el siguiente comando:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

Esto concatenará todos los archivos en input_hdfs_dir y escribirá la salida en HDFS en output_hdfs_file . Tenga en cuenta que todos los datos serán devueltos al sistema local y luego nuevamente cargados en hdfs, aunque no se crean archivos temporales y esto sucede sobre la marcha utilizando UNIX pe.

Además, esto no funcionará con archivos que no sean de texto, como Avro, ORC, etc.

Para archivos binarios, puede hacer algo como esto (si tiene tablas Hive asignadas en los directorios):

insert overwrite table tbl select * from tbl

Dependiendo de su configuración, esto también podría crear más que archivos. Para crear un único archivo, establezca el número de reductores en 1 explícitamente usando mapreduce.job.reduces=1 o establezca la propiedad de la sección como hive.merge.mapredfiles=true .

Puede ejecutar una tarea adicional de mapa / reducir, donde el mapa y reducir no cambian los datos, y el particionador asigna todos los datos a un solo reductor.

Los archivos part-r-nnnnn se generan después de la fase de reducción designada por ‘r’ en el medio. Ahora, el hecho es que si tiene un reductor en ejecución, tendrá un archivo de salida como part-r-00000. Si la cantidad de reductores es 2, entonces tendrá la parte-r-00000 y la parte-r-00001, y así sucesivamente. Mire, si el archivo de salida es demasiado grande para caber en la memoria de la máquina ya que ha sido diseñado para ejecutarse en Máquinas de productos , entonces el archivo se divide. Según el MRv1, tiene un límite de 20 reductores para trabajar en su lógica. Puede tener más, pero lo mismo debe personalizarse en los archivos de configuración mapred-site.xml . Hablando de tu pregunta; puede usar getmerge o puede establecer el número de reductores en 1 insertando la siguiente statement en el código del controlador

 job.setNumReduceTasks(1); 

Espero que esto responda a su pregunta.

Además de mi respuesta anterior, tengo una respuesta más para ti que estaba intentando hace unos minutos. Puede usar CustomOutputFormat que se parece al código que se proporciona a continuación

 public class VictorOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter( TaskAttemptContext tac) throws IOException, InterruptedException { //step 1: GET THE CURRENT PATH Path currPath=FileOutputFormat.getOutputPath(tac); //Create the full path Path fullPath=new Path(currPath,"Aniruddha.txt"); //create the file in the file system FileSystem fs=currPath.getFileSystem(tac.getConfiguration()); FSDataOutputStream fileOut=fs.create(fullPath,tac); return new VictorRecordWriter(fileOut); } } 

Solo, eche un vistazo a la cuarta línea de la última. He usado mi propio nombre como nombre de archivo de salida y he probado el progtwig con 15 reductores. Aún así, el archivo sigue siendo el mismo. Por lo tanto, es posible obtener un único archivo de salida en lugar de dos o más, pero el tamaño del archivo de salida no debe exceder el tamaño de la memoria primaria, es decir, el archivo de salida debe caber en la memoria de la máquina de productos básicos. un problema con la división del archivo de salida. ¡¡Gracias!!

¿Por qué no utilizar un script de cerdo como este para fusionar archivos de partición?

 stuff = load "/path/to/dir/*" store stuff into "/path/to/mergedir" 

Si los archivos tienen encabezado, puedes deshacerte de él haciendo esto:

 hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv 

luego agregue el encabezado manualmente para output.csv

. ¿Asigna / reduce la fusión de estos archivos?

No. No se fusiona.

Puede usar IdentityReducer para lograr su objective.

No realiza ninguna reducción y escribe todos los valores de entrada directamente en la salida.

 public void reduce(K key, Iterator values, OutputCollector output, Reporter reporter) throws IOException 

Escribe todas las claves y valores directamente en la salida.

Eche un vistazo a las publicaciones relacionadas de SE:

hadoop: diferencia entre 0 reductor y reductor de identidad?