Igualdad de clase de caso en Apache Spark

¿Por qué la coincidencia de patrones en Spark no funciona igual que en Scala? Vea el siguiente ejemplo … la función f() intenta coincidir con el patrón en la clase, que funciona en Scala REPL pero falla en Spark y da como resultado todo “???”. f2() es una solución que obtiene el resultado deseado en Spark usando .isInstanceOf() , pero entiendo que es una mala forma en Scala.

Cualquier ayuda sobre el patrón que coincida con la forma correcta en este escenario en Spark sería muy apreciada.

 abstract class a extends Serializable {val a: Int} case class b(a: Int) extends a case class bNull(a: Int=0) extends a val x: List[a] = List(b(0), b(1), bNull()) val xRdd = sc.parallelize(x) 

bash de coincidencia de patrones que funciona en Scala REPL pero falla en Spark

 def f(x: a) = x match { case b(n) => "b" case bNull(n) => "bnull" case _ => "???" } 

solución alternativa que funciona en Spark, pero es mala forma (creo)

 def f2(x: a) = { if (x.isInstanceOf[b]) { "b" } else if (x.isInstanceOf[bNull]) { "bnull" } else { "???" } } 

Ver resultados

 xRdd.map(f).collect //does not work in Spark // result: Array("???", "???", "???") xRdd.map(f2).collect // works in Spark // resut: Array("b", "b", "bnull") x.map(f(_)) // works in Scala REPL // result: List("b", "b", "bnull") 

Versiones utilizadas … Los resultados de chispa se ejecutan en chispa-caparazón (Spark 1.6 en AWS EMR-4.3) Scala REPL en SBT 0.13.9 (Scala 2.10.5)

Este es un problema conocido con Spark REPL. Puede encontrar más detalles en SPARK-2620 . Afecta a varias operaciones en Spark REPL, incluida la mayoría de las transformaciones en los PairwiseRDDs . Por ejemplo:

 case class Foo(x: Int) val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2)) foos.distinct.size // Int = 2 val foosRdd = sc.parallelize(foos, 4) foosRdd.distinct.count // Long = 4 foosRdd.map((_, 1)).reduceByKey(_ + _).collect // Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1)) foosRdd.first == foos.head // Boolean = false Foo.unapply(foosRdd.first) == Foo.unapply(foos.head) // Boolean = true 

Lo que lo empeora es que los resultados dependen de la distribución de datos:

 sc.parallelize(foos, 1).distinct.count // Long = 2 sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect // Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2)) 

Lo más simple que puede hacer es definir y empaquetar las clases de casos requeridas fuera de REPL. Cualquier código enviado directamente usando spark-submit debería funcionar también.

En Scala 2.11+ puede crear un paquete directamente en REPL con paste -raw .

 scala> :paste -raw // Entering paste mode (ctrl-D to finish) package bar case class Bar(x: Int) // Exiting paste mode, now interpreting. scala> import bar.Bar import bar.Bar scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect res1: Array[bar.Bar] = Array(Bar(1), Bar(2))