¿Cómo mejorar el rendimiento de los trabajos Spark lentos utilizando DataFrame y la conexión JDBC?

Estoy intentando acceder a una tabla de Teradata de tamaño medio (~ 100 millones de filas) a través de JDBC en modo independiente en un solo nodo (local [*]).

Estoy usando Spark 1.4.1. y está configurado en una máquina muy poderosa (2 CPU, 24 núcleos, 126G de RAM).

He intentado varias configuraciones de memoria y opciones de ajuste para que funcione más rápido, pero ninguno de ellos tuvo un gran impacto.

Estoy seguro de que hay algo de lo que me estoy perdiendo y debajo está mi prueba final que tardó unos 11 minutos en obtener este conteo simple versus solo tardó 40 segundos usando una conexión JDBC a través de R para obtener los conteos.

bin/pyspark --driver-memory 40g --executor-memory 40g df = sqlContext.read.jdbc("jdbc:teradata://......) df.count() 

Cuando probé con la tabla BIG (registros 5B), no se obtuvieron resultados una vez completada la consulta.

Todas las operaciones de agregación se realizan después de que todo el conjunto de datos se recupera en la memoria en una colección de DataFrame . Así que hacer el conteo en Spark nunca será tan eficiente como lo sería directamente en TeraData. A veces vale la pena impulsar algunos cálculos en la base de datos creando vistas y luego mapeando esas vistas usando la API de JDBC.

Cada vez que utiliza el controlador JDBC para acceder a una tabla grande, debe especificar la estrategia de partición; de lo contrario, creará un DataFrame / RDD con una sola partición y sobrecargará la conexión JDBC individual.

En cambio, quieres probar la siguiente IA (desde Spark 1.4.0+):

 sqlctx.read.jdbc( url = "", table = "", columnName = "", lowerBound = minValue, upperBound = maxValue, numPartitions = 20, connectionProperties = new java.util.Properties() )

También hay una opción para empujar hacia abajo algunos filtros.

Si no tiene una columna integral uniformemente distribuida, quiere crear algunas particiones personalizadas especificando predicados personalizados (sentencias where ). Por ejemplo, supongamos que tiene una columna de marca de tiempo y desea dividir por rangos de fechas:

  val predicates = Array( "2015-06-20" -> "2015-06-30", "2015-07-01" -> "2015-07-10", "2015-07-11" -> "2015-07-20", "2015-07-21" -> "2015-07-31" ) .map { case (start, end) => s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) < = date '$end'" } predicates.foreach(println) // Below is the result of how predicates were formed //cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) < = date '2015-06-30' //cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) < = date '2015-07-10' //cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) < = date //'2015-07-20' //cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) < = date '2015-07-31' sqlctx.read.jdbc( url = "", table = "", predicates = predicates, connectionProperties = new java.util.Properties() )

DataFrame un DataFrame donde cada partición contendrá los registros de cada subconsulta asociada a los diferentes predicados.

Verifique el código fuente en DataFrameReader.scala

¿La tabla no serializada encaja en 40 GB? Si comienza a intercambiarse en el disco, el rendimiento disminuirá drásticamente.

De todos modos, cuando usas un JDBC estándar con syntax ansi SQL, aprovechas el motor de DB, así que si teradata (no sé teradata) contiene estadísticas sobre tu tabla, un clásico “seleccionar conteo (*) de la tabla” será muy rápido. En lugar de chispa, está cargando sus 100 millones de filas en la memoria con algo así como “seleccionar * de la tabla” y luego realizará un recuento en las filas de RDD. Es una carga de trabajo bastante diferente.