java.lang.ClassCastException utilizando expresiones lambda en spark job en servidor remoto

Estoy intentando crear una API web para mis trabajos de chispa de apache usando el framework sparkjava.com. Mi código es:

@Override public void init() { get("/hello", (req, res) -> { String sourcePath = "hdfs://spark:54310/input/*"; SparkConf conf = new SparkConf().setAppName("LineCount"); conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" }); File configFile = new File("config.properties"); String sparkURI = "spark://hamrah:7077"; conf.setMaster(sparkURI); conf.set("spark.driver.allowMultipleContexts", "true"); JavaSparkContext sc = new JavaSparkContext(conf); @SuppressWarnings("resource") JavaRDD log = sc.textFile(sourcePath); JavaRDD lines = log.filter(x -> { return true; }); return lines.count(); }); } 

Si elimino la expresión lambda o la pongo dentro de un simple contenedor en lugar de un servicio web (de alguna manera un servlet) se ejecutará sin ningún error. Pero usar una expresión lambda dentro de un servlet dará como resultado esta excepción:

 15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1 at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 

PD: Probé la combinación de jersey y javaspark con embarcadero, gato y resina y todos ellos me llevaron al mismo resultado.

Lo que tienes aquí es un error de seguimiento que enmascara el error original.

Cuando las instancias de lambda se serializan, usan writeReplace para disolver su implementación específica de JRE del formulario persistente que es una instancia de SerializedLambda . Cuando se restaure la instancia SerializedLambda , se readResolve su método readResolve para reconstituir la instancia lambda apropiada. Como dice la documentación, lo hará al invocar un método especial de la clase que definió el lambda original (ver también esta respuesta ). El punto importante es que la clase original es necesaria y eso es lo que falta en tu caso.

Pero hay un … especial … comportamiento del ObjectInputStream . Cuando encuentra una excepción, no rescata de inmediato. Grabará la excepción y continuará el proceso, marcando todos los objetos que se están leyendo actualmente, por lo que también depende de que el objeto erróneo sea erróneo. Solo al final del proceso lanzará la excepción original que encontró. Lo que lo hace tan extraño es que también continuará tratando de establecer los campos de estos objetos. Pero cuando mira el método ObjectInputStream.readOrdinaryObject línea 1806:

 … if (obj != null && handles.lookupException(passHandle) == null && desc.hasReadResolveMethod()) { Object rep = desc.invokeReadResolve(obj); if (unshared && rep.getClass().isArray()) { rep = cloneArray(rep); } if (rep != obj) { handles.setObject(passHandle, obj = rep); } } return obj; } 

ve que no llama al método lookupException cuando lookupException informa una excepción no null . Pero cuando no sucedió la sustitución, no es una buena idea continuar tratando de establecer los valores de campo de la referencia, pero eso es exactamente lo que sucede aquí, por lo tanto, produce una ClassCastException .

Puede reproducir fácilmente el problema:

 public class Holder implements Serializable { Runnable r; } public class Defining { public static Holder get() { final Holder holder = new Holder(); holder.r=(Runnable&Serializable)()->{}; return holder; } } public class Writing { static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser"); public static void main(String... arg) throws IOException { try(FileOutputStream os=new FileOutputStream(f); ObjectOutputStream oos=new ObjectOutputStream(os)) { oos.writeObject(Defining.get()); } System.out.println("written to "+f); } } public class Reading { static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser"); public static void main(String... arg) throws IOException, ClassNotFoundException { try(FileInputStream is=new FileInputStream(f); ObjectInputStream ois=new ObjectInputStream(is)) { Holder h=(Holder)ois.readObject(); System.out.println(hr); hrrun(); } System.out.println("read from "+f); } } 

Comstack estas cuatro clases y ejecuta Writing . A continuación, elimine el archivo de clase Defining.class y ejecute Reading . Entonces obtendrás un

 Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) 

(Probado con 1.8.0_20)


La conclusión es que puede olvidarse de este problema de serialización una vez que se entiende lo que está sucediendo, todo lo que tiene que hacer para resolver su problema es asegurarse de que la clase que definió la expresión lambda también esté disponible en el tiempo de ejecución donde está el lambda deserializado

Ejemplo de Spark Job para ejecutar directamente desde IDE (spark-submit distribuye el jar por defecto):

 SparkConf sconf = new SparkConf() .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory") .set("spark.eventLog.enabled", "true") .setJars(new String[]{"/path/to/jar/with/your/class.jar"}) .setMaster("spark://spark.standalone.uri:7077"); 

Supongo que su problema es el auto-box fallido. En el código

 x -> { return true; } 

pasa ( String->boolean ) lambda (es Predicate ) mientras que el método de filtro toma ( String->Boolean ) lambda (es Function ). Así que te ofrezco cambiar el código a

 x -> { return Boolean.TRUE; } 

Incluye detalles en tu pregunta, por favor. Se aprecia la salida de uname -a y java -version . Proporcione sscce si es posible.

Tuve el mismo error y reemplacé el lambda con una clase interna, luego funcionó. Realmente no entiendo por qué, y la reproducción de este error fue extremadamente difícil (teníamos un servidor que exhibía el comportamiento, y en ninguna otra parte).

Causa problemas de serialización (utiliza lambdas, provoca un error SerializedLambda )

 this.variable = () -> { ..... } 

Rendimiento java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field MyObject.val$variable

Trabajos

 this.variable = new MyInterface() { public void myMethod() { ..... } }; 

Tal vez puedas simplemente recolocar tu lambda Java8 con una función spark.scala.Function

reemplazar

 output = rdds.map(x->this.function(x)).collect() 

con:

 output = rdds.map(new Function(){ public Double call(Double x){ return MyClass.this.function(x); } }).collect();