Tuesday 6 December 2016

Spark MLlib


Although many machine learning techniques were introduced between the late 1960s and the 1990s, for a long time there was no platform that could run them efficiently. First, most machine learning algorithms are iterative, which demands a huge computational effort. Second, the data available to train the models was limited, or there was no infrastructure to store big chunks of data. In fact, this was one of the major factors that motivated the development of a distributed computing framework called Apache Spark.

Hadoop MapReduce was one such attempt to build a platform for machine learning, but it didn't work well for iterative processing because every iteration involves heavy I/O on HDFS. HDFS, however, is good at storing huge data sets in a distributed way and provides fault tolerance through replication and other means. Apache Spark can run on top of HDFS and, by keeping intermediate data in memory, can run some workloads up to 100 times faster than equivalent MapReduce programs. The MLlib module in Spark provides a variety of ready-made ML models that can be used easily with a few configurable hyper-parameters. In this blog, I'm going to cover some important models and their basic working principles.

Machine learning is broadly categorized into two types: supervised and unsupervised learning.

In order to train a model, we feed it a training data set and the model gradually learns from it. When the training set is labeled, the learning is called supervised learning; when the training set is not labeled, it is called unsupervised learning. For example, suppose we need to build a model that predicts whether it will rain tomorrow based on today's weather. Our training set will have two parts.

Input features: today's temp - high | today's temp - low |  weather type | ... 
Output label :  rain or not rain

The inputs are observations, or features, and the output is the target label: rain or no rain. We build our training data, which contains labeled features, from a large body of historical data. This type of learning is called supervised learning. Unsupervised learning is when the training set has only features and no labels. It can be used for detecting patterns in a given data set. One such use is anomaly detection on a network, to identify hacking or insecure access to resources over the network.

A typical machine learning pipeline consists of featurization, training and model evaluation. Featurization is the process of converting observations into numerical vectors. For example, features are extracted from spam/non-spam email texts into numeric vectors using a vectorization library and then labeled. This training data set is then fed to the predictive model for training.
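
As a rough illustration of featurization, here is a minimal sketch of the "hashing trick" in plain Scala. It is not MLlib's HashingTF implementation; the object name HashingSketch, the function termFrequencies and the bucket count of 16 are made up for this example.

```scala
object HashingSketch {
  // Turn a text into a fixed-length vector of term counts.
  // Each token is mapped to a bucket index through its hash code.
  def termFrequencies(text: String, numFeatures: Int): Array[Double] = {
    val vector = Array.fill(numFeatures)(0.0)
    text.toLowerCase.split("\\s+").filter(_.nonEmpty).foreach { token =>
      // Non-negative modulo keeps the index within [0, numFeatures)
      val index = ((token.hashCode % numFeatures) + numFeatures) % numFeatures
      vector(index) += 1.0
    }
    vector
  }

  def main(args: Array[String]): Unit = {
    val features = termFrequencies("free prize claim your free prize now", 16)
    println(features.mkString("[", ", ", "]"))
  }
}
```

Different words can land in the same bucket, which is why a real pipeline usually picks a much larger number of features.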




Okay, now it's time to jump into some of the MLlib models: regression, classification, clustering, collaborative filtering and their variants.

Regression model

Regression is a popular supervised learning technique in which the output label is a continuous value. For example, predicting a house price based on features like sq feet | no_of_bedrooms | no_of_bathroom | waterfront | year_built | year_renovated | ...

Linear least squares regression is the basic regression model, in which a linear function is fitted so that the mean squared error (loss) is minimized, for example by using the gradient descent algorithm. The resulting model, or function, tries to fit the data points and is able to generalize, that is, to predict the output for new or unseen data points.
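
To make the idea concrete, here is a minimal sketch of batch gradient descent for a single-feature linear model y = w * x + b in plain Scala. It is only an illustration and not how MLlib implements it; the object name, the learning rate and the toy data points are assumptions chosen for this example.

```scala
object LinearRegressionSketch {
  // Fit y = w * x + b by batch gradient descent on the mean squared error.
  def fit(xs: Array[Double], ys: Array[Double],
          learningRate: Double, iterations: Int): (Double, Double) = {
    val n = xs.length
    var w = 0.0
    var b = 0.0
    for (_ <- 0 until iterations) {
      // Residuals of the current line on every training point
      val errors = xs.indices.map(i => w * xs(i) + b - ys(i))
      // Partial derivatives of the mean squared error with respect to w and b
      val gradW = 2.0 / n * xs.indices.map(i => errors(i) * xs(i)).sum
      val gradB = 2.0 / n * errors.sum
      // Step against the gradient
      w -= learningRate * gradW
      b -= learningRate * gradB
    }
    (w, b)
  }

  def main(args: Array[String]): Unit = {
    // Toy data generated from y = 2x + 1
    val (w, b) = fit(Array(1.0, 2.0, 3.0, 4.0), Array(3.0, 5.0, 7.0, 9.0), 0.05, 5000)
    println(f"w = $w%.3f, b = $b%.3f")
  }
}
```

If the learning rate is too large the updates diverge, so in practice it has to be tuned together with the number of iterations.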




I have posted a mathematical explanation of this in my previous blog post here.

In linear regression, we often run into fitting problems (under-fitting and over-fitting), for instance when some features carry far more weight in the output than others. To mitigate over-fitting, we have linear regression models with Lasso (L1) and Ridge (L2) regularization. They add a regularization term to the cost function, which penalizes large weights and in turn reduces the impact of some features. Plain linear least squares doesn't have a regularization term. Please refer to the explanation of regularization here.
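
The difference between the two penalties can be sketched as cost functions in plain Scala. This is an illustration only: the object name and the parameter name lambda are assumptions, and MLlib may scale the penalty terms differently.

```scala
object RegularizedCostSketch {
  // Linear prediction: dot product of weights and features
  private def predict(w: Array[Double], x: Array[Double]): Double =
    w.zip(x).map { case (wi, xi) => wi * xi }.sum

  // Mean squared error of the linear model on the data set
  def mse(w: Array[Double], xs: Array[Array[Double]], ys: Array[Double]): Double =
    xs.zip(ys).map { case (x, y) => math.pow(predict(w, x) - y, 2) }.sum / xs.length

  // Ridge (L2): penalize the sum of squared weights
  def ridgeCost(w: Array[Double], xs: Array[Array[Double]], ys: Array[Double], lambda: Double): Double =
    mse(w, xs, ys) + lambda * w.map(wi => wi * wi).sum

  // Lasso (L1): penalize the sum of absolute weights
  def lassoCost(w: Array[Double], xs: Array[Array[Double]], ys: Array[Double], lambda: Double): Double =
    mse(w, xs, ys) + lambda * w.map(wi => math.abs(wi)).sum
}
```

With lambda set to 0, both costs reduce to the plain least squares cost.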

Classification model

Classification is a supervised learning technique that is very similar to regression. In classification, the predicted output is discrete, i.e. one of a finite set of labels. For example, classifying an email as spam or non-spam, or predicting whether it will rain tomorrow. There are two types of classifiers: binary and multi-class. In binary classification, the output label is either 0 or 1; a multi-class classifier has more than 2 output labels. We will cover the two most used binary classification models: SVM (Support Vector Machine) and logistic regression.

An SVM model creates a linear hyperplane that separates the positive and negative cases while maximizing the margin between the hyperplane and the nearest data points on either side; those nearest points are called the support vectors. Basically, it provides a geometric approach to determining the best hyperplane that separates positive and negative cases. Sometimes we can't separate the data points using a simple linear hyperplane, and we then depend on an SVM with kernels, which yields a non-linear decision boundary. (The SVMWithSGD model in MLlib is a linear SVM.) An SVM is very suitable when you want the model to be less affected by outliers (outliers are noise in the data set) and, with kernels, for deriving non-linear boundaries.
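
The geometric idea can be sketched with the score of a point relative to a hyperplane and the hinge loss that a linear SVM minimizes. This is a plain Scala illustration; the object name and the example weights used with it are assumptions, not values from a trained model.

```scala
object LinearSvmSketch {
  // Signed score of a point relative to the hyperplane w . x + b = 0
  def score(w: Array[Double], b: Double, x: Array[Double]): Double =
    w.zip(x).map { case (wi, xi) => wi * xi }.sum + b

  // Hinge loss for a label in {-1, +1}: zero once the point lies on the
  // correct side of the hyperplane with a margin of at least 1
  def hingeLoss(w: Array[Double], b: Double, x: Array[Double], label: Int): Double =
    math.max(0.0, 1.0 - label * score(w, b, x))

  // Map the sign of the score to the 0/1 labels used in this post
  def predict(w: Array[Double], b: Double, x: Array[Double]): Int =
    if (score(w, b, x) >= 0) 1 else 0
}
```

Points that are far on the correct side contribute no loss at all, which is why only the points near the boundary shape the final hyperplane.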

Logistic regression, on the other hand, is based on a probabilistic approach. It is very similar to linear regression, but the output of the linear function is passed through the logistic (sigmoid) function, which squashes the value into the range between 0 and 1. If the value is greater than some threshold, the prediction is positive, otherwise negative. It is well suited to deriving a linear decision boundary, but since every data point contributes to its loss, it tends to be more affected by outliers than an SVM.
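
The squashing and thresholding can be sketched in a few lines of plain Scala. The object name, the default threshold of 0.5 and any weights passed in are assumptions for illustration; a real model learns the weights from the training data.

```scala
object LogisticSketch {
  // Logistic (sigmoid) function: maps any real number into (0, 1)
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Predict 1 when the estimated probability exceeds the threshold, else 0
  def predict(weights: Array[Double], bias: Double, features: Array[Double],
              threshold: Double = 0.5): Int = {
    val z = weights.zip(features).map { case (w, x) => w * x }.sum + bias
    if (sigmoid(z) > threshold) 1 else 0
  }
}
```

Raising the threshold makes the model more conservative about predicting the positive class.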

Below is a code snippet in Scala for SVM and logistic regression. In this example, I chose SGD (Stochastic Gradient Descent) as the optimization algorithm for SVM and L-BFGS for logistic regression. L-BFGS usually converges faster than SGD.

   
 import org.apache.spark.mllib.feature.HashingTF  
 import org.apache.spark.mllib.classification.{SVMWithSGD, LogisticRegressionWithLBFGS}  
 import org.apache.spark.mllib.regression.LabeledPoint  
   
 // Load the data from file (one tab-separated label and message per line)  
 val SMSSpamCollection = sc.textFile("file:///home/vm4learning/SMSSpamCollection.txt")  
   
 // Transform each line into a (label, message) pair, splitting it only once  
 val kvdata = SMSSpamCollection.map(_.split("\t", 2)).filter(_.length == 2).map(fields => (fields(0), fields(1)))  
   
 // Filter normal and spam text contents  
 val normal = kvdata.filter(line => line._1 == "ham").map(line => line._2)  
 val spam = kvdata.filter(line => line._1 == "spam").map(line => line._2)  
   
 // Convert the text to feature vectors  
 val tf = new HashingTF(numFeatures = 10000)  
 val normalFeatures = normal.map(msg => tf.transform(msg.split(" ")))  
 val spamFeatures = spam.map(msg => tf.transform(msg.split(" ")))  
   
 // Build the LabeledPoint vectors (1 = spam, 0 = normal)  
 val labeledPositiveFeatures = spamFeatures.map(features => LabeledPoint(1, features))  
 val labeledNegativeFeatures = normalFeatures.map(features => LabeledPoint(0, features))  
   
 // Cache the training data set, since both models iterate over it  
 val trainingData = labeledPositiveFeatures.union(labeledNegativeFeatures).cache()  
   
 // Build an SVM model with the training data set  
 val numIterations = 100  
 val SVMModel = SVMWithSGD.train(trainingData, numIterations)  
   
 // Build an LR model with the training data set  
 val LRmodel = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)  
   
 // Positive and negative test text data  
 val postTest = tf.transform("Congratulations, you've been awarded a $25 bonus ".split(" "))  
 val negTest = tf.transform("I would be on leave next friday. Please plan accordingly.".split(" "))  
   
 // Run the positive and negative test prediction with the SVM model  
 val posPredictionWithSVMModel = SVMModel.predict(postTest)  
 val negPredictionWithSVMModel = SVMModel.predict(negTest)  
   
   
 // Run the positive and negative test prediction with the LR model  
 val posPredictionWithLRModel = LRmodel.predict(postTest)  
 val negPredictionWithLRModel = LRmodel.predict(negTest)  
   
   
 Results:   
   
 scala> val posPredictionWithSVMModel = SVMModel.predict(postTest);  
 posPredictionWithSVMModel: Double = 1.0  
   
 scala> val negPredictionWithSVMModel = SVMModel.predict(negTest);  
 negPredictionWithSVMModel: Double = 0.0  
   
   
 scala> val posPredictionWithLRModel = LRmodel.predict(postTest);  
 posPredictionWithLRModel: Double = 1.0  
   
 scala> val negPredictionWithLRModel = LRmodel.predict(negTest);  
 negPredictionWithLRModel: Double = 0.0  
   
   
For multi-class classification, we have decision trees, random forests and Naive Bayes.

The decision tree is one of the simplest machine learning algorithms and has been in use for a long time. As the name implies, it is basically a tree in which each node takes a decision based on a condition on one or more feature attributes. It is a divide-and-conquer technique in which a data point is evaluated starting at the root node and moving down to a leaf node. It can be used for both classification and regression. Below is an example of a decision tree that models the Titanic survivors and non-survivors.

MLlib provides an implementation of decision trees (org.apache.spark.mllib.tree.DecisionTree), and it takes the following parameters.
  • maxDepth -> max depth of the decision tree.
  • maxBins -> max number of bins/buckets into which each feature is split when deriving the conditions. If there are categorical features, maxBins must be at least the largest number of categories of any of them.
  • impurity -> "gini" (or "entropy") for classifiers and "variance" for regression.
  • categoricalFeaturesInfo -> specifies which features are categorical and how many categorical values each of those features can take.
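
To give a feel for what the impurity parameter measures, here is a minimal sketch of the Gini impurity in plain Scala. It is only an illustration of the formula, not MLlib's internal code, and the object and function names are made up for this example.

```scala
object GiniSketch {
  // Gini impurity of a set of class labels: 1 - sum of squared class proportions.
  // 0.0 means the set is pure (all labels are the same).
  def gini(labels: Seq[Int]): Double =
    if (labels.isEmpty) 0.0
    else {
      val n = labels.size.toDouble
      1.0 - labels.groupBy(identity).values.map(g => math.pow(g.size / n, 2)).sum
    }

  // Impurity of a candidate split: size-weighted average of both sides.
  // The tree prefers the condition that gives the lowest value.
  def splitImpurity(left: Seq[Int], right: Seq[Int]): Double = {
    val n = (left.size + right.size).toDouble
    left.size / n * gini(left) + right.size / n * gini(right)
  }
}
```

A split that sends all survivors to one side and all non-survivors to the other would have an impurity of 0.
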
Below is a code snippet for building a decision tree classifier on census data. The complete code and data are available on GitHub.

 import org.apache.spark.mllib.linalg.{Vector, Vectors}  
 import org.apache.spark.mllib.regression.LabeledPoint  
 import org.apache.spark.mllib.tree.DecisionTree  
   
 // The category maps (orgTypeMap, gradeTypeMap, ...), their sizes (orgTypeSize, ...)  
 // and getLablelValue are defined in the complete code.  
   
 // open the census data file   
 val census = sc.textFile("file:///home/vm4learning/census_data.txt")  
   
 // define the categorical features Map (feature index -> number of categories)  
 val categoricalFeaturesInfo = Map(1 -> orgTypeSize, 3 -> gradeTypeSize , 5 -> marStatusSize , 6 -> jobTypeSize , 7 -> familyStatusSize , 8 -> raceTypeSize , 9 -> genderTypeSize , 13 -> countrySize)  
   
 val numClasses = 2  
 val impurity = "gini"  
 val maxDepth = 5  
 val maxBins = 100  
   
 // Convert the first 14 fields of a record into a feature vector.  
 // Field 14 (salary range) is the label, so it is not used as a feature.  
 def toFeatures(fields: Array[String]): Vector =  
   Vectors.dense(fields(0).toDouble, orgTypeMap(fields(1)) , fields(2).toDouble , gradeTypeMap(fields(3)) , fields(4).toDouble , marStatusMap(fields(5)), jobTypeMap(fields(6)), familyStatusMap(fields(7)), raceTypeMap(fields(8)), genderTypeMap(fields(9)), fields(10).toDouble , fields(11).toDouble , fields(12).toDouble, countryMap(fields(13)))  
   
 val labeledTrainingSet = census.map { line =>  
   val fields = line.split(", ")  
   LabeledPoint(getLablelValue(fields(14)), toFeatures(fields))  
 }  
   
 // building the decision tree classifier (the Scala API takes the RDD[LabeledPoint] directly)  
 val model = DecisionTree.trainClassifier(labeledTrainingSet, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)  
   
 // test data   
 val testData = "39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K"  
   
 // Converting to feature vector   
 val testVector = toFeatures(testData.split(", "))  
   
 // predict   
 model.predict(testVector)  
   
Random forest is a variant of the decision tree, also called a decision forest. The idea is that multiple decision trees are built for the given data set, and the output is calculated by combining the individual outputs of the trees: a majority vote for classification, or an average for regression. An example for the same census data is available on GitHub.
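
The way the individual tree outputs are combined can be sketched as follows. This is a plain Scala illustration with made-up names; it only shows the combination step and not how the trees themselves are trained.

```scala
object ForestVoteSketch {
  // Classification: the label predicted by the largest number of trees wins.
  // (Ties are broken arbitrarily in this sketch.)
  def majorityVote(predictions: Seq[Double]): Double =
    predictions.groupBy(identity).maxBy(_._2.size)._1

  // Regression: the forest output is the mean of the tree outputs.
  def average(predictions: Seq[Double]): Double =
    predictions.sum / predictions.size
}
```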

Naive Bayes is another classification technique, built on Bayes' rule of probability. The rule says that the posterior probability of a class is proportional to prior probability * likelihood.

The prior probability is calculated before looking at the new data point and is based purely on the existing data distribution. Say there are two labels, red and blue. If there are more red labels than blue labels in the entire data set, the prior probability of red is higher and the prior probability of blue is lower.

The likelihood is estimated after seeing the new data point. Intuitively, draw a small neighborhood around the new data point and calculate the likelihood of each label from the number of red/blue points in that neighborhood. Multiplying the prior by the likelihood gives the posterior probability of each label, and the label with the highest posterior is chosen. Posting a useful link here.
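
The red/blue intuition can be worked through in a few lines of plain Scala. This sketch follows the neighborhood picture described here rather than MLlib's NaiveBayes implementation, and the counts used with it (40 blue and 20 red points overall, 1 blue and 3 red in the neighborhood) are made-up numbers for illustration.

```scala
object NaiveBayesSketch {
  // Unnormalized posterior score of each label: prior * likelihood.
  // classCounts: number of points per label in the whole data set.
  // neighbourCounts: number of points per label near the new data point.
  def scores(classCounts: Map[String, Int],
             neighbourCounts: Map[String, Int]): Map[String, Double] = {
    val total = classCounts.values.sum.toDouble
    classCounts.map { case (label, count) =>
      val prior = count / total
      val likelihood = neighbourCounts.getOrElse(label, 0) / count.toDouble
      label -> prior * likelihood
    }
  }

  // The predicted label is the one with the highest score.
  def classify(classCounts: Map[String, Int],
               neighbourCounts: Map[String, Int]): String =
    scores(classCounts, neighbourCounts).maxBy(_._2)._1

  def main(args: Array[String]): Unit = {
    val all = Map("blue" -> 40, "red" -> 20)
    val near = Map("blue" -> 1, "red" -> 3)
    println(classify(all, near))
  }
}
```

Even though blue has the higher prior in this example, the neighborhood is dominated by red points, so the likelihood outweighs the prior.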

Clustering model

Clustering is an unsupervised learning technique. The idea is simple. You have an unlabeled training set, and you need some mechanism or algorithm that groups these data points into one or more buckets. Basically, it tries to determine the unknown patterns your data points exhibit.

Of the clustering techniques, k-means is quite famous. k is just the number of clusters the algorithm has to build from your data. Here, a cluster is represented by a centroid in the feature space, surrounded by its respective data points.



Step 1: Initialize: Randomly initialize the k cluster centroids and assign each data point to its nearest centroid.
Step 2: Adjust centroids: Move each centroid to the mean of the data points assigned to it, which minimizes the mean squared distance from the centroid to its points, and then re-assign each point to its nearest centroid. This step is repeated until the centroids settle at the actual centers of the clusters.
Step 3: Apply on new data: Apply the model to a new data set, and the model will assign each new data point to the most suitable bucket/cluster, i.e. the one with the nearest centroid.
Step 4: Evaluate the result: To evaluate the cluster we got for a new data point, we measure the distance from the cluster centroid to the new data point. If the new data point is very near to the centroid, it follows a normal or usual pattern. If the distance is more than some threshold, your new data point is possibly an unusual pattern.
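
The four steps can be sketched in plain Scala as follows. This is a simplified illustration and not MLlib's KMeans: the initial centroids are passed in explicitly instead of being chosen randomly so that the result is repeatable, and the object name and the anomaly threshold are assumptions.

```scala
object KMeansSketch {
  type Point = Array[Double]

  // Euclidean distance between two points
  def distance(a: Point, b: Point): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Index of the centroid closest to the point (steps 1 and 3)
  def nearest(centroids: Seq[Point], p: Point): Int =
    centroids.indices.minBy(i => distance(centroids(i), p))

  // Step 2: repeatedly move every centroid to the mean of its assigned points
  def fit(points: Seq[Point], initial: Seq[Point], iterations: Int): Seq[Point] = {
    var centroids = initial
    for (_ <- 0 until iterations) {
      val groups = points.groupBy(p => nearest(centroids, p))
      centroids = centroids.indices.map { i =>
        groups.get(i) match {
          case Some(members) =>
            // Coordinate-wise mean of the points in this cluster
            Array.tabulate(members.head.length)(d => members.map(_(d)).sum / members.size)
          case None => centroids(i) // an empty cluster keeps its centroid
        }
      }
    }
    centroids
  }

  // Step 4: a point far from its nearest centroid is flagged as unusual
  def isAnomaly(centroids: Seq[Point], p: Point, threshold: Double): Boolean =
    distance(centroids(nearest(centroids, p)), p) > threshold
}
```

In a real setting the threshold would be chosen from the distribution of distances seen in normal data.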

One use case of this technique is finding anomalies, for example detecting intruders in networks or fraudulent credit transactions. Posting an interesting article around this.

Collaborative filtering model

This is a widely used recommender technique, used by various shopping sites to give customers recommendations for newer or similar products based on their previous shopping behavior and ratings, combined with the ratings of other similar users. Basically, the algorithm tries to fill in the unknown ratings in the user-item matrix below.



Step 1: Finding similar users based on their ratings of common products. This is done by calculating a similarity value for two users. For example, if the similarity value, i.e. sim(Ted, Carol), is positive, Ted and Carol are of the same kind, or have the same taste in products. If the value is negative, the two users are not similar.

Step 2: Predicting the rating. The missing rating of a particular user for a product is calculated by averaging the ratings of all similar users who rated that particular product.
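
These two steps can be sketched in plain Scala. The sketch uses the Pearson correlation as the similarity value, which is one common choice that can be positive or negative; that choice, the object name and any ratings passed in are assumptions for illustration. Note that MLlib's ALS model fills the matrix through matrix factorization rather than this neighbor-based approach.

```scala
object UserSimilaritySketch {
  type Ratings = Map[String, Double] // item -> rating

  // Step 1: Pearson correlation over the items both users have rated
  def similarity(a: Ratings, b: Ratings): Double = {
    val common = a.keySet.intersect(b.keySet).toSeq
    if (common.size < 2) 0.0
    else {
      val meanA = common.map(a).sum / common.size
      val meanB = common.map(b).sum / common.size
      val cov = common.map(i => (a(i) - meanA) * (b(i) - meanB)).sum
      val norm = math.sqrt(common.map(i => math.pow(a(i) - meanA, 2)).sum) *
        math.sqrt(common.map(i => math.pow(b(i) - meanB, 2)).sum)
      if (norm == 0.0) 0.0 else cov / norm
    }
  }

  // Step 2: average the item's ratings over all positively similar users
  def predict(user: String, item: String, all: Map[String, Ratings]): Option[Double] = {
    val similar = all.collect {
      case (other, ratings) if other != user && ratings.contains(item) &&
        similarity(all(user), ratings) > 0 => ratings(item)
    }
    if (similar.isEmpty) None else Some(similar.sum / similar.size)
  }
}
```

A common refinement is to weight each neighbor's rating by its similarity value instead of taking a plain average.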

Below is a sample code snippet for a movie recommender. Please find the complete code and the dataset on my GitHub.

The ALS model accepts an RDD of the MLlib Rating data type (RDD[Rating]) as the training data set, along with hyper-parameters such as rank, numIterations and lambda. Since we don't know the most suitable hyper-parameters, we train the model with a range of hyper-parameters and select the best-fitting model.

   import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}  
     
   // loadRatings and computeRmse are helper functions defined in the complete code.  
     
   // location of the ratings and movie titles  
   val movieLensHomeDir = "hdfs://quickstart.cloudera:8020/user/cloudera/movieRatingDataset"  
     
   // load ratings  
   val ratings = sc.textFile(movieLensHomeDir + "/ratings.dat").map { line =>  
     val fields = line.split("::")  
     // format: (timestamp % 10, Rating(userId, movieId, rating))  
     (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))  
   }  
     
   // load movies  
   val movies = sc.textFile(movieLensHomeDir + "/movies.dat").map { line =>  
     val fields = line.split("::")  
     // format: (movieId, movieName)  
     (fields(0).toInt, fields(1))  
   }.collect().toMap  
     
   // load my own ratings so that they can be added to the training set  
   // (this assumes loadRatings returns a Seq[Rating])  
   val myRatings = loadRatings("/home/cloudera/Downloads/ml-1m/myRating.dat")  
   val myRatingsRDD = sc.parallelize(myRatings, 1)  
     
   // Split the data into training, validation and test sets, using the last digit of the timestamp  
   val numPartitions = 4 // assumed value; tune it for your cluster  
   val training = ratings.filter(x => x._1 < 6)  
     .values  
     .union(myRatingsRDD)  
     .repartition(numPartitions)  
     .cache()  
   val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)  
     .values  
     .repartition(numPartitions)  
     .cache()  
   val test = ratings.filter(x => x._1 >= 8).values.cache()  
   val numValidation = validation.count()  
     
   // build ALS models over a grid of hyper-parameters and keep the best one  
   val ranks = List(12, 20)  
   val lambdas = List(0.1, 10.0)  
   val numIters = List(8, 10)  
   var bestModel: Option[MatrixFactorizationModel] = None  
   var bestValidationRmse = Double.MaxValue  
   var bestRank = 0  
   var bestLambda = -1.0  
   var bestNumIter = -1  
   for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {  
     val model = ALS.train(training, rank, numIter, lambda)  
     val validationRmse = computeRmse(model, validation, numValidation)  
     println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "  
       + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")  
     if (validationRmse < bestValidationRmse) {  
       bestModel = Some(model)  
       bestValidationRmse = validationRmse  
       bestRank = rank  
       bestLambda = lambda  
       bestNumIter = numIter  
     }  
   }  
     
   // Test recommendation: exclude my top-rated movies from the candidates  
   val sortedMyrating = myRatings.sortBy(-_.rating).take(5)  
   val myRatedMovieIds = sortedMyrating.map(_.product).toSet  
   val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)  
   // predict ratings of the candidate movies for user id 1 and keep the top 10  
   val recommendations = bestModel.get  
     .predict(candidates.map((1, _)))  
     .collect()  
     .sortBy(-_.rating)  
     .take(10)  
     
   println("Movies recommended for the user :")  
     
   recommendations.zipWithIndex.foreach { case (r, i) =>  
     println("%2d".format(i + 1) + ": " + movies(r.product) + " : " + r.rating)  
   }  
     
 

This post has just covered the high-level principles and usage of the basic models of Apache Spark MLlib 1.6. You can refer to the Spark MLlib 1.6 documentation for more details and other variants. Newer versions of Spark (such as 2.0.2) also offer, through the DataFrame-based spark.ml API, a quite interesting classifier called the Multilayer Perceptron, which is based on neural network concepts.

I'm also posting a link to a YouTube channel whose videos help in understanding the math behind the above models.