Tengo, creo, un caso de uso relativamente común para la transmisión de chispa:
Tengo una secuencia de objetos que me gustaría filtrar en base a algunos datos de referencia
Inicialmente, pensé que esto sería una cosa muy simple de lograr usando una variable Broadcast :
public void startSparkEngine { Broadcast refdataBroadcast = sparkContext.broadcast(getRefData()); final JavaDStream filteredStream = objectStream.filter(obj -> { final ReferenceData refData = refdataBroadcast.getValue(); return obj.getField().equals(refData.getField()); } filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); }
Sin embargo, aunque con poca frecuencia, mis datos de referencia cambiarán periódicamente
Tenía la impresión de que podía modificar y retransmitir mi variable en el controlador y que se propagaría a cada uno de los trabajadores, sin embargo, el objeto Broadcast
no se puede Serializable
y debe ser final
.
¿Qué alternativas tengo? Las tres soluciones que puedo pensar son:
Mueva la búsqueda de datos de referencia a forEachPartition
o forEachRdd
para que resida por completo en los trabajadores. Sin embargo, los datos de referencia viven junto a una API REST, así que también necesitaría almacenar de alguna manera un temporizador / contador para detener el acceso al control remoto para cada elemento en la secuencia.
Reinicie el contexto de chispa cada vez que cambie la refdata, con una nueva variable de difusión.
Convierta los datos de referencia en un RDD , luego join
los flujos de tal manera que ahora estoy transmitiendo Pair
, aunque esto enviará los datos de referencia con cada objeto.
Extendiendo la respuesta Por @Rohan Aletty. Aquí hay un código de muestra de un BroadcastWrapper que actualiza la variable de difusión según algunos ttl
public class BroadcastWrapper { private Broadcast broadcastVar; private Date lastUpdatedAt = Calendar.getInstance().getTime(); private static BroadcastWrapper obj = new BroadcastWrapper(); private BroadcastWrapper(){} public static BroadcastWrapper getInstance() { return obj; } public JavaSparkContext getSparkContext(SparkContext sc) { JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); return jsc; } public Broadcast updateAndGet(SparkContext sparkContext){ Date currentDate = Calendar.getInstance().getTime(); long diff = currentDate.getTime()-lastUpdatedAt.getTime(); if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms if (var != null) var.unpersist(); lastUpdatedAt = new Date(System.currentTimeMillis()); //Your logic to refresh ReferenceData data = getRefData(); var = getSparkContext(sparkContext).broadcast(data); } return var; } }
Tu código se vería así:
public void startSparkEngine() { final JavaDStream filteredStream = objectStream.transform(stream -> { Broadcast refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context()); stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField())); }); filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); }
Esto también funcionó para mí en multi-cluster. Espero que esto ayude
Casi todos los que manejan aplicaciones de transmisión necesitan una forma de entrelazar (filtrar, buscar, etc.) los datos de referencia (desde BD, archivos, etc.) en los datos de transmisión. Tenemos una solución parcial de las dos partes
Buscar datos de referencia para usar en operaciones de transmisión
Para la mayor parte esto funciona bien, excepto por lo siguiente
Actualiza los datos de referencia
No hay una forma definitiva de lograr esto a pesar de las sugerencias en estos hilos, es decir: eliminar la variable de difusión anterior y crear una nueva. Múltiples incógnitas como qué esperar entre estas operaciones.
Esta es una necesidad común, hubiera sido útil si hubiera una forma de enviar información para difundir la actualización de información variable. Con eso, es posible invalidar los cachés locales en “CacheLookup”
La segunda parte del problema aún no está resuelta. Me interesaría si hay algún enfoque viable para esto
No estoy seguro de si ya ha intentado esto, pero creo que se puede lograr una actualización de una variable de difusión sin apagar el SparkContext
. Mediante el uso del método unpersist()
, las copias de la variable de difusión se eliminan en cada ejecutor y tendrían que ser la variable que necesita ser retransmitida para que se pueda volver a acceder. Para su caso de uso, cuando desee actualizar su transmisión, puede:
Espere a que sus ejecutores terminen con una serie de datos actual
Sin usar la variable de difusión
Actualiza la variable de difusión
Retransmisión para enviar los nuevos datos de referencia a los ejecutores
Estoy sacando mucho de esta publicación, pero la persona que hizo la última respuesta afirmó haberla hecho funcionar localmente. Es importante tener en cuenta que probablemente desee establecer el locking como true
en la versión no integrada para asegurarse de que los ejecutores eliminen los datos antiguos (de modo que los valores obsoletos no se leerán nuevamente en la siguiente iteración).
Recientemente se enfrentó un problema con esto. Pensé que podría ser útil para los usuarios de Scala.
La forma de Scala de hacer BroadCastWrapper
es como el siguiente ejemplo.
import java.io.{ ObjectInputStream, ObjectOutputStream } import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag /* wrapper lets us update brodcast variables within DStreams' foreachRDD without running into serialization issues */ case class BroadcastWrapper[T: ClassTag]( @transient private val ssc: StreamingContext, @transient private val _v: T) { @transient private var v = ssc.sparkContext.broadcast(_v) def update(newValue: T, blocking: Boolean = false): Unit = { v.unpersist(blocking) v = ssc.sparkContext.broadcast(newValue) } def value: T = v.value private def writeObject(out: ObjectOutputStream): Unit = { out.writeObject(v) } private def readObject(in: ObjectInputStream): Unit = { v = in.readObject().asInstanceOf[Broadcast[T]] } }
Cada vez que necesita llamar a la función de actualización para obtener una nueva variable de difusión.