Mejor forma de convertir un campo de cadena en timestamp en Spark

Tengo un CSV en el que un campo es datetime en un formato específico. No puedo importarlo directamente en mi Dataframe porque necesita ser una marca de tiempo. Así que lo importo como una cadena y lo convierto en una Timestamp como esta

 import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.Date import org.apache.spark.sql.Row def getTimestamp(x:Any) : Timestamp = { val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") if (x.toString() == "") return null else { val d = format.parse(x.toString()); val t = new Timestamp(d.getTime()); return t } } def convert(row : Row) : Row = { val d1 = getTimestamp(row(3)) return Row(row(0),row(1),row(2),d1) } 

¿Hay una forma mejor y más concisa de hacer esto con la API Dataframe o spark-sql? El método anterior requiere la creación de un RDD y dar el esquema para el Dataframe nuevamente.

Spark> = 2.2

 import org.apache.spark.sql.functions.to_timestamp val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") df.withColumn("ts", ts).show(2, false) // +---+-------------------+-------------------+ // |id |dts |ts | // +---+-------------------+-------------------+ // |1 |05/26/2016 01:01:01|2016-05-26 01:01:01| // |2 |#$@#@# |null | // +---+-------------------+-------------------+ 

Chispa> = 1.6, <2.2

Puede utilizar las funciones de procesamiento de fecha que se han introducido en Spark 1.5. Suponiendo que tiene los siguientes datos:

 val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts") 

Puede usar unix_timestamp para analizar cadenas y convertirlo en timestamp

 import org.apache.spark.sql.functions.unix_timestamp val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp") df.withColumn("ts", ts).show(2, false) // +---+-------------------+---------------------+ // |id |dts |ts | // +---+-------------------+---------------------+ // |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0| // |2 |#$@#@# |null | // +---+-------------------+---------------------+ 

Como puede ver, cubre el análisis y el manejo de errores.

Chispa> = 1.5, <1.6

Tendrás que usar usar algo como esto:

 unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp") 

o

 (unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp") 

debido a SPARK-11724 .

Spark <1.5

deberías poder usarlos con expr y HiveContext .

Todavía no he jugado con Spark SQL, pero creo que esto sería scala más idiomático (el uso nulo no se considera una buena práctica):

 def getTimestamp(s: String) : Option1536729788 = s match { case "" => None case _ => { val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") Try(new Timestamp(format.parse(s).getTime)) match { case Success(t) => Some(t) case Failure(_) => None } } } 

Tenga en cuenta que supongo que conoce los tipos de elementos Row antemano (si los lee de un archivo csv, todos son String ), es por eso que uso un tipo apropiado como String y not Any (todo es subtipo de Any ).

También depende de cómo quiera manejar las excepciones de análisis. En este caso, si se produce una excepción de análisis, simplemente se devuelve None .

Puede usarlo más adelante con:

 rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3)) 

Tengo la marca de tiempo ISO8601 en mi conjunto de datos y necesitaba convertirlo al formato “aaaa-MM-dd”. Esto es lo que hice:

 import org.joda.time.{DateTime, DateTimeZone} object DateUtils extends Serializable { def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC) def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC) } sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd")) 

Y solo puede usar el UDF en su consulta SQL de chispa.

Me gustaría mover el método getTimeStamp escrito por ti en mapPartitions de rdd y reutilizar GenericMutableRow entre filas en un iterador:

 val strRdd = sc.textFile("hdfs://path/to/cvs-file") val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter => new Iterator[Row] { val row = new GenericMutableRow(4) var current: Array[String] = _ def hasNext = iter.hasNext def next() = { current = iter.next() row(0) = current(0) row(1) = current(1) row(2) = current(2) val ts = getTimestamp(current(3)) if(ts != null) { row.update(3, ts) } else { row.setNullAt(3) } row } } } 

Y aún debe usar el esquema para generar un DataFrame

 val df = sqlContext.createDataFrame(rowRdd, tableSchema) 

El uso de GenericMutableRow dentro de una implementación de iterador se puede encontrar en Operador Agregado , InMemoryColumnarTableScan , ParquetTableOperations , etc.

Yo usaría https://github.com/databricks/spark-csv

Esto inferirá marcas de tiempo para usted.

 import com.databricks.spark.csv._ val rdd: RDD[String] = sc.textFile("csvfile.csv") val df : DataFrame = new CsvParser().withDelimiter('|') .withInferSchema(true) .withParseMode("DROPMALFORMED") .csvRdd(sqlContext, rdd)