Although machine learning techniques were introduced in late 1960- 1990, there was no platform to run it more efficiently. First thing, most of the machine learning algorithms exhibit iterative processing which requires a huge computational effort. Secondly the data required to train the models was less in amount or there were no infrastructure to store the big chuck of data. In fact, this was the one of major factor which motivated in developing a distributed computing framework called Apache Spark.
Hadoop map-reduce was one of such attempt to build the platform for machine learning, But it didn't work out well for iterative processing since it involves huge IO operation on HDFS. However HDFS helps in storing huge data in a distributed way and provides fault tolerance with the help of replication and other means. Apache Spark works on top of HDFS and provides a computational speed of 100 times as fast as map-reduce programs. MLLib module in Spark provides a variety of ready-made ML models which can be used easily with some configurable hyper-parameters. In this blog, I'm going to cover some important models and its basic working principle.
Machine Learning is categorized into two types - Supervised and Unsupervised Learning
In-order to train a model, we need to feed the training data-set and gradually the model tries to learn. When the training set is labeled, the type of learning is called supervised learning and when the training set is not labeled, it is called unsupervised learning. To explain with an e.g., we need to build a model which can predict whether tomorrow will rain or not based on today's climate variations. So our training set will have two parts.
Input features: today's temp - high | today's temp - low | whether type | ...
Output label : rain or not rain
Inputs are observations or features and the output is the target label whether rain or not rain. We will build our training data which contain labeled features from a vast historical data. This type of learning is called supervised learning. Unsupervised learning is when the training set has only features and not having the label. This can be used for detecting patterns in a given data set. One such usage of learning is anomaly detection on network to identify hacking or unsecured access of resources over network.
A typical machine learning pipeline starts with featurization, training and model evaluation. Featurization is the processing of extracting the observation into numerical vectors. For e.g given below, respective features are extracted from spam/non-spam email texts into numeric vectors using some vectorization libraries and then labeled. Then fed this training data set to the predictive model for training.
A typical machine learning pipeline starts with featurization, training and model evaluation. Featurization is the processing of extracting the observation into numerical vectors. For e.g given below, respective features are extracted from spam/non-spam email texts into numeric vectors using some vectorization libraries and then labeled. Then fed this training data set to the predictive model for training.
Okay, now time to jump into some of MLLib models like Regression, Classification, Clustering, Collaborative filtering and its variants.
Regression model
Regression is one of the popular supervised learning technique where the output label is a continuous value. For e.g to predict the house price based on different features like sq feet | no_of_bedrooms | no_of_bathroom | waterfront | year_built | year_renovated | ...
Linear least squares regression is the basic regression model where in a linear function is generated such that the mean squared error/loss is minimized using the gradient descent algorithm. The resultant model or function tries to fit the data points and is capable of generalizing or predicting output from the new or unseen data points.
I have posted a mathematical explanation to this in my last blog here.
In linear regression, we always run into fitting(under-fitting and over-fitting) problem when some features has more weight-age to the output compared to other features. To mitigate these problem, we have linear regression model with Lazzo(L1) and Ridge(L2) regularization. It basically add a regularization term into the cost function which in-turn reduce the impact of some features. Linear least squares doesn't have a regularization term. Please refer explanation for regularization here.
Regression is one of the popular supervised learning technique where the output label is a continuous value. For e.g to predict the house price based on different features like sq feet | no_of_bedrooms | no_of_bathroom | waterfront | year_built | year_renovated | ...
Linear least squares regression is the basic regression model where in a linear function is generated such that the mean squared error/loss is minimized using the gradient descent algorithm. The resultant model or function tries to fit the data points and is capable of generalizing or predicting output from the new or unseen data points.
I have posted a mathematical explanation to this in my last blog here.
In linear regression, we always run into fitting(under-fitting and over-fitting) problem when some features has more weight-age to the output compared to other features. To mitigate these problem, we have linear regression model with Lazzo(L1) and Ridge(L2) regularization. It basically add a regularization term into the cost function which in-turn reduce the impact of some features. Linear least squares doesn't have a regularization term. Please refer explanation for regularization here.
Classification model
Classification is a supervised learning technique which is very similar to regression. In classification, the predicted output is discrete. i,e a finite set of label. For e.g classify a email into spam or non-spam, predict tomorrow would rain or not. There are two types of classifiers - binary and multi-dimensional classifiers. In binary classification, the output label would be only 0 and 1 and in multi-dimensional classifier there would be more than 2 output labels. We will cover most used binary regression model - SVM (Support vector machine and Logistic regression)
Support vectors are just data points plotted on the graph. In SVM model, it create a linear hyper plane which separates positive and negative cases so that it maximize the margin between the nearest support vector point and the hyper plane. Basically it provides geometrically approach to determine the best hyper plane which separates positive and negative cases. Sometimes we can't just separate the data points using a simple linear hyper plane and we may depend on SVM model with kernels which yields non linear hyper plane. The SVM model is very suitable when you want to ignore the outliers(outliers are noise in data set) and for deriving non-linear hyper planes.
Logistic regression on other hand is more based on probabilistic approach. It is very similar to linear regression, but the hypothesis function is converted to a logarithmic term which will squash the output value between 0 and 1. If value is greater than some threshold, it is positive, otherwise negative. It is more suitable for deriving a linear hyper plane and more robust for the outliers.
Below is code snippet in scala for SVM and Logistic Regression. In below e.g, I choose to use optimization algorithm as SGD(Stochastic Gradient Descent) for SVM and BFGS for Logistic regression. BFGS algorithm is good for faster convergence compared to SGD.
Decision tree is one of the simplest machine learning algorithm which is being used for a long time now. As the name implies, it is basically a tree in which the node will take some decision based on some conditions against one or more feature attributes. It is a divide and conquer technique where in the data is evaluated starting from the root node and bottom node. This can be used for both classification and regression. Below is an e.g for decision tree to model the titanic survivors and non-survivors.
MLlib provides an implementation of decision tree(org.apache.spark.mllib.tree.DecisionTree) and it takes 3 hyper-parameters.
Naive Bayes is another classifier technique built based on Bayes probability distribution rule. The rule says probability of one classification over another = prior probability * posterior probability
Prior probability is calculated with out knowing the data and is based purely on existing data distribution. Say there are two labels red and blue. If the number of red labels are more than the no of blue labels in the entire data-set, the prior probability of red is at higher end and the prior probability of blue is at lower end.
Posterior probability is the likelihood probability after seeing the data and it is calculated by drawing a small neighborhood area around the new data point and calculate the likelihood probability based on no of red/blue point in the neighborhood area. Posting a useful link here.
Clustering model
Classification is a supervised learning technique which is very similar to regression. In classification, the predicted output is discrete. i,e a finite set of label. For e.g classify a email into spam or non-spam, predict tomorrow would rain or not. There are two types of classifiers - binary and multi-dimensional classifiers. In binary classification, the output label would be only 0 and 1 and in multi-dimensional classifier there would be more than 2 output labels. We will cover most used binary regression model - SVM (Support vector machine and Logistic regression)
Support vectors are just data points plotted on the graph. In SVM model, it create a linear hyper plane which separates positive and negative cases so that it maximize the margin between the nearest support vector point and the hyper plane. Basically it provides geometrically approach to determine the best hyper plane which separates positive and negative cases. Sometimes we can't just separate the data points using a simple linear hyper plane and we may depend on SVM model with kernels which yields non linear hyper plane. The SVM model is very suitable when you want to ignore the outliers(outliers are noise in data set) and for deriving non-linear hyper planes.
Logistic regression on other hand is more based on probabilistic approach. It is very similar to linear regression, but the hypothesis function is converted to a logarithmic term which will squash the output value between 0 and 1. If value is greater than some threshold, it is positive, otherwise negative. It is more suitable for deriving a linear hyper plane and more robust for the outliers.
Below is code snippet in scala for SVM and Logistic Regression. In below e.g, I choose to use optimization algorithm as SGD(Stochastic Gradient Descent) for SVM and BFGS for Logistic regression. BFGS algorithm is good for faster convergence compared to SGD.
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
//Loading data from file
val SMSSpamCollection = sc.textFile("file:///home/vm4learning/SMSSpamCollection.txt");
//Transforming to key value pair
val kvdata = SMSSpamCollection.map(line => (line.split("\t")(0) , line.split("\t")(1)))
//Filtering normal and spam text contents
val normal = kvdata.filter(line => line._1 == "ham").map(line => line._2)
val spam = kvdata.filter(line => line._1 == "spam").map(line => line._2)
//converting the text to Feature vectors
val tf = new HashingTF(numFeatures = 10000);
val normalFeatures = normal.map(msg => tf.transform(msg.split(" ")))
val spamFeatures = spam.map(msg => tf.transform(msg.split(" ")))
//Building the LabelPoint Vectors
val labeledPositiveFeatures = spamFeatures.map(features => LabeledPoint(1, features))
val labeledNegativeFeatures = normalFeatures.map(features => LabeledPoint(0, features))
// Cache the training Data Set
val trainingData = labeledPositiveFeatures.union(labeledNegativeFeatures).cache()
// Build an SVM model with training Data Set
val numIterations = 100
val SVMModel = SVMWithSGD.train(trainingData, numIterations)
// Build a LR model with training Data Set
val LRmodel = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)
// Positive and Negative Test Text data
val postTest = tf.transform("Congratulations, you've been awarded a $25 bonus ".split(" "))
val negTest = tf.transform("I would be on leave next friday. Please plan accordingly.".split(" "))
// Run the positive and negative test Prediction with SVM model
val posPredictionWithSVMModel = SVMModel.predict(postTest);
val negPredictionWithSVMModel = SVMModel.predict(negTest);
// Run the postive and negative test Prediction with LR model
val posPredictionWithLRModel = LRmodel.predict(postTest);
val negPredictionWithLRModel = LRmodel.predict(negTest);
Results:
scala> val posPredictionWithSVMModel = SVMModel.predict(postTestFeatures);
posPredictionWithSVMModel: Double = 1.0
scala> val negPredictionWithSVMModel = SVMModel.predict(negTestFeatures);
negPredictionWithSVMModel: Double = 0.0
scala> val posPredictionWithLRModel = LRmodel.predict(postTestFeatures);
posPredictionWithLRModel: Double = 1.0
scala> val negPredictionWithLRModel = LRmodel.predict(negTestFeatures);
negPredictionWithLRModel: Double = 0.0
For multi-class classification, we have Decision tree, Random forest and Naive Bayes.Decision tree is one of the simplest machine learning algorithm which is being used for a long time now. As the name implies, it is basically a tree in which the node will take some decision based on some conditions against one or more feature attributes. It is a divide and conquer technique where in the data is evaluated starting from the root node and bottom node. This can be used for both classification and regression. Below is an e.g for decision tree to model the titanic survivors and non-survivors.
MLlib provides an implementation of decision tree(org.apache.spark.mllib.tree.DecisionTree) and it takes 3 hyper-parameters.
- max level -> max depth of decision tree.
- max bins -> max no of bins/buckets in which each feature is split for derive the conditions.if the feature is categorical, max bin should be no of categories.
- impurity -> "gini" for Classifiers and "variance" for Regression.
- categoricalFeaturesInfo -> Specifies which features are categorical and how many categorical values each of those features can take.
// open the census data file
val census = sc.textFile("file:///home/vm4learning/census_data.txt")
// define the categorical features Map
val categoricalFeaturesInfo = Map(1 -> orgTypeSize, 3 -> gradeTypeSize , 5 -> marStatusSize , 6 -> jobTypeSize , 7 -> familyStatusSize , 8 -> raceTypeSize , 9 -> genderTypeSize , 13 -> countrySize, 14 -> salaryRangeSize)
val numClasses = 2
val impurity = "gini"
val maxDepth = 5
val maxBins = 100
val labeledTrainingSet = census.map{line =>
val fields = line.split(", ")
LabeledPoint(getLablelValue(fields(14).toString) , Vectors.dense(fields(0).toDouble, orgTypeMap(fields(1).toString) , fields(2).toDouble , gradeTypeMap(fields(3).toString) , fields(4).toDouble , marStatusMap(fields(5).toString), jobTypeMap(fields(6).toString), familyStatusMap(fields(7).toString),raceTypeMap(fields(8).toString),genderTypeMap (fields(9).toString), fields(10).toDouble , fields(11).toDouble , fields(12).toDouble,countryMap(fields(13).toString) , salaryRangeMap(fields(14).toString)))
}
val labeledTrainingSetRDD = featureVector.toJavaRDD();
// building the decision tree classifer
val model = DecisionTree.trainClassifier(labeledTrainingSetRDD, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
// test data
val testData = "39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K"
// Converting to feature vector
val fields = testData.split(", ")
val testVector = Vectors.dense(fields(0).toDouble, orgTypeMap(fields(1).toString) , fields(2).toDouble , gradeTypeMap(fields(3).toString) , fields(4).toDouble , marStatusMap(fields(5).toString), jobTypeMap(fields(6).toString), familyStatusMap(fields(7).toString),raceTypeMap(fields(8).toString),genderTypeMap (fields(9).toString), fields(10).toDouble , fields(11).toDouble , fields(12).toDouble,countryMap(fields(13).toString) , salaryRangeMap(fields(14).toString))
// predict
model.predict(testVector);
Random Forest is another variant of decision tree, also called decision Forest. The idea here is that multiple decision trees would be created for given data set and the output value would be calculated by kind of average out the individual output value from each decision trees. An e.g for the same census data is available in the github.Naive Bayes is another classifier technique built based on Bayes probability distribution rule. The rule says probability of one classification over another = prior probability * posterior probability
Prior probability is calculated with out knowing the data and is based purely on existing data distribution. Say there are two labels red and blue. If the number of red labels are more than the no of blue labels in the entire data-set, the prior probability of red is at higher end and the prior probability of blue is at lower end.
Posterior probability is the likelihood probability after seeing the data and it is calculated by drawing a small neighborhood area around the new data point and calculate the likelihood probability based on no of red/blue point in the neighborhood area. Posting a useful link here.
Clustering model
Clustering is a unsupervised learning technique. Idea here is simple. You have an unlabeled training set and you need some mechanism or algorithm so that it should cluster these data points into 1 or more buckets. Basically it tries to determine the unknown pattern your data points exhibits.
Out of this, k-means ways of clustering is quite famous. k is just number of clusters in which the algorithm has to built out from your data. Here, a cluster would be a centroid in the hyper plane and
it is surrounded with the respective data-points.
Step 1: Initialize: Randomly initialize the cluster centroid and tag each data points to some cluster.
Step 2: Adjust centroid: Calculate the mean distance from the centroid to each its data points. Adjust the centroid so that mean should be minimum. This is repetitive steps and cluster would be moved to the actual centroid points in the cluster.
Step 3: Apply on new data: Apply a new data set to the model and the model will try to bucket the new data point to one of the suitable bucket/cluster as it can.
Step 4: Evaluate the result: In order to evaluate the cluster we got for the new data point, we measure the distance from resultant cluster centroid to the new data point. If the new data point is very near to the centroid, it is normal or usual pattern. If the distance is more than some threshold limit, it means your new data point is possibly an unusual pattern.
One of the use case of this technique to finding anomalies. For e.g detecting intruders in the networks, fraudulent credit transactions. Posting some interesting article around this
Collaborative filtering model
This is widely used recommender technique used by various shopping sites which helps the customers to get recommendations for the newer or similar products based on his/her previous shopping behavior, ratings after collaborating the ratings of other similar users. Basically the algorithm tries to fill the unknown rating in the user-item matrix below.
Step 1: Finding the similar users based on their rating on the common product. It does it by calculating the similarity value of two users. For e.g If the similarity value( i.e sim(Ted, Carol) ) is positive, Ted, Carol are of same kind or having same product taste. If the value is negative, then the two users are not similar.
Step 2: Predicting the rating. Now it calculates the missing rating of a particular user and a product by averaging all the ratings of similar user who rated that particular product.
Below is code sample code snippet for a movie recommender. Please find the complete code and the dataset in my github.
The ALS model accepts the RDD of Rating MLLIB datatype (RDD[Rating]) as the training data set and certain hyper-parameters like ranking, numIterations and lambda. Since we don't know the most suitable hyper-parameters, we will make the model to train with a range of hyper-parameters and select the best fit model.
Just covered the high level principle and the usage of the basic models of Apache Spark MLLIB 1.6. You can refer the Spark MLlib 1.6 documentation for more details and other variants. In the newer version of Spark (2.0.2), a quite interesting classifier is introduced called Multi Layer Perception model which basically works based on neural network concepts.
I'm posting a YouTube channel - video link which helps in understanding the math behind of the above models.
Step 2: Predicting the rating. Now it calculates the missing rating of a particular user and a product by averaging all the ratings of similar user who rated that particular product.
Below is code sample code snippet for a movie recommender. Please find the complete code and the dataset in my github.
The ALS model accepts the RDD of Rating MLLIB datatype (RDD[Rating]) as the training data set and certain hyper-parameters like ranking, numIterations and lambda. Since we don't know the most suitable hyper-parameters, we will make the model to train with a range of hyper-parameters and select the best fit model.
// load ratings and movie titles
val movieLensHomeDir = "hdfs://quickstart.cloudera:8020/user/cloudera/movieRatingDataset" ;
// load ratings
val ratings = sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/movieRatingDataset/ratings.dat").map { line =>
val fields = line.split("::")
// format: (timestamp % 10, Rating(userId, movieId, rating))
(fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}
// load movies
val movies = sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/movieRatingDataset/movies.dat").map { line =>
val fields = line.split("::")
// format: (movieId, movieName)
(fields(0).toInt, fields(1))
}.collect().toMap
// Spilt the data into training, validation and test set
val training = ratings.filter(x => x._1 < 6)
.values
.union(myRatingsRDD)
.repartition(numPartitions)
.cache()
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
.values
.repartition(numPartitions)
.cache()
val test = ratings.filter(x => x._1 >= 8).values.cache()
// build ALS model
val ranks = List(12, 20)
val lambdas = List(0.1, 10.0)
val numIters = List(8, 10)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
val model = ALS.train(training, rank, numIter, lambda)
val validationRmse = computeRmse(model, validation, numValidation)
println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
+ rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
if (validationRmse < bestValidationRmse) {
bestModel = Some(model)
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lambda
bestNumIter = numIter
}
}
// Test recommendation
val myRatings = loadRatings("/home/cloudera/Downloads/ml-1m/myRating.dat")
val sortedMyrating = myRatings.sortBy(- _.rating ).take(5);
val myRatedMovieIds = sortedMyrating.map(_.product).toSet
val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
val recommendations = bestModel.get
.predict(candidates.map((1, _)))
.collect()
.sortBy(-_.rating)
.take(10)
var i = 1
println("Movies recommended for the user :")
recommendations.foreach { r =>
println("%2d".format(i) + ": " + movies(r.product) + " : " + r.rating)
i += 1
}
Just covered the high level principle and the usage of the basic models of Apache Spark MLLIB 1.6. You can refer the Spark MLlib 1.6 documentation for more details and other variants. In the newer version of Spark (2.0.2), a quite interesting classifier is introduced called Multi Layer Perception model which basically works based on neural network concepts.
I'm posting a YouTube channel - video link which helps in understanding the math behind of the above models.