Saturday 11 August 2018

Building Real-Time Data-pipeline using Spark Streaming and Kafka


A real-time data pipeline is an integral component of any end-to-end data streaming system. Organizations are gravitating towards this model to implement powerful ETL tools that ship data from one system to another in real time. It has also become the backbone of analytics and data visualization platforms. Below are some of the widely used models for data streaming.



  • ETL model that performs real-time transport of data from one system to another.
  • Analytic model that performs real-time prediction on streaming data.
  • Publisher-Subscriber model.


In this blog, I'm presenting a case study of a simple data streaming pipeline built with Kafka and Spark Streaming, focusing on the fault tolerance and throughput considerations involved in developing such systems. Typically the core logic of a data pipeline is just the Extract, Transform and Load phases, or a subset of them; that is the business-requirement part of it. But when we think about the design of the pipeline, we need to keep an eye on how it reacts to a failure and whether it can scale up to huge data volumes. Of course, there are key configurations on the Spark cluster that need to be considered (e.g. cores and memory of the executor nodes). That is not my topic and I will defer it for now, although it makes a major contribution to performance. I'm only interested in how the pipeline can be designed to recover from a system failure without any data loss, and in some of the problems and solutions we encounter in a production-like environment.


Integration of Spark Streaming with Kafka can be done in two ways.

1. Receiver based approach.

This is available in the older versions of Spark and provides 'at-least once' message semantics. It works by writing each message to a WAL (Write Ahead Log) before processing it and then updating the offset in ZooKeeper (a minimal sketch of this older API follows the list of drawbacks below). There are some drawbacks to this approach.
  • WAL logging is redundant, as the messages are already staged and replicated in the Kafka cluster. From a recovery perspective, we can re-read the 'considered to be lost' messages from Kafka by pointing to the right offset, so we don't really need a WAL.
  • Attaining processing parallelism is tedious and complex: we need multiple receivers, and their streams have to be joined together.
  • The number of Spark RDD partitions and the number of Kafka partitions are not related, which creates unnecessary confusion while estimating and sizing the pipeline.
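
For contrast, here is a minimal sketch of what the receiver-based approach looks like, assuming the spark-streaming-kafka 0.8 artifact; zkQuorum, groupId and the topic name are placeholder values for your environment.

   // Receiver-based stream (older API, shown only for contrast).
   // zkQuorum, groupId and the topic map are placeholders.
   import org.apache.spark.streaming.kafka.KafkaUtils

   val topicMap = Map("my-topic" -> 1)   // topic -> number of receiver threads (example value)
   val receiverStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
   // With the WAL enabled (spark.streaming.receiver.writeAheadLog.enable=true),
   // this gives at-least-once semantics at the cost of the extra logging described above.
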

2. New Direct Stream approach.

This is a receiver-less approach that gives you the following benefits.
  • No receivers, and no need to join receiver streams. Messages are read from Kafka with the simple Kafka consumer API using offsets, much like reading data from an HDFS location: the master node assigns a worker node to read a batch of the stream from Kafka and process it.
  • Allows the developer to achieve effectively 'exactly once' message semantics by managing the offsets separately and handling message processing and offset updates as a single transactional flow.
  • The number of Spark RDD partitions is equal to the number of Kafka partitions, so you have a simple way to increase processing parallelism: just increase the number of Kafka partitions.

For the above reasons, I am going to use the new direct stream API, focusing on easy recovery, fault tolerance and parallelism.

Let's talk about the high-level architecture of Spark in YARN cluster mode and how Spark reads and processes data from HDFS. In a typical Spark cluster there is a master node on which the resource manager runs, and a number of executor nodes. When you submit a Spark program in cluster mode, the Spark driver (SparkContext) gets launched on one of the nodes in the cluster. The Spark driver is the heart of the Spark program: it drives all the tasks that need to be processed. The driver talks to the resource manager to acquire the resources/nodes required to run the tasks, then starts assigning the tasks to the executor nodes, and the RDDs get processed on each executor. The driver monitors each task, collects the results, and the program completes.

In the context of Spark Streaming (direct DStream), the processing is pretty much the same as above. The only difference is that instead of fetching the data from HDFS, Spark reads the streaming data from Kafka and processes it in micro-batches. This is handled by the streaming driver, StreamingContext (an extension of SparkContext). The driver assigns the task of reading each batch RDD to one of the executor nodes; the executor pulls the data from Kafka and starts processing that particular batch.

Now what happens if an executor node blows up? The Spark driver identifies that certain tasks have failed and immediately reassigns them to another healthy executor node in the cluster with the help of the resource manager. But how does the newly assigned executor know what data it needs to work on and from what point it should resume processing? Spark solves this problem through check-pointing: it checkpoints the state of the RDD and the sequence of operations applied to it, which lets the new executor regenerate the RDD and continue processing from the point of failure.




That being said, Spark is capable of handling these failures by default with the help of checkpoints. However, is that enough to call the data pipeline fault tolerant? There is a possibility that the checkpoint files get corrupted after a system failure. In that case the streaming pipeline cannot read the checkpoint and restore the spark-streaming context (i.e. the driver), and it keeps failing to read from Kafka. As a mitigation, you can remove the checkpoint files entirely and restart the Spark Streaming program. Similarly, when you upgrade your application and change the driver code, you need to clear the old checkpoint files for the new driver to run; remember, the checkpoint stores the entire state of the driver, and the driver is regenerated from those files. In either case you lose the offsets, and Spark will read either from the latest offset or from the beginning, depending on the configuration. That is not what we want; the pipeline should recover from the point it failed. The code snippet below shows how the checkpoint is set and how the streaming context is recovered from the checkpoint after a restart.
       

  // This will be invoked by StreamingContext.getActiveOrCreate() if no checkpoint is found,
  // to create ssc, the spark streaming context.
  // Note that processData(dsStream) is what actually processes the streaming data.
  // Typically no code change is expected in this method; modify only processData(dsStream) to change any processing logic.
  // If this method does change, you need to clear the spark checkpoint files so that the old implementation is not reused.
  // sparkConf, checkpoint, kafkaParams and topicsSet are assumed to be defined elsewhere in the application.
  def createSC(): StreamingContext = {
    LogHelper.log.debug("Checkpoint not found : Creating a new spark streaming context")
    val ssc = new StreamingContext(sparkConf, Seconds(4))
    ssc.checkpoint(checkpoint)
    val dsStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    processData(dsStream)
    ssc
  }
   

  def main(args: Array[String]) {
    val ssc = StreamingContext.getActiveOrCreate(checkpoint, createSC)
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
         
 
So how do we fix this problem? The solution is to manually maintain the offsets in an external data source after the data is processed by Spark Streaming. In the event of a failure, or after an upgrade, we simply fetch the offsets from the external store and point the streaming context to read from the previous offset. The code snippet below doesn't use a checkpoint; it maintains the offsets in an external store and retrieves them while creating the streaming context (a sketch of the readOffset and peristOffset helpers follows the processData method further down).
       

 def main(args: Array[String]) {

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // read the previously saved offsets from the external data source
    val storedOffsets = readOffset()

    val dsStream = storedOffsets match {
      case None => // start from the latest offsets
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
      case Some(fromOffset) => // start from the previously saved offsets
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
          (String, String)](ssc, kafkaParams, fromOffset, messageHandler)
    }

    // Start the computation
    processData(dsStream)
    ssc.start()
    ssc.awaitTermination()
  }


   def processData(dsStream: InputDStream[(String, String)]) {
     dsStream.foreachRDD(rdd => {
       LogHelper.log.debug("Rdd : with partition size : " + rdd.partitions.size + " message count in the batch: " + rdd.count)
       println("Rdd : with partition " + rdd.partitions.size + " count: " + rdd.count)

       // capture the offset ranges of this batch so they can be persisted into
       // zookeeper/an external store (if you are not using spark checkpointing)
       val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

       val offsetsRangesStr = offsetsRanges
         .map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}:${offsetRange.untilOffset}")
         .mkString(",")

       LogHelper.log.debug("offsetsRangesStr : " + offsetsRangesStr)
       println("offsetsRangesStr : " + offsetsRangesStr)

       rdd.foreachPartition(partition => {
         partition.foreach(record => {
           // does the actual processing of the data
           // implement your processing logic here...
           // use connection pooling for the underlying resources when saving/persisting data into noSql, kafka.
           // resources are maintained on the processing nodes and not on the driver node.
           LogHelper.log.debug("rdd :" + record)
         })
       })

       // Persist the offsets into an external data store/zookeeper
       peristOffset(offsetsRanges)
     })
   }
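
The snippets above assume two helper functions, readOffset() and peristOffset(), for the external offset store. Here is a minimal sketch of what they could look like; a plain file stands in for the external store (the text above suggests ZooKeeper or similar), so treat the path and line format below as placeholder choices.

   // Minimal sketch of the offset helpers used above; a local file stands in for
   // zookeeper/an external store, so the path and record format are placeholders.
   import java.nio.charset.StandardCharsets
   import java.nio.file.{Files, Paths}
   import scala.collection.JavaConverters._
   import kafka.common.TopicAndPartition
   import org.apache.spark.streaming.kafka.OffsetRange

   val offsetStore = Paths.get("/tmp/pipeline-offsets")   // hypothetical location

   // Returns None on the very first run, so the stream starts from the latest offsets.
   def readOffset(): Option[Map[TopicAndPartition, Long]] = {
     if (!Files.exists(offsetStore)) {
       None
     } else {
       val offsets = Files.readAllLines(offsetStore, StandardCharsets.UTF_8).asScala.map { line =>
         val Array(topic, partition, untilOffset) = line.split(":")
         TopicAndPartition(topic, partition.toInt) -> untilOffset.toLong
       }.toMap
       if (offsets.isEmpty) None else Some(offsets)
     }
   }

   // Persists the offset ranges of the batch that has just been processed.
   def peristOffset(offsetsRanges: Array[OffsetRange]): Unit = {
     val lines = offsetsRanges
       .map(r => s"${r.topic}:${r.partition}:${r.untilOffset}")
       .toList.asJava
     Files.write(offsetStore, lines, StandardCharsets.UTF_8)
   }
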
       
 

With all of these solutions, we still cannot completely get rid of check-pointing: it is required for the running pipeline to recover from a node failure within the cluster. So we will go with a hybrid approach, implementing check-pointing as well as manual offset management for the cases where the checkpoint is corrupted or the pipeline application is upgraded.
       

  def createSC(): StreamingContext = {
    LogHelper.log.debug("Checkpoint not found : Reading offset from zookeeper/external data store")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    LogHelper.log.debug("Setting the checkpoint directory")
    ssc.checkpoint(checkpoint)
    val storedOffsets = readOffset()

    val dsStream = storedOffsets match {
      case None => // start from the latest offsets
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
      case Some(fromOffset) => // start from the previously saved offsets
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
          (String, String)](ssc, kafkaParams, fromOffset, messageHandler)
    }
    processData(dsStream)
    ssc
  }
  
  def main(args: Array[String]) {
    LogHelper.log.debug("Starting the pipeline")

    val ssc = StreamingContext.getActiveOrCreate(checkpoint, createSC)
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
       
 

The code is uploaded to my GitHub repo.

Another problem with offset management: when we manually maintain the offsets in an external store, processing latency can introduce a new issue. Suppose batches of data arrive in the stream as B-1, B-2, B-3, ... and the nodes processing B-1 and B-2 take more time, so B-3 finishes before B-1 and B-2. Since we maintain only a single version of the offset data in the external store, the offsets get overwritten with B-3's partition offsets, and we lose the partition offsets for B-1 and B-2. Normally this does not cause any data loss while the pipeline keeps running, because Spark continuously reads and eventually processes all of the data. Of course the message processing will be out of sequence, which matters only if your application cares about message ordering; otherwise don't worry about it.

The problem arises when we restart the pipeline while batches B-1 and B-2 are still in process. When the pipeline comes back up, it fetches the offsets of the latest batch (i.e. B-3) and ends up losing some messages from B-1 and B-2. A solution is to maintain a snapshot of in-process and completed batch records in a NoSQL store like HBase, instead of a single version of the offset record. When the pipeline restarts, fetch the offset records of the batch preceding the earliest in-process record (the minimum in-process sequence number minus one). The only remaining problem is handling the duplicate messages that were already processed before the restart, which is in any case a mandatory requirement for a data pipeline. A minimal sketch of this restart logic follows.
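
To make the idea concrete, here is a small, hypothetical sketch of the per-batch snapshot records and the restart logic described above; the case class, status values and function name are illustrative, not from the actual pipeline code.

   // Hypothetical shape of the per-batch snapshot records kept in the external store (e.g. HBase).
   case class BatchOffsetRecord(seq: Long,               // monotonically increasing batch sequence number
                                status: String,          // "IN_PROGRESS" or "COMPLETED"
                                offsets: Map[Int, Long]) // kafka partition -> untilOffset

   // On restart, resume from the batch just before the earliest batch that was still in progress;
   // if nothing was in flight, resume from the latest completed batch.
   def restartSeq(records: Seq[BatchOffsetRecord]): Option[Long] = {
     val inProgress = records.filter(_.status == "IN_PROGRESS").map(_.seq)
     if (inProgress.nonEmpty) Some(inProgress.min - 1)
     else records.filter(_.status == "COMPLETED").map(_.seq).reduceOption(_ max _)
   }
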



Processing parallelism

Now let's talk about processing parallelism. One of the key features of the new direct API is that the streaming data is already partitioned when you read a batch; simple, no confusion. The number of partitions in an RDD batch is equal to the number of partitions of the Kafka topic. The foreachPartition method processes each partition in parallel, and it also lets you use a shared resource per partition effectively: instead of creating a separate resource for every record, you create it once per partition and reuse it for all the records in that partition. Processing parallelism is therefore directly tied to the number of partitions configured on the Kafka topic. Spinning up multiple consumer groups for the same Kafka topic is another way to attain processing parallelism.
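
As a quick illustration of the per-partition resource reuse mentioned above, here is a minimal sketch; ConnectionPool and its save call are hypothetical application-level helpers, not Spark or Kafka APIs.

   // Minimal sketch: one shared connection per partition instead of one per record.
   dsStream.foreachRDD { rdd =>
     rdd.foreachPartition { partition =>
       // created on the executor and reused for every record in this partition
       val connection = ConnectionPool.getConnection()   // hypothetical pool
       partition.foreach(record => connection.save(record))
       ConnectionPool.returnConnection(connection)
     }
   }
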


To conclude, these are some of the challenges and solutions in building a real-time data pipeline. Looking at these challenges, Spark Streaming needs to evolve a little more to handle such problems out of the box. Kafka Streams and Kafka Connect are another way of building a lightweight streaming pipeline.