Tuesday 19 April 2016

Pyspark Notes

Pyspark exercises
=============

The pyspark shell comes with a spark context preloaded; it can be accessed via the variable name : sc
sc is the connection from the driver program (here the pyspark shell / spark-shell) to the spark cluster

However, if you are running a standalone pyspark/scala/java program, you need to create the spark context yourself before starting the data processing.
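
A minimal sketch of how a standalone pyspark script could build its own context. The function name create_context, the application name "PysparkNotes" and the master "local[2]" are assumptions chosen for illustration; SparkConf and SparkContext are the standard pyspark classes.

```python
def create_context(app_name="PysparkNotes", master="local[2]"):
    """Build the SparkContext that the pyspark shell would otherwise preload as sc."""
    # Imported inside the function so this file can be loaded even where
    # pyspark is not on the Python path.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName(app_name).setMaster(master)
    return SparkContext(conf=conf)

# Typical use in a script:
#   sc = create_context()
#   print(sc.textFile("/user/cloudera/wordtext.txt").count())
#   sc.stop()
```

When the script is launched on a cluster, the master is usually left out of the code and supplied at submit time instead.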

Two types of operations are available on an RDD : Transformations and Actions
Transformations:
============
flatMap
map
reduceByKey
groupByKey
combineByKey
aggregateByKey

Actions:
=====

reduce
count
take
takeOrdered
countByKey
saveAsTextFile
saveAsSequenceFile
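
Transformations are lazy: they only describe the work, and nothing runs until an action asks for a result. The plain-Python analogy below (no Spark involved, all names are made up for illustration) uses a generator as the "transformation" and list() as the "action".

```python
calls = []

def tag(word):
    calls.append(word)        # record that the function really ran
    return (word, 1)

words = ["a", "b", "a"]
pairs = (tag(w) for w in words)   # like a transformation: nothing is evaluated yet
ran_before = len(calls)

result = list(pairs)              # like an action: forces the evaluation
ran_after = len(calls)
```

Here ran_before stays at zero because building the generator does not call tag; only consuming it does.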




Program 1
#########################################################
Load a file from hdfs and save it back to hdfs as a text file and as a sequence file:

As text file:

orderRDD = sc.textFile("/user/cloudera/sqoop_import-all/orders")
orderRDD.saveAsTextFile("/user/cloudera/sqoop_import-all/orders_temp")

As sequence file :

orderRDD = sc.textFile("/user/cloudera/sqoop_import-all/orders")
orderSeqRDD = orderRDD.map(lambda x : (x.split(",")[0] , x ) )       # generate a key value tuple with the order id as key and the entire record as value
orderSeqRDD.saveAsSequenceFile("/user/cloudera/sqoop_import-all/orders_temp_seq")


read the sequence file:
orderRDD = sc.sequenceFile("/user/cloudera/sqoop_import-all/orders_temp_seq")



Program 2: 
#########################################################
Simple word count example using pyspark


wordText = sc.textFile("/user/cloudera/wordtext.txt")
wordFlatMap = wordText.flatMap(lambda x: x.split(" "))
wordTuple = wordFlatMap.map(lambda x: (x, 1))
wordTuplecount = wordTuple.reduceByKey(lambda x, y : x + y )
wordTuplecount.saveAsTextFile("/user/cloudera/wordCountoutput")
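
To see what each step of the word count contributes, here is a plain-Python sketch of the same flatMap -> map -> reduceByKey chain on an in-memory list. It does not use Spark; the function name word_count and the sample lines are made up for illustration.

```python
from collections import defaultdict

def word_count(lines):
    """Mimic flatMap -> map -> reduceByKey on a plain list of lines."""
    words = [w for line in lines for w in line.split(" ")]   # flatMap: one word per element
    pairs = [(w, 1) for w in words]                           # map: (word, 1)
    counts = defaultdict(int)
    for word, one in pairs:                                   # reduceByKey: add up per word
        counts[word] += one
    return dict(counts)

sample = ["to be or", "not to be"]
counts = word_count(sample)
```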


Program 3:
################################################################################################
# Join disparate datasets together using Spark
# Problem statement, get the total revenue and total number of orders from order_items on daily basis
Step1 : ########
orderRDD = sc.textFile("/user/cloudera/sqoop_import-all/orders")

order
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE

Step2 :########
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import-all/order_items")

order items
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99

Step3:########
join

orderMapRDD = orderRDD.map(lambda x: (x.split(",")[0], x ))
orderItemsMapRDD = orderItemsRDD.map(lambda x: (x.split(",")[1], x ))
orderItemsJoinRDD = orderMapRDD.join(orderItemsMapRDD)


(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88000,35236,191,1,99.99,99.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88001,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88002,35236,365,4,239.96,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88003,35236,957,1,299.98,299.98'))
(u'35540', (u'35540,2014-02-28 00:00:00.0,5028,CLOSED', u'88767,35540,1004,1,399.98,399.98'))
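
The output above has one row per order item, with the order record repeated on each row. A plain-Python sketch of what an inner join on the key does (not Spark code; inner_join is a made-up helper, the records are taken from the sample rows shown earlier):

```python
def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(key, (lval, rval))
            for key, lval in left
            for rval in right_by_key.get(key, [])]

orders = [("2", "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT"),
          ("3", "3,2013-07-25 00:00:00.0,12111,COMPLETE")]
order_items = [("2", "2,2,1073,1,199.99,199.99"),
               ("2", "3,2,502,5,250.0,50.0")]
joined = inner_join(orders, order_items)
```

Order 2 appears twice in joined (once per item) and order 3 disappears because it has no items, which matters when counting orders from the joined data.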
########
orderRevenueMap = orderItemsJoinRDD.map(lambda rec : (rec[1][0].split(",")[1] , float(rec[1][1].split(",")[4])))

### by reduceByKey Operation
orderTotalRevenue = orderRevenueMap.reduceByKey(lambda acc, val : acc + val)

### by groupByKey Operation --> very expensive if the dataset is very huge.
orderTotalRevenue = orderRevenueMap.groupByKey().map(lambda rec : (rec[0], sum(rec[1])))

(order dt , sub total )   reduceByKey  -->Generates--> ( order dt , total revenue)
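
Why groupByKey costs more than reduceByKey can be sketched in plain Python, with lists standing in for partitions. This is only a model, not Spark code; the function names and the sample amounts are assumptions for illustration.

```python
def reduce_by_key(partitions, func):
    """Combine values per key inside each partition first, then merge the partial results."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)          # only one value per key leaves a partition
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

def group_by_key_then_sum(partitions):
    """Move every single value to its key's group, then sum each group."""
    groups = {}
    for part in partitions:
        for key, value in part:
            groups.setdefault(key, []).append(value)
    return {key: sum(values) for key, values in groups.items()}

partitions = [[("2013-07-25", 10.0), ("2013-07-25", 5.0)],
              [("2013-07-25", 2.5), ("2013-07-26", 4.0)]]
by_reduce = reduce_by_key(partitions, lambda acc, val: acc + val)
by_group = group_by_key_then_sum(partitions)
```

Both give the same totals; the difference is how much data has to travel between partitions before the sum is taken.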

########
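# the join has one row per order item, so de-duplicate (order dt, order id) before counting the orders
orderTotalMap = orderItemsJoinRDD.map(lambda rec : (rec[1][0].split(",")[1] , rec[0])).distinct().map(lambda rec : (rec[0], 1))
orderTotal = orderTotalMap.reduceByKey(lambda acc , value : acc + value)

 (order dt , 1 )  reduceByKey -->Generates--> (order dt, total orders )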

########
Join both datasets ie total orders, total revenue : (dt , (total orders, total revenue))


resultOPMap = orderTotal.join(orderTotalRevenue)



################################################################################
# Problem statement, get the average revenue per order on daily basis (aggregateByKey)

Run till Step3:

(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88000,35236,191,1,99.99,99.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88001,35236,365,1,59.99,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88002,35236,365,4,239.96,59.99'))
(u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'88003,35236,957,1,299.98,299.98'))
(u'35540', (u'35540,2014-02-28 00:00:00.0,5028,CLOSED', u'88767,35540,1004,1,399.98,399.98'))
Step4#################

orderdtOrderIdMap = orderItemsJoinRDD.map(lambda rec : ((rec[1][0].split(",")[1], rec[1][0].split(",")[0]), float(rec[1][1].split(",")[4])))

((order dt, order id), sub total )

reduceByKey -->  ((order dt, order id), total revenue per order )


totalRevenuePerOrder = orderdtOrderIdMap.reduceByKey(lambda acc, val : acc +val )

Step5:#############(without using an aggregate function )


totalOrderMap = totalRevenuePerOrder.map(lambda rec : (rec[0][0], 1))     # apply map and emit (order dt, 1)
totalOrderPerDay = totalOrderMap.reduceByKey(lambda acc , val : acc +val)  # apply reduce and generate (order dt, total no of orders per day )

totalRevenueMap = totalRevenuePerOrder.map(lambda rec : (rec[0][0], rec[1]))   # apply map and emit (order dt, sub total)
totalRevenuePerDay = totalRevenueMap.reduceByKey(lambda acc , val : acc +val)  # apply reduce and generate (order dt, total revenue per day )


join both datasets and generate (key : order dt, value : (total orders per day, total revenue per day))

avgMapRDD = totalOrderPerDay.join(totalRevenuePerDay)

(u'2013-08-23 00:00:00.0', (169, 99616.169999999998))
(u'2014-04-06 00:00:00.0', (85, 56192.120000000017))
(u'2013-11-29 00:00:00.0', (220, 136296.63000000003))

Step6###############

avgPerDay = avgMapRDD.map(lambda rec : (rec[0] , rec[1][1]/rec[1][0]))

####################
Using AggregateByKey
Run till step4 :


Step5######  ( using a AggregateByKey)

totalRevenuePerOrderMap = totalRevenuePerOrder.map(lambda rec :(rec[0][0] , rec[1]))

(u'2014-05-12 00:00:00.0', 1054.8299999999999)
(u'2013-11-19 00:00:00.0', 499.93000000000006)

#Apply an aggregate fn and generate ( key : order dt , value : (total no of orders per day, total revenue per day) )
Note : input is of type ( key : order dt , value : sub total ) and
       output is of type ( key : order dt , value : (total no of orders per day, total revenue per day) )


avgPerDayMap = totalRevenuePerOrderMap.aggregateByKey((0, 0.0), \
lambda acc , value : (acc[0] + 1 , acc[1] + value) , \
lambda acc1, acc2 : (acc1[0] + acc2[0] , acc1[1] + acc2[1]))
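
The three arguments of aggregateByKey are a zero value, a function that folds one value into the accumulator inside a partition, and a function that merges two accumulators from different partitions. A plain-Python model of that flow, with lists standing in for partitions (not Spark code; aggregate_by_key and the sample data are assumptions for illustration):

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Fold values into an accumulator per key and partition, then merge the accumulators."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = seq_op(local.get(key, zero), value)
        partials.append(local)
    merged = {}
    for local in partials:
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged

partitions = [[("d1", 100.0), ("d1", 50.0)], [("d1", 25.0), ("d2", 40.0)]]
totals = aggregate_by_key(
    partitions,
    (0, 0.0),                                          # (order count, revenue) start value
    lambda acc, value: (acc[0] + 1, acc[1] + value),   # one more order, add its revenue
    lambda a, b: (a[0] + b[0], a[1] + b[1]))           # merge two partial (count, revenue)
```

Note that the accumulator (a tuple) has a different type from the input values (a float), which is what reduceByKey cannot express directly.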

Step6###############

avgPerDay = avgPerDayMap.map(lambda rec : (rec[0] , "{0:.2f}".format(rec[1][1]/rec[1][0])))

note: "{0:.2f}".format() --> formats the number with 2 decimal places; no round() is needed first.
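
A short check of the formatting behaviour, using a value like the ones printed in Step5. It also shows what happens when round() is called with no digits before formatting: the fraction is lost first.

```python
avg = 1054.8299999999999

two_places = "{0:.2f}".format(avg)            # the format spec rounds to 2 places itself
whole_first = "{0:.2f}".format(round(avg))    # round() with no digits rounds to a whole number
```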

sorting and ranking:
#####################


products = sc.textFile("/user/cloudera/sqoop_import-all/products")


productsCatMap = products.map(lambda rec : (rec.split(",")[1], rec)).groupByKey()

productsCatMapList = productsCatMap.map(lambda rec : (rec[0], list(rec[1])))

productsCatMapSortedList = productsCatMap.map(lambda rec : (rec[0], sorted(list(rec[1]), key=lambda k : float(k.split(",")[4]), reverse=True)))   # convert the price to float, otherwise it is sorted as text


import itertools

def getTopN(rec, topN):
  # rec is (category id, product records); return the topN highest prices in the category
  x = sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True)
  y = [elm.split(",")[4] for elm in x]
  return list(itertools.islice(y, 0, topN))

# keep the category id next to its top prices
productsCatMapSortedList = productsCatMap.map(lambda rec: (rec[0], getTopN(rec, 5)))
# or only the list of top prices per category
productsCatMapSortedList = productsCatMap.map(lambda rec: getTopN(rec, 5))



######Dense Ranking#####

import itertools

def getTopDenseN(rec, topN):
  # collect the price of every product in the category
  x = [ ]
  topPricedProductList = [ ]
  for i in rec[1]:
    x.append(float(i.split(",")[4]))
  # the topN distinct prices, highest first
  topPriceList = list(itertools.islice(sorted(set(x), reverse=True), 0, topN))
  # keep every product whose price is one of those top prices
  for elm in rec[1]:
    if float(elm.split(",")[4]) in topPriceList:
      topPricedProductList.append(elm)
  return topPricedProductList


productsCatMapSortedList = productsCatMap.map(lambda rec: (rec[0] , getTopDenseN(rec, 5)))
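
Dense ranking means products that share a price share a rank, so a top N by dense rank can return more than N products. A small plain-Python sketch on made-up records (the helper name top_dense_n and the sample products are hypothetical; the price is assumed to be field 4, as in the code above):

```python
def top_dense_n(records, top_n, price_index=4):
    """Keep every record whose price is among the top_n distinct prices."""
    prices = sorted({float(r.split(",")[price_index]) for r in records}, reverse=True)
    top_prices = set(prices[:top_n])
    return [r for r in records if float(r.split(",")[price_index]) in top_prices]

# Hypothetical product records: id, category id, name, description, price, image
records = ["1,2,Ball A,,59.99,img", "2,2,Ball B,,59.99,img",
           "3,2,Glove,,29.99,img", "4,2,Cap,,9.99,img"]
top2 = top_dense_n(records, 2)
```

With top_n set to 2, three products come back: the two that tie at the highest price and the one at the second-highest price.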


filter APIs
===========

1. find football products whose price is at most 100$

footballProductsRDD = products.filter(lambda rec : ( "football" in rec.split(",")[2].lower() and float(rec.split(",")[4]) <= 100 ))


2. football or men's products whose price is at most 100$

footballOrMenProductsRDD = products.filter(lambda rec : ( ("football" in rec.split(",")[2].lower() or " men" in rec.split(",")[2].lower()) and float(rec.split(",")[4]) <= 100 ))
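
The two predicates can be checked on a few rows before running them on the full dataset. The sketch below is plain Python on made-up product records; is_cheap_match and the sample rows are hypothetical, and the name and price are assumed to be fields 2 and 4 as in the filters above.

```python
def is_cheap_match(rec, keywords, max_price=100.0):
    """True when the product name contains any keyword and the price is within the limit."""
    fields = rec.split(",")
    name, price = fields[2].lower(), float(fields[4])
    return any(k in name for k in keywords) and price <= max_price

products_sample = ["1,2,Pro Football,,59.99,img",
                   "2,2,Elite Football Helmet,,129.99,img",
                   "3,3,Running Shoes for Men,,89.99,img"]
football = [r for r in products_sample if is_cheap_match(r, ["football"])]
football_or_men = [r for r in products_sample if is_cheap_match(r, ["football", " men"])]
```

The same function can be passed to an RDD's filter through a lambda, since filter only needs a function that returns True or False per record.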





Wednesday 13 April 2016

Flume


Flume usage and configuration.

Apache Flume is a distributed system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

A Flume event - a unit of data flow having a byte payload and an optional set of string attributes.
A Flume agent - a JVM process that hosts the components through which events flow from an external source to the next destination (hop).

A Flume agent has 3 components:

1. Source: consumes events delivered by an external system in a format it understands (Avro, Thrift, ...) and puts them into one or more channels.
2. Channel: buffer space which holds the events temporarily until a sink removes them.
3. Sink: takes events from the channel and writes them to their destination. It can be a logger, an hdfs location or the source of another Flume agent.



http://archive-primary.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.2/_images/UserGuide_image00.png

Different configurations of Flume



1.  Flume agent with an avro source and a logger sink.
     (send the content of a file to the source using the avro-client utility)

Here we use a source of type "avro" and a sink of type "logger".

# Name the components on this agent
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# Describe/configure the source
a1.sources.source1.type = avro
a1.sources.source1.bind = localhost
a1.sources.source1.port = 44444

# Describe the sink
a1.sinks.sink1.type = logger

# Use a channel which buffers events in memory
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1




Use the command below to start the Flume agent

flume-ng agent -n a1 -c . -f AvroLogger.conf

Use the utility below to send the file content to the listening agent.

flume-ng avro-client -H localhost -p 44444 -F /usr/tmp/cm_api.log

2. Flume agent with an avro source and an hdfs sink
   (send the content of a file to the source using the avro-client utility)

# Name the components on this agent
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# Describe/configure the source
a1.sources.source1.type = avro
a1.sources.source1.bind = localhost
a1.sources.source1.port = 44444

# Describe the sink
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/Flume/Events
a1.sinks.sink1.hdfs.rollSize = 1024
a1.sinks.sink1.hdfs.filePrefix = Flume_events
a1.sinks.sink1.hdfs.batchSize = 100


# Use a channel which buffers events in memory
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1



Use the command below to start the Flume agent

flume-ng agent -n a1 -c . -f AvroHDFS.conf

Use the utility below to send the file content to the listening agent.

flume-ng avro-client -H localhost -p 44444 -F /usr/tmp/cm_api.log

3.  MultiAgent Configuration:

a. Consolidation configuration: 

 http://archive-primary.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.2/_images/UserGuide_image02.png


A typical example of this configuration is redirecting web logs from multiple web servers to one hdfs location. The agents form two layers: each web server has one agent that collects the logs of that server, and a second-layer agent aggregates the logs from all of those agents.

Below is the configuration.
Two agents are configured :  wsAgent, which listens for the access log
AvroSourceAgent, which is the aggregator


wsAgent
#components on this web server agent 


wsAgent.sources = source1
wsAgent.sinks = sink1
wsAgent.channels = channel1

# Describe/configure the source
wsAgent.sources.source1.type = exec
wsAgent.sources.source1.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sink
wsAgent.sinks.sink1.type = avro
wsAgent.sinks.sink1.hostname = localhost
wsAgent.sinks.sink1.port = 44444

# Use a channel which buffers events in memory
wsAgent.channels.channel1.type = memory
wsAgent.channels.channel1.capacity = 1000
wsAgent.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
wsAgent.sources.source1.channels = channel1
wsAgent.sinks.sink1.channel = channel1




AvroSourceAgent
# components on aggregator


AvroSourceAgent.sources = source2
AvroSourceAgent.sinks = sink2
AvroSourceAgent.channels = channel2

# Describe/configure the source
AvroSourceAgent.sources.source2.type = avro
AvroSourceAgent.sources.source2.bind = localhost
AvroSourceAgent.sources.source2.port = 44444

# Describe the sink
AvroSourceAgent.sinks.sink2.type = hdfs
AvroSourceAgent.sinks.sink2.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/WebServer/Events/WS1
AvroSourceAgent.sinks.sink2.hdfs.rollSize = 1024
AvroSourceAgent.sinks.sink2.hdfs.filePrefix = Flume_access_log
AvroSourceAgent.sinks.sink2.hdfs.batchSize = 100


# Use a channel which buffers events in memory
AvroSourceAgent.channels.channel2.type = memory
AvroSourceAgent.channels.channel2.capacity = 1000
AvroSourceAgent.channels.channel2.transactionCapacity = 100

# Bind the source and sink to the channel
AvroSourceAgent.sources.source2.channels = channel2
AvroSourceAgent.sinks.sink2.channel = channel2


Commands to start Flume agents:
flume-ng agent -n wsAgent -c . -f wsAgent.conf
flume-ng agent -n AvroSourceAgent -c . -f AvroSourceAgent.conf 

Cloudera provides a utility to simulate access log traffic. Type start_logs to start writing log entries to /opt/gen_logs/logs/access.log

 stop_logs can be used to stop the logs.

[cloudera@quickstart ~]$ start_logs
[cloudera@quickstart ~]$


b. Fan Out configuration

 Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan out, replicating and multiplexing. In the replicating flow, the event is sent to all the configured channels. In case of multiplexing, the event is sent to only a subset of qualifying channels. To fan out the flow, one needs to specify a list of channels for a source and the policy for the fanning it out. This is done by adding a channel “selector” that can be replicating or multiplexing. Then further specify the selection rules if it’s a multiplexer. If you don’t specify a selector, then by default it’s replicating.
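
The difference between the two selector policies can be modelled in a few lines of plain Python. This is a toy model, not Flume code; the function names, the "state" header and the mapping values are made up for illustration.

```python
def replicate(event, channels):
    """Replicating selector: every configured channel receives the event."""
    return list(channels)

def multiplex(event, header, mapping, default):
    """Multiplexing selector: choose channels from the value of one event header."""
    return mapping.get(event["headers"].get(header), default)

event = {"headers": {"state": "CA"}, "body": b"log line"}
all_channels = replicate(event, ["channel1", "channel2"])
chosen = multiplex(event, "state", {"CA": ["channel1"], "NY": ["channel2"]}, ["channel2"])
fallback = multiplex({"headers": {}, "body": b""}, "state", {"CA": ["channel1"]}, ["channel2"])
```

In the toy model an event without the header falls through to the default channel list, which mirrors the role of a default mapping in a multiplexing setup.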


http://archive-primary.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.2/_images/UserGuide_image01.png



Below is the sample configuration:
1 source
2 channels
2 sinks - one sink is used to write to hdfs and other to logger.

[cloudera@quickstart fanout]$ cat fanOutReplication.conf
# Name the components on this agent
a1.sources = source1
a1.sinks = sink1 sink2
a1.channels = channel1 channel2

# Describe/configure the source
a1.sources.source1.type = avro
a1.sources.source1.bind = localhost
a1.sources.source1.port = 44444
a1.sources.source1.selector.type = replicating


# Describe the sink1
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/FanoutChannel/
a1.sinks.sink1.hdfs.rollSize = 1024
a1.sinks.sink1.hdfs.filePrefix = Flume_events
a1.sinks.sink1.hdfs.batchSize = 100

# Describe the sink2
a1.sinks.sink2.type = logger

# Use a channel which buffers events in memory
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Channel2

a1.channels.channel2.type = memory
a1.channels.channel2.capacity = 1000
a1.channels.channel2.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.source1.channels = channel1 channel2
a1.sinks.sink1.channel = channel1
a1.sinks.sink2.channel = channel2
[cloudera@quickstart fanout]$

Command to start the Flume agent:
flume-ng agent -n a1 -c . -f fanOutReplication.conf


Monday 11 April 2016

Sqoop Import commands

sqoop commands:

sqoop list databases command
============================


sqoop list-databases \
  --connect "jdbc:mysql://quickstart.cloudera:3306" \
  --username retail_dba \
  --password cloudera

sqoop list tables command
=========================

sqoop list-tables \
 --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
 --username retail_dba \
 --password cloudera

sqoop eval command
=========================

sqoop eval \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username retail_dba \
  --password cloudera \
  --query "select count(1) from order_items"


sqoop import all tables commands
================================

sqoop import-all-tables \
   -m 12 \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --warehouse-dir=/user/cloudera/sqoop_import_all

sqoop import individual tables commands
=======================================
--Default
sqoop import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --table departments \
  --as-textfile \
  --target-dir=/user/cloudera/departments

note : use one of the options below to select the file format

  --as-avrodatafile
  --as-sequencefile
  --as-textfile
  --as-parquetfile

sqoop import delimiters
=======================
sqoop import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --table departments \
  --as-textfile \
  --target-dir=/user/cloudera/departments \
  --fields-terminated-by '|' \
  --lines-terminated-by '\n'


sqoop boundary Query and columns
================================

sqoop import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --table departments \
  --target-dir /user/cloudera/departments \
  --boundary-query "select 2, 8 from departments" \
  --columns department_id,department_name

sqoop where clause
==================

sqoop import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --table departments \
  --target-dir /user/cloudera/departments \
  --where  "department_id > 4" \
  --columns department_id,department_name



sqoop join query and split by
=============================


sqoop import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --query="select * from orders join order_items on orders.order_id = order_items.order_item_order_id where \$CONDITIONS" \
  --target-dir /user/cloudera/order_join \
  --split-by order_id


note : sqoop splits the import among the mappers on the table's primary key by default. A free-form query has no primary key, so --split-by must be specified (or a single mapper used with -m 1).
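
The idea behind splitting can be sketched as follows: take the lowest and highest value of the split column and hand each mapper one slice of that range. The code is an illustration only and does not reproduce how sqoop computes its boundaries internally; split_ranges is a made-up helper, and the values 2 and 8 echo the boundary query example above.

```python
def split_ranges(lo, hi, num_mappers):
    """Cut the closed interval [lo, hi] into num_mappers contiguous, near-equal ranges."""
    size = hi - lo + 1
    step, extra = divmod(size, num_mappers)
    ranges, start = [], lo
    for i in range(num_mappers):
        width = step + (1 if i < extra else 0)   # spread the remainder over the first ranges
        if width == 0:
            break                                # more mappers than distinct values
        ranges.append((start, start + width - 1))
        start += width
    return ranges

ranges = split_ranges(2, 8, 4)
few_values = split_ranges(1, 2, 4)
```

Each range would become the condition of one mapper's query, which is the part that replaces \$CONDITIONS in a free-form query.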


sqoop incremental load
======================

sqoop import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --table departments \
  --target-dir /user/cloudera/departments \
  --append \
  --fields-terminated-by '|' \
  --lines-terminated-by '\n' \
  --check-column "department_id" \
  --incremental append \
  --last-value 7


note : 2 modes are available for incremental load
1. append
2. lastmodified
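
In append mode only the rows whose check column is greater than the last value are imported, and the highest value seen becomes the last value for the next run. A plain-Python sketch of that selection (rows_to_import and the department rows are hypothetical, for illustration only):

```python
def rows_to_import(rows, check_column, last_value):
    """Append mode: keep only rows whose check column is greater than last_value."""
    return [row for row in rows if row[check_column] > last_value]

# Hypothetical departments rows
departments = [{"department_id": 6, "department_name": "Outdoors"},
               {"department_id": 7, "department_name": "Fan Shop"},
               {"department_id": 8, "department_name": "Testing"}]
new_rows = rows_to_import(departments, "department_id", 7)
next_last_value = max(row["department_id"] for row in new_rows)
```

The lastmodified mode works the same way in spirit, except that the check column is a timestamp that changes whenever a row is updated.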

HIVE Related sqoop commands
===========================

Loading avro formatted dataset to hive.

1. sqoop command to generate avro dataset from mysql

sqoop import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --table departments \
  --as-avrodatafile \
  --target-dir=/user/cloudera/departments

note : an Avro schema file (.avsc) is generated in the local directory from which the sqoop import is run.

2. copy the generated schema to hdfs

hadoop fs -put departments.avsc /user/cloudera

3. Create Hive table specifying data set location and avro serde

CREATE EXTERNAL TABLE departments
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://quickstart.cloudera/user/cloudera/departments'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart.cloudera/user/cloudera/departments.avsc');