HBase, Zeppelin and the Mighty Spark
Hadoop is all about computation power and scalability. When it comes to usability, it is commonly known to be lacking. Apache Zeppelin is a young project from the Apache community that focuses on data visualization, and it is earning people’s love because it fills that missing piece. Recently I came across a project in which I needed to build an analytic solution using Zeppelin, HBase and Spark. They are all standard and popular software. “Should be an easy task” was my first thought. It turned out to be a daunting task, since an HBase connector is not built into Spark. In the rest of this article, I will go through the steps to make it work.
Choosing an HBase Connector
Unlike other Hadoop components such as HDFS and Hive, Spark has no built-in connector to access HBase data. There are a few choices available:
- HBase Spark is the official connector from the HBase project. It is a relatively young project compared to the other choices and comes with little documentation.
- Spark on HBase is backed by Hortonworks and has a longer history than the HBase Spark project.
- Spark HBase Connector is another connector with very good documentation.
Actually, this is not a choice the user needs to make. You should always go with the one that comes with your Hadoop distribution. Hadoop vendors such as Cloudera and Hortonworks build their own versions of Hadoop, HDFS, Spark, HBase and so on. Lots of times, things just don’t work on a specific distribution if you grab a random third-party library.
My project is built on Cloudera 5.7, which comes with the HBase Spark connector. I am glad it is the official connector from the HBase community, but there is almost no documentation. It is said that good developers never write documentation. I hope it is true in this case.
Configure Zeppelin
Spark support is built into Zeppelin. In our case, the HBase Spark connector that comes with Cloudera only works with Cloudera's own build of Spark. The first few changes are in the zeppelin-env.sh file under the conf folder.
# spark folder from your distribution
export SPARK_HOME=/usr/lib/spark
# hbase folder from your distribution
export HBASE_HOME=/usr/lib/hbase
# if you want Zeppelin to execute your Spark code in YARN, you should have the proper core-site.xml, hdfs-site.xml and yarn-site.xml in this folder
export HADOOP_CONF_DIR=~/hadoop-conf
Now go to the Interpreter configuration located at the top right corner of the Zeppelin UI. Add the following jar files to the Dependencies section of the Spark interpreter. Note that the file names and locations vary depending on which distribution you use.
/usr/lib/hbase/lib/hbase-client-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-common-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-spark-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-server-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-protocol-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-hadoop-compat-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/htrace-core-3.2.0-incubating.jar
Restart Zeppelin, and you should now be able to use the HBase Spark connector.
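To confirm the jars were actually picked up, a quick sanity check like the one below can be run in a notebook paragraph. This snippet is my own illustration, not from the connector's documentation; the ZooKeeper quorum quickstart.cloudera matches the setup used later in this article, so adjust it for your cluster.

```scala
%spark
// Sanity check: if the dependencies above are on the classpath,
// these classes resolve and an HBaseContext can be constructed.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "quickstart.cloudera") // adjust to your cluster
val testContext = new HBaseContext(sc, conf)              // fails fast if hbase-spark is missing
```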
Exercising Spark
Now, let’s write some code to produce a meaningful result. I have a database in HBase, taken from a help desk application, with the following tables and columns. I am trying to produce a chart showing the number of tickets assigned to different departments. A sketch of how rows like these can be written into HBase follows the table.
| Table | Column Family | Column | Description |
|---|---|---|---|
| tickets | d | ticket_id | primary key of the tickets table |
| tickets | d | subject | subject of the ticket |
| tickets | d | department_id | department id the ticket is assigned to |
| departments | d | department_id | primary key of the departments table |
| departments | d | name | name of the department |
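If you want to reproduce this on your own cluster, the rows need to be written with HBase's Bytes encoding so that the helper functions further down can decode them. Here is a rough sketch, using the standard HBase client API, of inserting one sample ticket; the row key and sample values are hypothetical and assume the test:tickets table with column family d already exists.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "quickstart.cloudera")

val connection = ConnectionFactory.createConnection(conf)
val tickets = connection.getTable(TableName.valueOf("test", "tickets"))

// One sample row; the integer columns are stored as 4-byte values,
// which matches the Bytes.toInt decoding used later in this article.
val put = new Put(Bytes.toBytes(1)) // hypothetical row key
put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("ticket_id"), Bytes.toBytes(1))
put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("subject"), Bytes.toBytes("Printer is on fire"))
put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("department_id"), Bytes.toBytes(2))
tickets.put(put)

tickets.close()
connection.close()
```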
Go to the Zeppelin notebook and first import the APIs we will be using:
%spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util._
import org.apache.hadoop.hbase.spark._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import sqlContext.implicits._
Second, we need some helper functions that give us the HBase connection and convert HBase bytes into meaningful values:
def getString(r: Result, c: String): String = {
  val cell = r.getColumnLatestCell(Bytes.toBytes("d"), Bytes.toBytes(c))
  Bytes.toString(CellUtil.cloneValue(cell))
}

def getInteger(r: Result, c: String): Integer = {
  val cell = r.getColumnLatestCell(Bytes.toBytes("d"), Bytes.toBytes(c))
  Bytes.toInt(CellUtil.cloneValue(cell))
}

def getHbaseConfig(): Configuration = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "quickstart.cloudera")
  conf
}

def getHbaseContext(): HBaseContext = {
  val conf = getHbaseConfig()
  new HBaseContext(sc, conf)
}
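Note that these helpers assume the columns were written with HBase's own Bytes encoding, i.e. an int stored as a 4-byte value rather than as a string. A small illustration of that assumption:

```scala
// Round trip with HBase's Bytes utility: an int serialized by Bytes.toBytes
// comes back intact with Bytes.toInt.
val raw: Array[Byte] = Bytes.toBytes(42)   // 4 bytes
val back: Int        = Bytes.toInt(raw)    // 42

// If a value was written as the string "42" instead, Bytes.toInt throws
// (the array is not 4 bytes long); decode with Bytes.toString in that case.
```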
Next comes the most interesting part. The following code builds the mappings between Spark tables and HBase tables:
val hc = getHbaseContext()

val rdd = hc.hbaseRDD(TableName.valueOf("test", "tickets"), new Scan())
case class Ticket(ticket_id: Integer, department_id: Integer, subject: String)
val map = rdd.map( x => {
  val subject = getString(x._2, "subject")
  val ticket_id = getInteger(x._2, "ticket_id")
  val depart_id = getInteger(x._2, "department_id")
  Ticket(ticket_id, depart_id, subject)
})
map.toDF().registerTempTable("tickets")

val rddDepartments = hc.hbaseRDD(TableName.valueOf("test", "departments"), new Scan())
case class Department(department_id: Integer, name: String)
val mapDepartment = rddDepartments.map( x => {
  val id = getInteger(x._2, "department_id")
  val name = getString(x._2, "name")
  Department(id, name)
})
mapDepartment.toDF().registerTempTable("departments")
I chose to use Spark DataFrames in this example because they can be used with Spark SQL later. That is optional, of course. Without DataFrames, you can process the data using the RDD API. It is more complex but can do things that SQL can’t.
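To illustrate that, here is a rough sketch of the same top-department count done with plain RDD transformations, reusing the map and mapDepartment RDDs defined above. This is only a comparison, not part of the setup, and the variable names are mine.

```scala
// Count tickets per department id, join with department names,
// and take the five largest, all with plain RDD transformations.
val ticketsByDept = map.map(t => (t.department_id, 1)).reduceByKey(_ + _)
val deptNames     = mapDepartment.map(d => (d.department_id, d.name))

val topDepartments = ticketsByDept
  .join(deptNames)                                  // (department_id, (count, name))
  .map { case (_, (count, name)) => (name, count) }
  .sortBy(_._2, ascending = false)
  .take(5)

topDepartments.foreach(println)
```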
Execute the mapping code above, and we now have the tables tickets and departments registered in the Spark context.
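A quick way to verify the registration before building any charts is to query one of the tables from the Spark interpreter. This is a hypothetical check of my own, not from the original setup.

```scala
%spark
// Should print a handful of ticket rows if the temp table is registered.
sqlContext.sql("select * from tickets limit 5").show()
```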
Charts with Spark SQL
Open a new paragraph in the Zeppelin notebook and write the following code. It gives the top 5 departments with the highest ticket count.
%spark.sql
select d.name, count(*) count
from tickets t
join departments d on (t.department_id = d.department_id)
group by d.name
order by count(*) desc
limit 5
Switch over to the pie chart view, and it will show a pretty chart as below.
Versions of the Software
For your reference, below are the versions of the software used in the setup.
| Software | Version |
|---|---|
| Cloudera CDH | 5.12.0 |
| Zeppelin | 0.7.3 netinst |
Conclusion
Despite the painful configuration required to make the whole setup work, Zeppelin and Spark are a sweet couple that is both productive and powerful. They save a significant amount of work when writing code, debugging logic and visualizing data. With the power of Hadoop/Spark, the same logic can be distributed to thousands of machines to process billions of records.