MultipleOutputFormat en hadoop

Soy un novato en Hadoop. Estoy probando el progtwig Wordcount.

Ahora para probar múltiples archivos de salida, utilizo MultipleOutputFormat . este enlace me ayudó a hacerlo. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

en mi clase de manejo que tuve

  MultipleOutputs.addNamedOutput(conf, "even", org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(conf, "odd", org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, IntWritable.class);` 

y mi clase reducida se convirtió en esto

 public static class Reduce extends MapReduceBase implements Reducer { MultipleOutputs mos = null; public void configure(JobConf job) { mos = new MultipleOutputs(job); } public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } if (sum % 2 == 0) { mos.getCollector("even", reporter).collect(key, new IntWritable(sum)); }else { mos.getCollector("odd", reporter).collect(key, new IntWritable(sum)); } //output.collect(key, new IntWritable(sum)); } @Override public void close() throws IOException { // TODO Auto-generated method stub mos.close(); } } 

Las cosas funcionaron, pero recibo MUCHOS archivos (uno impar y uno incluso para cada mapa).

La pregunta es: ¿cómo puedo tener solo 2 archivos de salida (impar y par) para que cada salida impar de cada map-reduce se escriba en ese archivo impar, y lo mismo para par.

Cada reductor utiliza un OutputFormat para escribir registros en. Es por eso que obtienes un conjunto de archivos pares e impares por reductor. Esto es por diseño para que cada reductor pueda realizar escrituras en paralelo.

Si solo desea un único archivo par e impar, deberá establecer mapred.reduce.tasks en 1. Pero el rendimiento se verá afectado, ya que todos los correlacionadores se alimentarán en un solo reductor.

Otra opción es cambiar el proceso de lectura de estos archivos para aceptar múltiples archivos de entrada, o escribir un proceso separado que combine estos archivos.

Escribí una clase para hacer esto. Solo use su trabajo:

 job.setOutputFormatClass(m_customOutputFormatClass); 

Esta es mi clase:

 import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.
*

* WARNING: The number of different folder shuoldn't be large for one mapper since we keep an * {@link RecordWriter} instance per folder name. *

*

* In this class the folder name is defined by the written entry's key.
* To change this behavior simply extend this class and override the * {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own * {@link FolderNameExtractor} implementation. *

* * * @author ykesten * * @param - Keys type * @param - Values type */ public class HdMultipleFileOutputFormat extends TextOutputFormat { private String folderName; private class MultipleFilesRecordWriter extends RecordWriter { private Map> fileNameToWriter; private FolderNameExtractor fileNameExtractor; private TaskAttemptContext job; public MultipleFilesRecordWriter(FolderNameExtractor fileNameExtractor, TaskAttemptContext job) { fileNameToWriter = new HashMap>(); this.fileNameExtractor = fileNameExtractor; this.job = job; } @Override public void write(K key, V value) throws IOException, InterruptedException { String fileName = fileNameExtractor.extractFolderName(key, value); RecordWriter writer = fileNameToWriter.get(fileName); if (writer == null) { writer = createNewWriter(fileName, fileNameToWriter, job); if (writer == null) { throw new IOException("Unable to create writer for path: " + fileName); } } writer.write(key, value); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { for (Entry> entry : fileNameToWriter.entrySet()) { entry.getValue().close(context); } } } private synchronized RecordWriter createNewWriter(String folderName, Map> fileNameToWriter, TaskAttemptContext job) { try { this.folderName = folderName; RecordWriter writer = super.getRecordWriter(job); this.folderName = null; fileNameToWriter.put(folderName, writer); return writer; } catch (Exception e) { e.printStackTrace(); return null; } } @Override public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { Path path = super.getDefaultWorkFile(context, extension); if (folderName != null) { String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName(); path = new Path(newPath); } return path; } @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { return new MultipleFilesRecordWriter(getFolderNameExtractor(), job); } public FolderNameExtractor getFolderNameExtractor() { return new KeyFolderNameExtractor(); } public interface FolderNameExtractor { public String extractFolderName(K key, V value); } private static class KeyFolderNameExtractor implements FolderNameExtractor { public String extractFolderName(K key, V value) { return key.toString(); } } }

Se generarán múltiples archivos de salida en función del número de reductores.

Puede usar hadoop dfs -getmerge para salidas fusionadas