Encadenando múltiples trabajos de MapReduce en Hadoop

En muchas situaciones de la vida real donde aplica MapReduce, los algoritmos finales terminan siendo varios pasos de MapReduce.

es decir, Map1, Reduce1, Map2, Reduce2, y así sucesivamente.

Por lo tanto, tiene la salida de la última reducción que se necesita como entrada para el siguiente mapa.

Los datos intermedios son algo que (en general) no desea mantener una vez que la tubería se ha completado con éxito. Además, debido a que estos datos intermedios son en general una estructura de datos (como un “mapa” o un “conjunto”), no es necesario esforzarse demasiado para escribir y leer estos pares clave-valor.

¿Cuál es la forma recomendada de hacerlo en Hadoop?

¿Hay algún ejemplo (simple) que muestre cómo manejar estos datos intermedios de la manera correcta, incluida la limpieza posterior?

Creo que este tutorial sobre la red de desarrolladores de Yahoo te ayudará con esto: Chaining Jobs

Utiliza el JobClient.runJob() . La ruta de salida de los datos del primer trabajo se convierte en la ruta de entrada a su segundo trabajo. Estos deben transmitirse como argumentos a sus trabajos con el código apropiado para analizarlos y configurar los parámetros para el trabajo.

Creo que el método anterior podría ser, sin embargo, la forma en que lo hizo la antigua API mapeada, pero aún debería funcionar. Habrá un método similar en la nueva API mapreduce, pero no estoy seguro de qué se trata.

En cuanto a eliminar datos intermedios después de que un trabajo haya terminado, puede hacer esto en su código. La forma en que lo hice antes es usar algo como:

 FileSystem.delete(Path f, boolean recursive); 

Donde la ruta es la ubicación en HDFS de los datos. Debe asegurarse de que solo borre estos datos una vez que ningún otro trabajo lo requiera.

Hay muchas formas en que puedes hacerlo.

(1) trabajos en cascada

Cree el objeto JobConf “job1” para el primer trabajo y configure todos los parámetros con “input” como inputdirectory y “temp” como directorio de salida. Ejecute este trabajo:

 JobClient.run(job1). 

Inmediatamente debajo, cree el objeto JobConf “job2” para el segundo trabajo y configure todos los parámetros con “temp” como inputdirectory y “output” como directorio de salida. Ejecute este trabajo:

 JobClient.run(job2). 

(2) Cree dos objetos JobConf y establezca todos los parámetros en ellos igual que (1) excepto que no use JobClient.run.

Luego crea dos objetos Job con jobconfs como parámetros:

 Job job1=new Job(jobconf1); Job job2=new Job(jobconf2); 

Con el objeto jobControl, especifica las dependencias de trabajo y luego ejecuta los trabajos:

 JobControl jbcntrl=new JobControl("jbcntrl"); jbcntrl.addJob(job1); jbcntrl.addJob(job2); job2.addDependingJob(job1); jbcntrl.run(); 

(3) Si necesita una estructura algo así como Map + | Reducir | Mapa *, puede usar las clases ChainMapper y ChainReducer que vienen con Hadoop versión 0.19 en adelante.

Aclamaciones

En realidad, hay varias maneras de hacer esto. Me enfocaré en dos.

Una es a través de Riffle ( http://github.com/cwensel/riffle ) una biblioteca de anotaciones para identificar cosas dependientes y ‘ejecutarlas’ en orden de dependencia (topológico).

O puede usar una Cascade (y MapReduceFlow) en Cascading ( http://www.cascading.org/ ). Una versión futura admitirá las anotaciones de Riffle, pero funciona muy bien ahora con trabajos sin procesar de MR JobConf.

Una variante de esto es no administrar los trabajos de MR a mano en absoluto, sino desarrollar su aplicación utilizando la API en cascada. A continuación, el JobConf y el encadenamiento de trabajos se manejan internamente a través del planificador en cascada y las clases de flujo.

De esta forma, dedicará su tiempo a concentrarse en su problema, no en la mecánica de administrar trabajos de Hadoop, etc. Incluso puede aplicar capas diferentes de idiomas en la parte superior (como clojure o jruby) para simplificar aún más su desarrollo y aplicaciones. http://www.cascading.org/modules.html

He hecho un trabajo de encadenamiento utilizando objetos de JobConf uno tras otro. Tomé el ejemplo de WordCount para encadenar los trabajos. Un trabajo determina cuántas veces se repite una palabra en el resultado dado. El segundo trabajo toma la salida del primer trabajo como entrada y calcula el total de palabras en la entrada dada. A continuación se muestra el código que debe colocarse en la clase de controlador.

  //First Job - Counts, how many times a word encountered in a given file JobConf job1 = new JobConf(WordCount.class); job1.setJobName("WordCount"); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); job1.setMapperClass(WordCountMapper.class); job1.setCombinerClass(WordCountReducer.class); job1.setReducerClass(WordCountReducer.class); job1.setInputFormat(TextInputFormat.class); job1.setOutputFormat(TextOutputFormat.class); //Ensure that a folder with the "input_data" exists on HDFS and contains the input files FileInputFormat.setInputPaths(job1, new Path("input_data")); //"first_job_output" contains data that how many times a word occurred in the given file //This will be the input to the second job. For second job, input data name should be //"first_job_output". FileOutputFormat.setOutputPath(job1, new Path("first_job_output")); JobClient.runJob(job1); //Second Job - Counts total number of words in a given file JobConf job2 = new JobConf(TotalWords.class); job2.setJobName("TotalWords"); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); job2.setMapperClass(TotalWordsMapper.class); job2.setCombinerClass(TotalWordsReducer.class); job2.setReducerClass(TotalWordsReducer.class); job2.setInputFormat(TextInputFormat.class); job2.setOutputFormat(TextOutputFormat.class); //Path name for this job should match first job's output path name FileInputFormat.setInputPaths(job2, new Path("first_job_output")); //This will contain the final output. If you want to send this jobs output //as input to third job, then third jobs input path name should be "second_job_output" //In this way, jobs can be chained, sending output one to other as input and get the //final output FileOutputFormat.setOutputPath(job2, new Path("second_job_output")); JobClient.runJob(job2); 

El comando para ejecutar estos trabajos es:

bin / hadoop jar TotalWords.

Necesitamos dar el nombre de los trabajos finales para el comando. En el caso anterior, es TotalWords.

Puedes usar oozie para el procesamiento de tus trabajos de MapReduce. http://issues.apache.org/jira/browse/HADOOP-5303

Puede ejecutar la cadena MR de la manera que figura en el código.

TENGA EN CUENTA : solo se ha proporcionado el código de conductor

 public class WordCountSorting { // here the word keys shall be sorted //let us write the wordcount logic first public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException { //THE DRIVER CODE FOR MR CHAIN Configuration conf1=new Configuration(); Job j1=Job.getInstance(conf1); j1.setJarByClass(WordCountSorting.class); j1.setMapperClass(MyMapper.class); j1.setReducerClass(MyReducer.class); j1.setMapOutputKeyClass(Text.class); j1.setMapOutputValueClass(IntWritable.class); j1.setOutputKeyClass(LongWritable.class); j1.setOutputValueClass(Text.class); Path outputPath=new Path("FirstMapper"); FileInputFormat.addInputPath(j1,new Path(args[0])); FileOutputFormat.setOutputPath(j1,outputPath); outputPath.getFileSystem(conf1).delete(outputPath); j1.waitForCompletion(true); Configuration conf2=new Configuration(); Job j2=Job.getInstance(conf2); j2.setJarByClass(WordCountSorting.class); j2.setMapperClass(MyMapper2.class); j2.setNumReduceTasks(0); j2.setOutputKeyClass(Text.class); j2.setOutputValueClass(IntWritable.class); Path outputPath1=new Path(args[1]); FileInputFormat.addInputPath(j2, outputPath); FileOutputFormat.setOutputPath(j2, outputPath1); outputPath1.getFileSystem(conf2).delete(outputPath1, true); System.exit(j2.waitForCompletion(true)?0:1); } } 

LA SECUENCIA ES

( JOB1 ) MAP- > REDUCE-> ( JOB2 ) MAPA
Esto se hizo para obtener las claves ordenadas, pero hay más maneras, como el uso de un mapa de árbol
Sin embargo, quiero centrar su atención en la forma en que los Jobs han sido encadenados.
Gracias

Hay ejemplos en el proyecto Apache Mahout que encadena varios trabajos de MapReduce. Uno de los ejemplos se puede encontrar en:

RecommenderJob.java

http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

Podemos utilizar el método waitForCompletion(true) del trabajo para definir la dependencia entre el trabajo.

En mi caso, tuve 3 trabajos que dependían el uno del otro. En la clase de controlador utilicé el siguiente código y funciona como se esperaba.

 public static void main(String[] args) throws Exception { // TODO Auto-generated method stub CCJobExecution ccJobExecution = new CCJobExecution(); Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]); Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]); Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]); System.out.println("****************Started Executing distanceTimeFraudJob ================"); distanceTimeFraudJob.submit(); if(distanceTimeFraudJob.waitForCompletion(true)) { System.out.println("=================Completed DistanceTimeFraudJob================= "); System.out.println("=================Started Executing spendingFraudJob ================"); spendingFraudJob.submit(); if(spendingFraudJob.waitForCompletion(true)) { System.out.println("=================Completed spendingFraudJob================= "); System.out.println("=================Started locationFraudJob================= "); locationFraudJob.submit(); if(locationFraudJob.waitForCompletion(true)) { System.out.println("=================Completed locationFraudJob================="); } } } } 

La nueva clase org.apache.hadoop.mapreduce.lib.chain.ChainMapper ayuda a este escenario

Aunque hay motores de flujo de trabajo de Hadoop basados ​​en servidores complejos, por ejemplo, oozie, tengo una biblioteca java simple que permite la ejecución de múltiples trabajos de Hadoop como un flujo de trabajo. La configuración del trabajo y el flujo de trabajo que definen la dependencia entre trabajos se configuran en un archivo JSON. Todo es configurable desde el exterior y no requiere ningún cambio en la implementación de reducir mapa existente para formar parte de un flujo de trabajo.

Detalles pueden ser encontrados aqui. El código fuente y el jar están disponibles en github.

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

Pranab

Creo que Oozie ayuda a los trabajos consiguientes a recibir las entradas directamente del trabajo anterior. Esto evita la operación de E / S realizada con control de trabajo.

Si quiere encadenar programáticamente sus trabajos, querrá usar JobControl. El uso es bastante simple:

  JobControl jobControl = new JobControl(name); 

Después de eso, agrega instancias de ControlledJob. ControlledJob define un trabajo con sus dependencias, por lo tanto, acumula automáticamente las entradas y salidas para adaptarse a una “cadena” de trabajos.

  jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2)); jobControl.run(); 

comienza la cadena Querrá poner eso en un hilo Speerate. Esto permite verificar el estado de su cadena mientras se ejecuta:

  while (!jobControl.allFinished()) { System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size()); System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size()); System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size()); List successfulJobList = jobControl.getSuccessfulJobList(); System.out.println("Jobs in success state: " + successfulJobList.size()); List failedJobList = jobControl.getFailedJobList(); System.out.println("Jobs in failed state: " + failedJobList.size()); } 

Como mencionó en su requisito que quiere que o / p de MRJob1 sea el i / p de MRJob2 y así sucesivamente, puede considerar usar el flujo de trabajo de oozie para este uso. También podría considerar escribir sus datos intermedios en HDFS, ya que serán utilizados por el próximo MRJob. Y después de que el trabajo finalice, puede limpiar sus datos intermedios.