Thursday, 9 June 2016

Integrating Kafka, Spark Streaming and HBase to power a real-time dashboard



Here is a simple POC project that integrates Kafka, Spark Streaming, HBase, Node.js and D3.js.
The idea was inspired by an article explaining how a real-time streaming application with a UI dashboard can be built using Spark Streaming.

Below is the block diagram from Sigmoid.



I developed a sample prototype called Voting Machine, which integrates Kafka, Spark Streaming, HBase, Node.js and D3.js as shown above. The application listens to a Kafka topic, aggregates the votes received for the different parties and renders a real-time bar chart dashboard using D3.js.

This is just a simple prototype that helps in understanding how the components fit together. The use case here is a simple vote count aggregator, which could easily be implemented using Java with JMS; I just wanted to try a demo. A realistic use case would be running analytics on real-time data feeds from Twitter, Facebook and other social media, or detecting fraudulent credit transactions in a real-time transaction feed.

Below are the components of the application.

Kafka Topic :

A Kafka broker has to be started and a topic created to receive the real-time votes. Here, I used the built-in Kafka producer to feed the votes manually through the terminal.





Spark Streaming :

This is the Spark Streaming component, which listens to the Kafka topic. A DStream is created using KafkaUtils.createStream() with a batch interval of 2 seconds. For each batch, the Spark program aggregates the vote count and pushes the aggregated count to HBase, a NoSQL database.
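
The per-batch counting described above can be sketched roughly as follows. This is a minimal sketch, not the project's actual code: it assumes Spark 1.3 or later with the spark-streaming-kafka artifact on the classpath, and the ZooKeeper address, consumer group, topic name ("votes") and the idea that each message body is just a party name are all assumptions made for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object VoteCountSketch {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the Kafka receiver, one for processing
    val conf = new SparkConf().setAppName("VoteCountSketch").setMaster("local[2]")
    // Batch interval of 2 seconds, as described in the post
    val ssc = new StreamingContext(conf, Seconds(2))

    // Hypothetical connection settings: ZooKeeper quorum, consumer group,
    // and a map of topic name -> number of receiver threads
    val messages = KafkaUtils.createStream(
      ssc, "localhost:2181", "vote-consumer-group", Map("votes" -> 1))

    // Each record is a (key, value) pair; the value is assumed to be a party name
    val batchCounts = messages
      .map { case (_, party) => (party.trim, 1L) }
      .reduceByKey(_ + _)

    // Print the counts of each batch; the real application writes them to HBase
    batchCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

In the actual application the print() call would be replaced by a foreachRDD that writes each batch's counts to HBase.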




HBase :

HBase is a distributed, column-oriented database built on top of the Hadoop file system (HDFS). It is part of the Hadoop ecosystem and provides random, real-time read/write access to data stored in HDFS. Here I used the HBase APIs to fetch and update the vote count in HBase.

Global aggregation:
In addition to counting the votes of the current batch, the program adds the batch count to the running total already stored in HBase.
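
The merge step behind the global aggregation can be illustrated with plain Scala collections. This is only a sketch of the logic: an in-memory Map stands in for the HBase table, and the object name, function name and party names are hypothetical.

```scala
object GlobalAggregationSketch {
  // Adds the counts of the current batch to the totals stored so far.
  // Parties seen for the first time start from zero.
  def mergeCounts(stored: Map[String, Long], batch: Map[String, Long]): Map[String, Long] =
    batch.foldLeft(stored) { case (acc, (party, n)) =>
      acc.updated(party, acc.getOrElse(party, 0L) + n)
    }

  def main(args: Array[String]): Unit = {
    val stored = Map("partyA" -> 10L, "partyB" -> 4L) // totals from earlier batches
    val batch  = Map("partyA" -> 2L, "partyC" -> 1L)  // counts of the current batch
    val merged = mergeCounts(stored, batch)
    assert(merged == Map("partyA" -> 12L, "partyB" -> 4L, "partyC" -> 1L))
    println(merged)
  }
}
```

With HBase as the store, the lookup and the update become a read and a write on the row of each party.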



Node.js :

Node.js is a server-side JavaScript runtime that makes it very easy to set up a server.
A simple Node.js REST API server is configured to return the current vote counts in JSON format. The express package is used to set up the route for the /fetch URL pattern.

Here I used the HBase Thrift API to fetch the vote count from HBase. The HBase REST API could also be used to connect to HBase and fetch the data. The main difference is that the Thrift API sends the payload as a byte stream, so the payload is smaller than the REST API payload.

To connect to the HBase Thrift API, Hbase.js and the thrift module have to be added to the Node.js project.

REST API : http://localhost:8081/fetch

Sample JSON data:



D3.js :

A vote count bar chart is built with the D3.js library. The JSON vote data is fetched from the Node.js server and the bar chart is rendered from the current vote counts.




 

The code is available on GitHub.


Wednesday, 8 June 2016

Challenges working with the sbt build tool



sbt is a build tool (like Maven) written in Scala, and it is now widely used to build Scala applications. A common challenge with sbt is the error below, which appears when building an assembly JAR. If two dependencies contain the same file path with different contents, the assembly task fails on the conflict by default, because the default merge strategy for such files is "MergeStrategy.deduplicate".

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /home/cloudera/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:javax/servlet/Filter.class
[error] /home/cloudera/.ivy2/cache/org.mortbay.jetty/servlet-api/jars/servlet-api-2.5-20081211.jar:javax/servlet/Filter.class


To resolve the conflicts, you have two options.

Exclude Jars
You can exclude the unwanted transitive dependencies that cause the conflicts and keep only one copy. Finding the right transitive dependencies to exclude can be difficult, so follow this approach only if you are sure about the dependencies. To see what a package depends on, you can check the mvnrepository site and then exclude the transitive dependencies as below:


libraryDependencies ++= Seq(
  // Exclude the transitive dependencies that cause the conflicts
  ("org.apache.spark" %% "spark-hive" % "1.3.1")
    .exclude("com.twitter", "parquet-hadoop-bundle")
    .exclude("org.apache.avro", "avro-ipc")
    .exclude("com.twitter", "parquet-format"),
  "org.apache.spark" %% "spark-streaming" % "1.3.1"
)



Merge Strategy
Depending on the conflict, you can choose a merge strategy that discards the conflicting files, keeps the first one or keeps the last one, as below.

Suppose you get the following conflict:

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /home/cloudera/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:javax/servlet/Filter.class
[error] /home/cloudera/.ivy2/cache/org.mortbay.jetty/servlet-api/jars/servlet-api-2.5-20081211.jar:javax/servlet/Filter.class

To set the strategy for the conflicting "javax/servlet/Filter.class", pass the path segments "javax" and "servlet" to PathList and select MergeStrategy.discard, MergeStrategy.first or MergeStrategy.last as needed. If the path has more segments, pass as many of them to PathList as are needed to identify the conflicting resources uniquely. You can also use "startsWith" to match the conflicting paths.


// Syntax for older sbt-assembly releases on sbt 0.13
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    // Match conflicting files by path prefix and keep the last copy
    case x if x.startsWith("META-INF/maven/com.fasterxml.jackson.core") => MergeStrategy.last
    case x if x.startsWith("META-INF/maven/commons-logging") => MergeStrategy.last
    case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
    case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
    case x if x.startsWith("plugin.properties") => MergeStrategy.last
    // Match everything under javax/servlet by path segments
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.discard
    // Fall back to the default strategy for all other files
    case x => old(x)
  }
}
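
For reference, later sbt-assembly releases renamed the setting key to assemblyMergeStrategy and use := instead of <<=. The fragment below is a sketch of the same kind of rules in that newer syntax. It assumes a plugin version that provides assemblyMergeStrategy, so check the documentation of the version you actually use; the rules themselves are only examples taken from the conflicts above.

```scala
// build.sbt fragment, assuming a newer sbt-assembly release
assemblyMergeStrategy in assembly := {
  // Keep the last copy of conflicting files under this prefix
  case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
  // Drop the conflicting javax/servlet classes
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.discard
  // Defer to the plugin's default strategy for everything else
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```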

   
Reference : http://stackoverflow.com/questions/14791955/assembly-merge-strategy-issues-using-sbt-assembly