Spark read file from S3 using sc.textFile (“s3n: // …)

Intentando leer un archivo ubicado en S3 usando spark-shell:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log") lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at :12 scala> myRdd.count java.io.IOException: No FileSystem for scheme: s3n at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) ... etc ... 

IOException: No FileSystem para el esquema: se ha producido un error s3n con:

  • Spark 1.31 o 1.40 en la máquina dev (sin Hadoop libs)
  • Funciona desde Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60) que integra Spark 1.2.1 fuera de la caja
  • Usando el esquema s3: // o s3n: //

¿Cuál es la causa de este error? ¿Falta la dependencia, la configuración faltante o el mal uso de sc.textFile() ?

O puede deberse a un error que afecta a la comstackción de Spark específica de Hadoop 2.60, ya que esta publicación parece sugerir. Voy a probar Spark para Hadoop 2.40 para ver si esto resuelve el problema.

Confirmó que esto está relacionado con la comstackción Spark contra Hadoop 2.60. Acabo de instalar Spark 1.4.0 “Preconstruido para Hadoop 2.4 y posterior” (en lugar de Hadoop 2.6). Y el código ahora funciona bien.

sc.textFile("s3n://bucketname/Filename") ahora genera otro error:

 java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). 

El siguiente código utiliza el formato S3 URL para mostrar que Spark puede leer el archivo S3. Usando la máquina dev (sin Hadoop libs).

 scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt") lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at :21 scala> lyrics.count res1: Long = 9 

Mejor aún : el código anterior con las credenciales de AWS en línea en el URI de S3N se romperá si la clave secreta de AWS tiene un “/” hacia adelante. Configurar las credenciales de AWS en SparkContext lo arreglará. El código funciona si el archivo S3 es público o privado.

 sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/" val myRDD = sc.textFile("s3n://myBucket/MyFilePattern") myRDD.count 

A pesar de que esta pregunta ya tiene una respuesta aceptada, creo que todavía faltan los detalles exactos de por qué sucede esto. Entonces creo que podría haber un lugar para una respuesta más.

Si agrega la dependencia requerida de hadoop-aws , su código debería funcionar.

Iniciando Hadoop 2.6.0, el conector s3 FS se ha movido a una biblioteca separada llamada hadoop-aws. También hay un Jira para eso: mueva el código del conector FS relacionado con s3 a hadoop-aws .

Esto significa que cualquier versión de chispa que se haya creado contra Hadoop 2.6.0 o posterior deberá usar otra dependencia externa para poder conectarse al Sistema de archivos S3.
Aquí hay un ejemplo de sbt que he probado y que está funcionando como esperaba usando Apache Spark 1.6.2 construido contra Hadoop 2.6.0:

libraryDependencies + = “org.apache.hadoop”% “hadoop-aws”% “2.6.0”

En mi caso, encontré algunos problemas de dependencias, por lo que resolví agregando la exclusión:

libraryDependencies + = “org.apache.hadoop”% “hadoop-aws”% “2.6.0” exclude (“tomcat”, “jasper-comstackdor”) excludeAll ExclusionRule (organization = “javax.servlet”)

En otra nota relacionada, todavía tengo que probarlo, pero se recomienda utilizar el sistema de archivos “s3a” y no el “s3n” a partir de Hadoop 2.6.0.

La tercera generación, s3a: sistema de archivos. Diseñado para ser un interruptor en reemplazo de s3n :, este enlace del sistema de archivos admite archivos más grandes y promete un mayor rendimiento.

Puede agregar el parámetro –packages con el jar apropiado: a su presentación:

 bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py 

Este es un código de chispa de muestra que puede leer los archivos presentes en s3

 val hadoopConf = sparkContext.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", s3Key) hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret) var jobInput = sparkContext.textFile("s3://" + s3_location) 

Se encontró con el mismo problema en Spark 2.0.2. Lo resolvió alimentándolo con los flasks. Esto es lo que ejecuté:

 $ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar scala> val hadoopConf = sc.hadoopConfiguration scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem") scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId) scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey) scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) scala> sqlContext.read.parquet("s3://your-s3-bucket/") 

obviamente, necesitas tener los flasks en el camino donde estás ejecutando la chispa de

Para Spark 1.4.x “Pre construido para Hadoop 2.6 y posterior”:

Acabo de copiar los paquetes necesarios S3, S3native de hadoop-aws-2.6.0.jar a spark-assembly-1.4.1-hadoop2.6.0.jar.

Después de eso, reinicié el chispero y funciona. No olvide verificar el propietario y el modo del conjunto jar.

Hay un Spark JIRA, SPARK-7481 , abierto a partir de hoy, 20 de octubre de 2016, para agregar un módulo de chispa-nube que incluye dependencias transitorias en todo s3a y azul wasb: need, junto con las pruebas.

Y un Spark PR para que coincida. Así es como recibo el soporte de s3a en mis comstackciones de chispa

Si lo haces a mano, debes obtener el JAR de hadoop-aws de la versión exacta que tiene el rest de tus JARS de hadoop, y una versión de los JAR de AWS 100% sincronizada con lo que Hadoop aws compiló. Para Hadoop 2.7. {1, 2, 3, …}

 hadoop-aws-2.7.x.jar aws-java-sdk-1.7.4.jar joda-time-2.9.3.jar + jackson-*-2.6.5.jar 

Pegue todos estos en SPARK_HOME/jars . Ejecute la chispa con sus credenciales configuradas en Env vars o en spark-default.conf

la prueba más simple es ¿puedes hacer un recuento de líneas de un archivo CSV?

 val landsatCSV = "s3a://landsat-pds/scene_list.gz" val lines = sc.textFile(landsatCSV) val lineCount = lines.count() 

Consigue un número: todo está bien. Obtener un seguimiento de stack. Malas noticias.

Tuve que copiar los archivos jar de una descarga de hadoop en el $SPARK_HOME/jars . Usar la bandera --jars o la bandera --packages para spark-submit no funcionó.

Detalles:

  • Spark 2.3.0
  • Hadoop descargado fue 2.7.6
  • Dos archivos jar copiados fueron de (hadoop dir)/share/hadoop/tools/lib/
    • aws-java-sdk-1.7.4.jar
    • hadoop-aws-2.7.6.jar

S3N no es un formato de archivo predeterminado. Debe comstackr su versión de Spark con una versión de Hadoop que tenga las bibliotecas adicionales utilizadas para la compatibilidad con AWS. Información adicional que encontré aquí, https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce

Probablemente tengas que usar s3a: / scheme en lugar de s3: / o s3n: / Sin embargo, no está funcionando de la caja (para mí) para el chispero. Veo la siguiente stacktrace:

 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:29) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:31) at $iwC$$iwC$$iwC$$iwC$$iwC.(:33) at $iwC$$iwC$$iwC$$iwC.(:35) at $iwC$$iwC$$iwC.(:37) at $iwC$$iwC.(:39) at $iwC.(:41) at (:43) at .(:47) at .() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072) ... 68 more 

Lo que creo – tienes que agregar manualmente la dependencia de hadoop-aws manualmente http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar Pero no tengo ni idea de cómo hacerlo agrégalo a la chispa-shell correctamente.

Estaba enfrentando el mismo problema. Funcionó bien después de establecer el valor de fs.s3n.impl y agregar la dependencia de hadoop-aws.

 sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey) sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 

Utilice s3a en lugar de s3n. Tuve un problema similar en un trabajo de Hadoop. Después de cambiar de s3n a s3a funcionó.

p.ej

s3a: //myBucket/myFile1.log