Articles of apache spark

¿Cómo puedo pasar parámetros adicionales a UDF en SparkSql?

Quiero analizar las columnas de fecha en un DataFrame , y para cada columna de fecha, la resolución para la fecha puede cambiar (es decir, 2011/01/10 => 2011/01 si la resolución se establece en “Mes”). Escribí el siguiente código: def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = { import org.apache.spark.sql.functions._ val convertDateFunc […]

¿Cómo crear un DataFrame vacío con un esquema específico?

Quiero crear en DataFrame con un esquema especificado en Scala. Intenté usar lectura JSON (me refiero a leer el archivo vacío) pero no creo que sea la mejor práctica.

¿Qué operaciones preservan el orden de RDD?

El RDD tiene una orden significativa (a diferencia de alguna orden aleatoria impuesta por el modelo de almacenamiento) si fue procesada por sortBy() , como se explica en esta respuesta . Ahora, ¿qué operaciones conservan esa orden? Por ejemplo, ¿está garantizado que (después de a.sortBy() ) a.map(f).zip(a) === a.map(x => (f(x),x)) Qué tal si a.filter(f).map(g) […]

Cambiar el nombre de los nombres de columna de un DataFrame en Spark Scala

Estoy intentando convertir todos los encabezados / nombres de columna de un DataFrame en Spark-Scala. a partir de ahora se me ocurrió el siguiente código que solo reemplaza el nombre de una sola columna. for( i <- 0 to origCols.length – 1) { df.withColumnRenamed( df.columns(i), df.columns(i).toLowerCase ); }

Cómo usar COGROUP para grandes conjuntos de datos

Tengo dos rdd’s saber val tab_a: RDD[(String, String)] y val tab_b: RDD[(String, String)] Estoy usando cogroup para esos datasets como: val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } Estoy utilizando tab_c valores agrupados para la función de mapa y funciona bien para pequeños conjuntos de datos, pero en […]

¿Cómo dividir un dataframe en marcos de datos con los mismos valores de columna?

Usando Scala, ¿cómo puedo dividir el DataFrame en múltiples dataFrame (ya sea matriz o colección) con el mismo valor de columna? Por ejemplo, quiero dividir el siguiente DataFrame: ID Rate State 1 24 AL 2 35 MN 3 46 FL 4 34 AL 5 78 MN 6 99 FL a: conjunto de datos 1 ID […]

¿Cómo obtener ID de una tarea de mapa en Spark?

¿Hay alguna manera de obtener ID de una tarea de mapa en Spark? Por ejemplo, si cada tarea de mapa llama a una función definida por el usuario, ¿puedo obtener el ID de esa tarea de mapa desde esa función definida por el usuario?

Spark losing println () en stdout

Tengo el siguiente código: val blueCount = sc.accumulator[Long](0) val output = input.map { data => for (value <- data.getValues()) { if (record.getEnum() == DataEnum.BLUE) { blueCount += 1 println("Enum = BLUE : " + value.toString() } } data }.persist(StorageLevel.MEMORY_ONLY_SER) output.saveAsTextFile("myOutput") Entonces, el blueCount no es cero, ¡pero no obtuve ningún resultado de println ()! ¿Me […]

Cómo forzar la evaluación de DataFrame en Spark

Algunas veces (p. Ej., Para pruebas y marcación) deseo forzar la ejecución de las transformaciones definidas en un DataFrame. AFAIK llamando a una acción como count no garantiza que todas las Columns sean realmente calculadas, show solo puede calcular un subconjunto de todas las Rows (ver ejemplos a continuación) Mi solución es escribir el DataFrame […]

NullPointerException en Scala Spark, parece ser causado por el tipo de colección?

sessionIdList es de tipo: scala> sessionIdList res19: org.apache.spark.rdd.RDD [String] = MappedRDD [17] en distintos en: 30 Cuando bash ejecutar el código siguiente: val x = sc.parallelize(List(1,2,3)) val cartesianComp = x.cartesian(x).map(x => (x)) val kDistanceNeighbourhood = sessionIdList.map(s => { cartesianComp.filter(v => v != null) }) kDistanceNeighbourhood.take(1) Recibo una excepción 14/05/21 16:20:46 ERROR Executor: Exception in task […]