Tuesday 10 May 2016

Spark-shell (Scala)sample programs


/user/cloudera/sqoop_import_all/order_items
/user/cloudera/sqoop_import_all/orders/



Read and write as sequence file
========================
val orderRDD = sc.textFile("/user/cloudera/sqoop_import_all/order_items")
val orderSeqRDD = orderRDD.map(rec => (rec.split(",")(0) , rec))
orderSeqRDD.saveSequenceFile("/user/cloudera/order_Seq1")
val orderSeq = sc.sequenceFile("/user/cloudera/order_Seq1", classOf[IntWritable] , classOf[Text])

or using newApi
import org.apache.hadoop.mapreduce.lib.output._

val orderSeqRDD = orderRDD.map(rec => (new Text(rec.split(",")(0)) , new Text(rec)))
orderSeqRDD.saveAsNewAPIHadoopFile("/user/cloudera/order_Seq1", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]])


wordcount in scala
==================


val wordText = sc.textFile("/user/cloudera/wordtext.txt")
val wordFlatMap = wordText.flatMap(x => x.split(" "))
val wordTuple = wordFlatMap.map(x => (x, 1))
val wordTuplecount = wordTuple.reduceByKey((x, y) => x + y )
wordTuplecount.saveAsTextFile("/user/cloudera/wordCountoutput_scala")



################################################################################################
# Join disparate datasets together using Spark
# Problem statement, get the total revenue and total number of orders from order_items on daily basis


val orderRDD = sc.textFile("/user/cloudera/sqoop_import_all/orders")
val orderMapRDD =  orderRDD.map(rec => (rec.split(",")(0) , rec))
val orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import_all/order_items")
val orderItemsMapRDD = orderItemsRDD.map(rec => (rec.split(",")(1) , rec ))
val orderItemsJoinRDD = orderMapRDD.join(orderItemsMapRDD)

o/p : scala> orderItemsJoinRDD.take(6).foreach(println);

(25128,(25128,2013-12-27 00:00:00.0,9212,CLOSED,62962,25128,191,2,199.98,99.99))
(25128,(25128,2013-12-27 00:00:00.0,9212,CLOSED,62963,25128,957,1,299.98,299.98))
(25128,(25128,2013-12-27 00:00:00.0,9212,CLOSED,62964,25128,627,4,159.96,39.99))
(34731,(34731,2014-02-24 00:00:00.0,2598,COMPLETE,86745,34731,627,2,79.98,39.99))
(34731,(34731,2014-02-24 00:00:00.0,2598,COMPLETE,86746,34731,1014,1,49.98,49.98))
(65454,(65454,2014-05-12 00:00:00.0,3064,CLOSED,163587,65454,777,3,239.97,79.99))


val orderSubtotalRDD = orderItemsJoinRDD.map(rec => ( (rec._1 , rec._2._1.split(",")(1)) , rec._2._2.split(",")(4).toFloat ))
val orderTotalRDD = orderSubtotalRDD.reduceByKey( (acc , value ) => acc +value )
val orderTotalMapRDD = orderTotalRDD.map(rec => (rec._1._2 , rec._2))

val orderreveueMapRDD = orderTotalMapRDD.groupByKey().map(rec => (rec._1 , rec._2.toList.sum))
val orderCountMapRDD= orderTotalMapRDD.map(rec => (rec._1 , 1 )).reduceByKey(( acc, value ) => acc + value)
val orderrevenueTotalMapRDD  = orderreveueMapRDD.join(orderCountMapRDD);
val orderAvgRDD = orderrevenueTotalMapRDD.map(rec => (rec._1 , "%1.2f".format(rec._2._1/rec._2._2)))
o/p: scala> orderAvgRDD.take(6).foreach(println);

orderAvgRDD.sortByKey().saveAsTextFile("/user/cloudera/orderAvg1");

====================
using aggregateByKey
====================
orderTotalMapRDD.aggregateByKey((0,0.00))( ((acc , value) => (acc._1 + 1 , acc._2 + value)) , ((acc1 , acc2) => (acc1._1 + acc2._1 , acc1._2 + acc2._2))).map(rec => (rec._1 , "%1.2f".format(rec._2._2 / rec._2._1))).sortByKey().saveAsTextFile("/user/cloudera/orderAvg_using_aggregateByKey");
implement using hiveContext
===========================

import org.apache.spark.sql.hive.HiveContext
val sqlHiveContext = new HiveContext(sc)
val avgRDD = sqlHiveContext.sql("select o.order_date , round(sum(oi.order_item_subtotal), 2)  /count(distinct o.order_id) from orders o , order_items oi where oi.order_item_order_id = o.order_id group by o.order_date order by o.order_date");
avgRDD.rdd.saveAsTextFile("/user/cloudera/avgRDD");




################################################################################################
#Customer id with max revenue for each day

val orderRDD = sc.textFile("/user/cloudera/sqoop_import-all/orders")
val orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import-all/order_items")
val orderMapRDD = orderRDD.map(x => (x.split(",")(0), x ))
val orderItemsMapRDD = orderItemsRDD.map(x => (x.split(",")(1), x ))
val orderItemsJoinRDD = orderMapRDD.join(orderItemsMapRDD)

After join :

( order id , ( order rec , order items records)
(37231,(37231,2014-03-11 00:00:00.0,4470,PENDING,92926,37231,191,3,299.97,99.99))
(37231,(37231,2014-03-11 00:00:00.0,4470,PENDING,92927,37231,403,1,129.99,129.99))
(37231,(37231,2014-03-11 00:00:00.0,4470,PENDING,92928,37231,1014,5,249.9,49.98))
(37231,(37231,2014-03-11 00:00:00.0,4470,PENDING,92929,37231,403,1,129.99,129.99))

val orderCustSubTotalMap = orderItemsJoinRDD.map(rec => ((rec._2._1.split(",")(1), rec._2._1.split(",")(2)) , rec._2._2.split(",")(4).toFloat))

val revenuePerDayPerCustomerMap  = orderCustSubTotalMap.reduceByKey((acc, value) => (acc + value))

((2013-08-01 00:00:00.0,5806),639.88)                                          
((2014-04-25 00:00:00.0,6415),609.83997)
((2014-05-27 00:00:00.0,7564),649.95)
((2014-01-16 00:00:00.0,10293),129.99)
((2013-10-14 00:00:00.0,10165),499.95)


val maxRevenuePerDay=revenuePerDayPerCustomerMap.map(rec => (rec._1._1 , (rec._1._2,rec._2))).reduceByKey((acc, value) => (if(acc._2 >= value._2) acc else value))

scala> maxRevenuePerDay.take(5).foreach(println)
(2013-10-05 00:00:00.0,(6647,1429.92))                                         
(2014-05-17 00:00:00.0,(6233,1829.88))
(2014-06-29 00:00:00.0,(4793,1649.8301))
(2014-04-23 00:00:00.0,(3020,1449.9))
(2013-10-27 00:00:00.0,(4959,1479.8099))



################################################################################################
#Filter Apis

Check if there are any cancelled orders with amount greater than 1000$
#Get only cancelled orders
#Join orders and order items
#Generate sum(order_item_subtotal) per order
#Filter data which amount to greater than 1000$


Hive Query

select order_id , total from
(select co.order_id as order_id , round(sum(oi.order_item_subtotal), 2) as total from
(select * from orders where order_status = 'CANCELED' ) as co,
order_items oi where oi.order_item_order_id = co.order_id  group by co.order_id ) q where total > 1000;


spark scala program to find cancelled orders with amount greater than 1000


val ordersItemsRDD = sc.textFile("/user/cloudera/sqoop_import_all/order_items")
val ordersRDD = sc.textFile("/user/cloudera/sqoop_import_all/orders/")
val cancelledOrderRDD = ordersRDD.filter(x => x.split(",")(3).contains("CANCELED"))
val cancelledOrderRDDMap = cancelledOrderRDD.map(rec => (rec.split(",")(0) , rec ))
val ordersItemsRDDMap = ordersItemsRDD.map( rec => (rec.split(",")(1) , rec ))
val cancelledOrderJoin = cancelledOrderRDDMap.join(ordersItemsRDDMap)
val cancelledOrderJoinMap = cancelledOrderJoin.map(rec => ( rec._1 , rec._2._2.split(",")(4).toFloat)).reduceByKey((acc,value)=> acc + value )
val resultMap = cancelledOrderJoinMap.filter(rec => rec._2 > 1000)

################################################################################################
#Sorting and Ranking

topN products by price for each catgeory

/user/cloudera/sqoop_import_all/products

val productsRDD = sc.textFile("/user/cloudera/sqoop_import_all/products")
val productsCatRDDMap = productsRDD.map(rec => (rec.split(",")(1) , rec )).groupByKey()
val productsCatSortedMap = productsCatRDDMap.map(rec => (rec._1 , rec._2.toList.sortBy(k => k.split(",")(4).toFloat)))
val productsCatSortedMap = productsCatRDDMap.flatMap(rec =>  rec._2.toList.sortBy(k => k.split(",")(4).toFloat))




No comments:

Post a Comment