Cómo leer desde hbase usando chispa

El código a continuación se leerá desde hbase, luego lo convertirá a estructura json y convertirá a schemaRDD, pero el problema es que estoy using List para almacenar la cadena json y luego pasar a javaRDD, para datos de aproximadamente 100 GB, el maestro será cargado con datos en la memoria. ¿Cuál es la forma correcta de cargar los datos de hbase y luego realizar la manipulación, luego convertir a JavaRDD.

 package hbase_reader; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.JavaSchemaRDD; import org.apache.commons.cli.ParseException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import scala.Function1; import scala.Tuple2; import scala.runtime.AbstractFunction1; import com.google.common.collect.Lists; public class hbase_reader { public static void main(String[] args) throws IOException, ParseException { List jars = Lists.newArrayList(""); SparkConf spconf = new SparkConf(); spconf.setMaster("local[2]"); spconf.setAppName("HBase"); //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1"); spconf.setJars(jars.toArray(new String[jars.size()])); JavaSparkContext sc = new JavaSparkContext(spconf); //spconf.set("spark.executor.memory", "1g"); JavaSQLContext jsql = new JavaSQLContext(sc); HBaseConfiguration conf = new HBaseConfiguration(); String tableName = "HBase.CounData1_Raw_Min1"; HTable table = new HTable(conf,tableName); try { ResultScanner scanner = table.getScanner(new Scan()); List jsonList = new ArrayList(); String json = null; for(Result rowResult:scanner) { json = ""; String rowKey = Bytes.toString(rowResult.getRow()); for(byte[] s1:rowResult.getMap().keySet()) { String s1_str = Bytes.toString(s1); String jsonSame = ""; for(byte[] s2:rowResult.getMap().get(s1).keySet()) { String s2_str = Bytes.toString(s2); for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) { String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3)); jsonSame += "\""+s2_str+"\":"+s3_str+","; } } jsonSame = jsonSame.substring(0,jsonSame.length()-1); json += "\""+s1_str+"\""+":{"+jsonSame+"}"+","; } json = json.substring(0,json.length()-1); json = "{\"RowKey\":\""+rowKey+"\","+json+"}"; jsonList.add(json); } JavaRDD jsonRDD = sc.parallelize(jsonList); JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD); System.out.println(schemaRDD.take(2)); } finally { table.close(); } } } 

Un ejemplo básico para leer los datos de HBase usando Spark (Scala), también puedes ver esto en Java:

 import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark._ object HBaseRead { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val tableName = "table1" System.setProperty("user.name", "hdfs") System.setProperty("HADOOP_USER_NAME", "hdfs") conf.set("hbase.master", "localhost:60000") conf.setInt("timeout", 120000) conf.set("hbase.zookeeper.quorum", "localhost") conf.set("zookeeper.znode.parent", "/hbase-unsecure") conf.set(TableInputFormat.INPUT_TABLE, tableName) val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(tableName)) { val tableDesc = new HTableDescriptor(tableName) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) println("Number of Records found : " + hBaseRDD.count()) sc.stop() } } 

ACTUALIZADO -2016

A partir de Spark 1.0.x +, ahora también puedes usar el Conector Spark-HBase:

Dependencia de Maven para incluir:

  it.nerdammer.bigdata spark-hbase-connector_2.10 1.0.3 // Version can be changed as per your Spark version, I am using Spark 1.6.x  

Y encuentre un código de muestra a continuación para el mismo:

 import org.apache.spark._ import it.nerdammer.spark.hbase._ object HBaseRead extends App { val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]") sparkConf.set("spark.hbase.host", "") //eg 192.168.1.1 or localhost or your hostanme val sc = new SparkContext(sparkConf) // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then: val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document") .select("DocID", "Title").inColumnFamily("SMPL") println("Number of Records found : " + docRdd .count()) } 

ACTUALIZADO – 2017

A partir de Spark 1.6.x +, ahora también puede usar SHC Connector (usuarios de Hortonworks o HDP):

Dependencia de Maven para incluir:

   com.hortonworks shc 1.0.0-2.0-s_2.11 // Version depends on the Spark version and is supported upto Spark 2.x  

La principal ventaja de utilizar este conector es que tiene flexibilidad en la definición del esquema y no necesita parametros codificados como en nerdammer / spark-hbase-connector. Además, recuerde que es compatible con Spark 2.x, por lo que este conector es bastante flexible y brinda soporte de extremo a extremo en problemas y relaciones públicas.

Busque la ruta de depósito siguiente para el último archivo léame y las muestras:

Conector Hortonworks Spark HBase

También puede convertir este RDD a DataFrames y ejecutar SQL sobre él o puede asignar estos Dataset o DataFrames a las clases Java Pojo o Case definidas por el usuario. Funciona genial.

Por favor comente abajo si necesita algo más.