Archives November 2017

HBase, Zeppelin and the Mighty Spark

Hadoop is all about computation power and scalability. When it comes usability it commonly known lacking. Apache Zeppelin is a young project from Apache community that focuses on data visualization. It is earning people’s love because it is filling the missing piece. Recently I came across a project in which I need to built an analytic solution using Zeppelin, HBase and Spark. They are all standard and popular software. “Should be easy task” was my first thought. It turned out to be a daunting task since HBase connector is not a built-in in Spark. In the rest of this article, I will go through the steps to make it work.

Choosing HBase Connector

Unlike the other Hadoop components such as HDFS, Hive etc, Spark has no built-in connector to access HBase data. There is a few choices available:

  • HBase Spark is the official connector from HBase project. It is relatively a young project comparing to other choices and comes with little documentation.
  • Spark on HBase is backed by Hortonworks and has a longer history than HBase Spark project
  • Spark HBase Connector is another connector with very good documentation.

Actually, this is not a choice user needs to make. You should always go with the one comes with your Hadoop distribution. The Hadoop vendors such as Cloudera and Hortonworks built their only version of Hadoop, HDFS, Spark, HBase etc. Lots of times, things just don’t work on a specific distribution if you grab a random 3rd-party library.

My project is built on Cloudera 5.7. It comes with HBase Spark connector. I am glad it is the official connector from HBase community. But there is almost no documents. It is said good developers never write documents. I hope it is true in this case.

Configure Zeppelin

Spark is a built-in Zeppelin. In our case, the HBase Spark Connector comes with Cloudera only works with their own version of Spark. The first few changes is in zeppelin-env.sh file under the conf folder.

# spark folder from your distribution export 
SPARK_HOME=/usr/lib/spark 
# hbase folder from your distribution export 
HBASE_HOME=/usr/lib/hbase 
# if you want Zeppelin to execute your spark code in yarn. you should have the proper core-site.xml, hdfs-site.xml and yarn-site.xml in this folder 
HADOOP_CONF_DIR=~/hadoop-conf

Now go to Interpreter configuration located at the top right corner of Zeppelin UI. Add the following jar file to the Dependencies section of Spark interpreter. Noted file name and location varies depending what  distribution you use.

/usr/lib/hbase/lib/hbase-client-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-common-1.2.0-cdh5.12.0.jar 
/usr/lib/hbase/lib/hbase-spark-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-server-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-protocol-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/hbase-hadoop-compat-1.2.0-cdh5.12.0.jar
/usr/lib/hbase/lib/htrace-core-3.2.0-incubating.jar

Restart Zeppelin, now you should be able to use HBase Spark connector.

Exercising Spark

Now, let’s write some code to produce meaningful result. I have a database with the following tables and columns in HBase from a help desk application. I am trying to produce a chart showing the number of tickets assigned to different departments.

 

Table Column Family Column
tickets d ticket_id primary key of the ticket table
tickets d subject subject of the ticket
tickets d department_id department id the ticket is assigned to
departments d department_id primary key of the departments table
departments d name name of the department

Go to the Zeppelin Notebook, first import the APIs we will be using

%spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util._
import org.apache.hadoop.hbase.spark._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import sqlContext.implicits._

Secondly, we need some helper functions that give us the HBase connection and convert HBase bytes to meaningful values

def getString(r:Result, c:String):String = {
	var cell = r.getColumnLatestCell(Bytes.toBytes("d"), Bytes.toBytes(c))
	Bytes.toString(CellUtil.cloneValue(cell))
}

def getInteger(r:Result, c:String):Integer = {
	var cell = r.getColumnLatestCell(Bytes.toBytes("d"), Bytes.toBytes(c))
	Bytes.toInt(CellUtil.cloneValue(cell))
}

def getHbaseConfig():Configuration = {
	val conf = HBaseConfiguration.create()
	conf.set("hbase.zookeeper.quorum", "quickstart.cloudera")
	return conf
}

def getHbaseContext():HBaseContext = {
	val conf = getHbaseConfig()
	new HBaseContext(sc, conf)
}

Next is the most interesting part. The following code will build the mappings between Spark tables and HBase tables

val hc = getHbaseContext()
var rdd = hc.hbaseRDD(TableName.valueOf("test", "tickets"), new Scan())
case class Ticket(tick_id:Integer, department_id:Integer, subject:String)
var map = rdd.map( x => {
	val subject = getString(x._2, "subject")
	var ticket_id = getInteger(x._2, "ticket_id")
	var depart_id = getInteger(x._2, "department_id")
	Ticket(ticket_id,depart_id,subject)
})
map.toDF().registerTempTable("tickets")
val rddDepartments =  hc.hbaseRDD(TableName.valueOf("test", "departments"), new Scan())
case class Department(department_id:Integer, name:String)
val mapDepartment = rddDepartments.map( x => {
	val id = getInteger(x._2, "department_id")
	val name = getString(x._2, "name")
	Department(id, name)
})
mapDepartment.toDF().registerTempTable("departments")

I choose to use Spark Datarframe in this example because it can be used with Spark SQL later. It is an option of course. Without Dataframe, you can process data using RDD API. It is more complex but can do things that SQL can’t.

Execute the above code, we now have tables tickets and departments registered in the Spark Context

Charts with Spark SQL

Open a new paragraph in Zeppelin notebook. Write the following code. It will give the top 5 departments with the highest ticket count.

%spark.sql
select d.name, count(*) count from tickets t
join departments d on (t.department_id = d.department_id)
group by d.name
order by count(*) desc
limit 5

zeppeling-result-table

Switch over to pie chart, it will show a pretty chart as below

zeppelin-result-pie-chart

Version of the Software

For your reference, below are the versions of the software used in the setup.

Cloudera CDH 5.12.0
Zeppelin 0.7.3 netinst
Conclusion

Despite the painful configuration that makes the whole setup to work. Zeppelin and Spark is a sweet couple that is both productive and powerful. It saves significant amount of work to write code, debug logic and visualize the data. With the power of Hadoop/Spark, the same logic can be distributed to thousands of machines and process billions of records.