Tuesday 19 April 2016

Pyspark Notes

Pyspark exercises
=============

pyspark shell is preloaded the spark context and it can be accessed via the variable name : sc
sc is like a connector to the spark driver which is pyspark shell / spark-shell

However if you are running the pyspark/scala/java standalone programs, you need to export or create the spark context before starting the data processing.

Two types of data processing are available : Transformations and Actions
Transformations:
============
flatMap
map
reduceByKey
reduce
groupByKey
combinerByKey
aggregateByKey

Actions:=====

count
take
takeSorted
countByKey
saveAsTextFile
saveAsSequenceFile




Program 1
#########################################################
Load the file from hdfs and save into hdfs as sequence as well as text files:

As text file:

orderRDD = sc.textFile("/user/cloudera/sqoop_import-all/orders")
orderRDD.saveAsTextFile("/user/cloudera/sqoop_import-all/orders_temp")

As sequence file :

orderRDD = sc.textFile("/user/cloudera/sqoop_import-all/orders")
orderSeqRDD = orderRDD.map(lambda x : (x.split(",")[0] , x ) )       // generate a key value tupe with key as order id and entire record as the value
orderSeqRDD.saveAsSequenceFile("/user/cloudera/sqoop_import-all/orders_temp_seq")


read the sequence file:
orderRDD = sc.sequenceFile("/user/cloudera/sqoop_import-all/orders_temp_seq")



Program 2: 
#########################################################
Simple word count example using pyspark


wordText = sc.textFile("/user/cloudera/wordtext.txt")
wordFlatMap = wordText.flatMap(lambda x: x.split(" "))
wordTuple = wordFlatMap.map(lambda x: (x, 1))
wordTuplecount = wordTuple.reduceByKey(lambda x, y : x + y )
wordTuplecount.saveAsTextFile("/user/cloudera/wordCountoutput")


Program 3:
################################################################################################
# Join disparate datasets together using Spark
# Problem statement, get the total revenue and total number of orders from order_items on daily basis
Step1 : ########
orderRDD = sc.textFile("/user/cloudera/sqoop_import-all/orders")

order
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE

Step2 :########
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import-all/order_items")

order items
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99

Step3:########
join

orderMapRDD = orderRDD.map(lambda x: (x.split(",")[0], x ))
orderItemsMapRDD = orderItemsRDD.map(lambda x: (x.split(",")[1], x ))
orderItemsJoinRDD = orderMapRDD.join(orderItemsMapRDD)


(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88000,35236,191,1,99.99,99.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88001,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88002,35236,365,4,239.96,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88003,35236,957,1,299.98,299.98'))
(u'35540', (u'35540,2014-02-28 00:00:00.0,5028,CLOSED', u'88767,35540,1004,1,399.98,399.98'))
########
orderRevenueMap = orderItemsJoinRDD.map(lambda rec : (rec[1][0].split(",")[1] , float(rec[1][1].split(",")[4])))

### by reduceByKey Operation
orderTotalRevenue = orderRevenueMap.reduceByKey(lambda acc, val : acc + val)

### by groupByKey Operation --> very expensive if the dataset is very huge.
orderTotalRevenue = orderRevenueMap.groupByKey().map(lambda rec : (rec[0], sum(rec[1])))

(order dt , sub total )   reduceByKey  -->Generates--> ( order dt , total revenue)

########
pyspark shell
orderTotalMap = orderItemsJoinRDD.map(lambda rec : (rec[1][0].split(",")[1] , 1))
orderTotal = orderTotalMap.reduceByKey(lambda acc , value : acc + value)

 (order dt , 1 )  reducebyKey -->Generates--> (order dt, total order )

########
Join both datasets ie total revenue, total order : (dt , (tptal revenue, total orders))


resultOPMap = orderTotal.join(orderTotalRevenue)



################################################################################
# Problem statement, get the average revenue per order on daily basis( agreggateByKey)

Run till Step3:

(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88000,35236,191,1,99.99,99.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88001,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88002,35236,365,4,239.96,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88003,35236,957,1,299.98,299.98'))
(u'35540', (u'35540,2014-02-28 00:00:00.0,5028,CLOSED', u'88767,35540,1004,1,399.98,399.98'))
Step4#################

orderdtOrderIdMap = orderItemsJoinRDD.map(lambda rec : ((rec[1][0].split(",")[1], rec[1][0].split(",")[0]), float(rec[1][1].split(",")[4])))

((order dt, order id), sub total )

reduceByKey -->  ((order dt, order id), total revenue per order )


totalRevenuePerOrder = orderdtOrderIdMap.reduceByKey(lambda acc, val : acc +val )

Step5:#############(using without aggregate funtion )


totalOrderMap = totalRevenuePerOrder.map(lambda rec : (rec[0][0], 1))     // apply map and emit (order dt, 1)
totalOrderPerDay = totalOrderMap.reduceByKey(lambda acc , val : acc +val)  // apply reduce and generate (order dt, total no of orders per day )

totalRevenueMap = totalRevenuePerOrder.map(lambda rec : (rec[0][0], rec[1]))   // apply map and emit (order dt, sub total) 
totalRevenuePerDay = totalRevenueMap.reduceByKey(lambda acc , val : acc +val)  // apply reduce and generate (order dt, total revenue per day )


join both dataset and generate (key : order dt, value : (total revenue per day, total orders per day))

avgMapRDD = totalOrderPerDay.join(totalRevenuePerDay)

(u'2013-08-23 00:00:00.0', (169, 99616.169999999998))
(u'2014-04-06 00:00:00.0', (85, 56192.120000000017))
(u'2013-11-29 00:00:00.0', (220, 136296.63000000003))

Step6###############

avgPerDay = avgMapRDD.map(lambda rec : (rec[0] , rec[1][1]/rec[1][0]))

####################
Using AggregateByKey
Run till step4 :


Step5######  ( using a AggregateByKey)

totalRevenuePerOrderMap = totalRevenuePerOrder.map(lambda rec :(rec[0][0] , rec[1]))

(u'2014-05-12 00:00:00.0', 1054.8299999999999)
(u'2013-11-19 00:00:00.0', 499.93000000000006)

#Apply a aggregate fn and generate ( key : order dt , value : (total no of order per day, total revenue per day)  
Note : input is of type ( key : order dt , value : sub total ) and
       output is of type ( key : order dt , value : (total no of order per day, total revenue per day)     


avgPerDayMap = totalRevenuePerOrderMap.aggregateByKey((0,0), \
lambda acc , value : (acc[0] + 1 , acc[1] + value) , \
lambda acc1, acc2 : (round(acc1[0] + acc2[0], 2) , acc1[1] + acc2[1]))

Step6###############

avgPerDay = avgPerDayMap.map(lambda rec : (rec[0] , "{0:.2f}".format(round(rec[1][1]/rec[1][0]))))

note: "{0:.2f}".format() --> formats into 2 decimal points.

sorting and ranking:
#####################


products = sc.textFile("/user/cloudera/sqoop_import-all/products")


productsCatMap = products.map(lambda rec : (rec.split(",")[1], rec)).groupByKey()

productsCatMapList = productsCatMap.map(lambda rec : (rec[0], list(rec[1])))

productsCatMapSortedList = productsCatMap.map(lambda rec : (rec[0], sorted(list(rec[1]), key=lambda k : k.split(",")[4], reverse=True)))


def getTopN(rec, topN):
  x = [ ]
  x = list(sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True))
  y = [elm.split(",")[4] for elm in x]
  import itertools
  return list(itertools.islice(y, 0, topN))

productsCatMapSortedList = productsCatMap.flatMap(lambda rec: (rec[0], rec[0], getTopN(rec, 5)))
productsCatMapSortedList = productsCatMap.map(lambda rec: getTopN(rec, 5))



######Dense Ranking#####

def getTopDenseN(rec, topN):
  x = [ ]
  topPriceList = [ ]
  topPricedProductList = [ ]
  for i in rec[1]:
      x.append(float(i.split(",")[4]))
  import itertools
  topPriceList = list(itertools.islice(sorted(set(x), reverse=True), 0 ,topN))
  for elm in rec[1]:
     if (float(elm.split(",")[4]) in topPriceList):
        topPricedProductList.append(elm)
  return topPricedProductList


productsCatMapSortedList = productsCatMap.map(lambda rec: (rec[0] , getTopDenseN(rec, 5)))


filter Apis
===========

1. find foodball products which the price is less than 100$

footballProductsRDD = productRDD.filter(lambda rec : ( "football" in rec.split(",")[2].lower() and float(rec.split(",")[4]) <= 100 ))


2. football and mens products

footballOrMenProductsRDD = productRDD.filter(lambda rec : ( ("football" in rec.split(",")[2].lower() or " men" in rec.split(",")[2].lower()) and float(rec.split(",")[4]) <= 100 ))





No comments:

Post a Comment