¿Cómo puedo actualizar una variable de transmisión en la transmisión de chispa?

Tengo, creo, un caso de uso relativamente común para la transmisión de chispa:

Tengo una secuencia de objetos que me gustaría filtrar en base a algunos datos de referencia

Inicialmente, pensé que esto sería una cosa muy simple de lograr usando una variable Broadcast :

public void startSparkEngine { Broadcast refdataBroadcast = sparkContext.broadcast(getRefData()); final JavaDStream filteredStream = objectStream.filter(obj -> { final ReferenceData refData = refdataBroadcast.getValue(); return obj.getField().equals(refData.getField()); } filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); } 

Sin embargo, aunque con poca frecuencia, mis datos de referencia cambiarán periódicamente

Tenía la impresión de que podía modificar y retransmitir mi variable en el controlador y que se propagaría a cada uno de los trabajadores, sin embargo, el objeto Broadcast no se puede Serializable y debe ser final .

¿Qué alternativas tengo? Las tres soluciones que puedo pensar son:

  1. Mueva la búsqueda de datos de referencia a forEachPartition o forEachRdd para que resida por completo en los trabajadores. Sin embargo, los datos de referencia viven junto a una API REST, así que también necesitaría almacenar de alguna manera un temporizador / contador para detener el acceso al control remoto para cada elemento en la secuencia.

  2. Reinicie el contexto de chispa cada vez que cambie la refdata, con una nueva variable de difusión.

  3. Convierta los datos de referencia en un RDD , luego join los flujos de tal manera que ahora estoy transmitiendo Pair , aunque esto enviará los datos de referencia con cada objeto.

Extendiendo la respuesta Por @Rohan Aletty. Aquí hay un código de muestra de un BroadcastWrapper que actualiza la variable de difusión según algunos ttl

 public class BroadcastWrapper { private Broadcast broadcastVar; private Date lastUpdatedAt = Calendar.getInstance().getTime(); private static BroadcastWrapper obj = new BroadcastWrapper(); private BroadcastWrapper(){} public static BroadcastWrapper getInstance() { return obj; } public JavaSparkContext getSparkContext(SparkContext sc) { JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); return jsc; } public Broadcast updateAndGet(SparkContext sparkContext){ Date currentDate = Calendar.getInstance().getTime(); long diff = currentDate.getTime()-lastUpdatedAt.getTime(); if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms if (var != null) var.unpersist(); lastUpdatedAt = new Date(System.currentTimeMillis()); //Your logic to refresh ReferenceData data = getRefData(); var = getSparkContext(sparkContext).broadcast(data); } return var; } } 

Tu código se vería así:

 public void startSparkEngine() { final JavaDStream filteredStream = objectStream.transform(stream -> { Broadcast refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context()); stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField())); }); filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); } 

Esto también funcionó para mí en multi-cluster. Espero que esto ayude

Casi todos los que manejan aplicaciones de transmisión necesitan una forma de entrelazar (filtrar, buscar, etc.) los datos de referencia (desde BD, archivos, etc.) en los datos de transmisión. Tenemos una solución parcial de las dos partes

  1. Buscar datos de referencia para usar en operaciones de transmisión

    • crea el objeto CacheLookup con el TTL de caché deseado
    • envolver eso en Broadcast
    • utiliza CacheLookup como parte de la lógica de transmisión

Para la mayor parte esto funciona bien, excepto por lo siguiente

  1. Actualiza los datos de referencia

    No hay una forma definitiva de lograr esto a pesar de las sugerencias en estos hilos, es decir: eliminar la variable de difusión anterior y crear una nueva. Múltiples incógnitas como qué esperar entre estas operaciones.

Esta es una necesidad común, hubiera sido útil si hubiera una forma de enviar información para difundir la actualización de información variable. Con eso, es posible invalidar los cachés locales en “CacheLookup”

La segunda parte del problema aún no está resuelta. Me interesaría si hay algún enfoque viable para esto

No estoy seguro de si ya ha intentado esto, pero creo que se puede lograr una actualización de una variable de difusión sin apagar el SparkContext . Mediante el uso del método unpersist() , las copias de la variable de difusión se eliminan en cada ejecutor y tendrían que ser la variable que necesita ser retransmitida para que se pueda volver a acceder. Para su caso de uso, cuando desee actualizar su transmisión, puede:

  1. Espere a que sus ejecutores terminen con una serie de datos actual

  2. Sin usar la variable de difusión

  3. Actualiza la variable de difusión

  4. Retransmisión para enviar los nuevos datos de referencia a los ejecutores

Estoy sacando mucho de esta publicación, pero la persona que hizo la última respuesta afirmó haberla hecho funcionar localmente. Es importante tener en cuenta que probablemente desee establecer el locking como true en la versión no integrada para asegurarse de que los ejecutores eliminen los datos antiguos (de modo que los valores obsoletos no se leerán nuevamente en la siguiente iteración).

Recientemente se enfrentó un problema con esto. Pensé que podría ser útil para los usuarios de Scala.

La forma de Scala de hacer BroadCastWrapper es como el siguiente ejemplo.

 import java.io.{ ObjectInputStream, ObjectOutputStream } import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag /* wrapper lets us update brodcast variables within DStreams' foreachRDD without running into serialization issues */ case class BroadcastWrapper[T: ClassTag]( @transient private val ssc: StreamingContext, @transient private val _v: T) { @transient private var v = ssc.sparkContext.broadcast(_v) def update(newValue: T, blocking: Boolean = false): Unit = { v.unpersist(blocking) v = ssc.sparkContext.broadcast(newValue) } def value: T = v.value private def writeObject(out: ObjectOutputStream): Unit = { out.writeObject(v) } private def readObject(in: ObjectInputStream): Unit = { v = in.readObject().asInstanceOf[Broadcast[T]] } } 

Cada vez que necesita llamar a la función de actualización para obtener una nueva variable de difusión.