Wednesday 13 April 2016

Flume


Flume usage and configuration.

Apache Flume is a distributed system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

A Flume event is a unit of data flow with a byte payload and an optional set of string attributes (headers).
A Flume agent is a JVM process that hosts the components through which events flow from an external source to the next destination (hop).

A Flume agent has three components:

1. Source: consumes events from an external system; sources exist for various formats and protocols (Avro, Thrift, exec, etc.).
2. Channel: a buffer that stores events temporarily until the sink drains them.
3. Sink: the destination where events are ultimately written. It can be a logger, an HDFS location, JMS, or another Flume agent's source.
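Concretely, every agent configuration file wires these three components together with the same naming pattern. As a sketch (the angle-bracket names are placeholders, not real component names):

```
# Generic wiring pattern for a Flume agent (placeholder names)
<agent>.sources = <src>
<agent>.sinks = <snk>
<agent>.channels = <ch>

# Each component is then configured under its name
<agent>.sources.<src>.type = ...
<agent>.sinks.<snk>.type = ...
<agent>.channels.<ch>.type = ...

# A source may feed several channels (plural), a sink drains exactly one (singular)
<agent>.sources.<src>.channels = <ch>
<agent>.sinks.<snk>.channel = <ch>
```

The concrete configurations that follow all use this pattern, with <agent> being the name passed to flume-ng with -n.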



http://archive-primary.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.2/_images/UserGuide_image00.png

Different configurations of Flume



1. Flume agent with an Avro source and a logger sink
   (direct a file's content to the source using the avro-client utility)

Here we use a source of type "avro" and a sink of type "logger".

# Name the components on this agent
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# Describe/configure the source
a1.sources.source1.type = avro
a1.sources.source1.bind = localhost
a1.sources.source1.port = 44444

# Describe the sink
a1.sinks.sink1.type = logger

# Use a channel which buffers events in memory
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1




Use the command below to start the Flume agent:

flume-ng agent -n a1 -c . -f AvroLogger.conf

Use the avro-client utility to send a file's content to the listening agent:

flume-ng avro-client -H localhost -p 44444 -F /usr/tmp/cm_api.log

2. Flume agent with an Avro source and an HDFS sink
   (direct a file's content to the source using the avro-client utility)

# Name the components on this agent
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# Describe/configure the source
a1.sources.source1.type = avro
a1.sources.source1.bind = localhost
a1.sources.source1.port = 44444

# Describe the sink
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/Flume/Events
a1.sinks.sink1.hdfs.rollSize = 1024
a1.sinks.sink1.hdfs.filePrefix = Flume_events
a1.sinks.sink1.hdfs.batchSize = 100


# Use a channel which buffers events in memory
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
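One caveat worth noting: by default the HDFS sink also rolls files on a time interval and an event count (the Flume defaults are 30 seconds and 10 events), so rollSize alone does not control file size. To roll purely on size, the other triggers can be disabled:

```
# Roll on size only: setting these to 0 disables time- and count-based rolling
a1.sinks.sink1.hdfs.rollInterval = 0
a1.sinks.sink1.hdfs.rollCount = 0
```

Without this, the 1024-byte rollSize above may never be the trigger that actually closes a file.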



Use the command below to start the Flume agent:

flume-ng agent -n a1 -c . -f AvroHDFS.conf

Use the avro-client utility to send a file's content to the listening agent:

flume-ng avro-client -H localhost -p 44444 -F /usr/tmp/cm_api.log

3. Multi-agent configurations:

a. Consolidation configuration: 

 http://archive-primary.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.2/_images/UserGuide_image02.png


A typical use of this configuration is redirecting web logs from multiple web servers to a single HDFS location. The agents are arranged in two layers: each web server runs an agent that collects that server's logs, and a second-layer agent aggregates the events from the first-layer agents.

Below are the configurations for the two agents:
wsAgent, which tails the access log
AvroSourceAgent, which acts as the aggregator


wsAgent
# Name the components on this web server agent


wsAgent.sources = source1
wsAgent.sinks = sink1
wsAgent.channels = channel1

# Describe/configure the source
wsAgent.sources.source1.type = exec
wsAgent.sources.source1.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sink
wsAgent.sinks.sink1.type = avro
wsAgent.sinks.sink1.hostname = localhost
wsAgent.sinks.sink1.port = 44444

# Use a channel which buffers events in memory
wsAgent.channels.channel1.type = memory
wsAgent.channels.channel1.capacity = 1000
wsAgent.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
wsAgent.sources.source1.channels = channel1
wsAgent.sinks.sink1.channel = channel1




AvroSourceAgent
# components on aggregator


AvroSourceAgent.sources = source2
AvroSourceAgent.sinks = sink2
AvroSourceAgent.channels = channel2

# Describe/configure the source
AvroSourceAgent.sources.source2.type = avro
AvroSourceAgent.sources.source2.bind = localhost
AvroSourceAgent.sources.source2.port = 44444

# Describe the sink
AvroSourceAgent.sinks.sink2.type = hdfs
AvroSourceAgent.sinks.sink2.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/WebServer/Events/WS1
AvroSourceAgent.sinks.sink2.hdfs.rollSize = 1024
AvroSourceAgent.sinks.sink2.hdfs.filePrefix = Flume_access_log
AvroSourceAgent.sinks.sink2.hdfs.batchSize = 100


# Use a channel which buffers events in memory
AvroSourceAgent.channels.channel2.type = memory
AvroSourceAgent.channels.channel2.capacity = 1000
AvroSourceAgent.channels.channel2.transactionCapacity = 100

# Bind the source and sink to the channel
AvroSourceAgent.sources.source2.channels = channel2
AvroSourceAgent.sinks.sink2.channel = channel2


Commands to start Flume agents:
flume-ng agent -n wsAgent -c . -f wsAgent.conf
flume-ng agent -n AvroSourceAgent -c . -f AvroSourceAgent.conf 

Cloudera's QuickStart VM provides utilities to simulate access-log traffic: type start_logs to start appending entries to /opt/gen_logs/logs/access.log.

stop_logs can be used to stop them.

[cloudera@quickstart ~]$ start_logs
[cloudera@quickstart ~]$


b. Fan Out configuration

Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan-out: replicating and multiplexing. In a replicating flow, each event is sent to all configured channels. In a multiplexing flow, an event is sent only to a subset of qualifying channels. To fan out the flow, specify a list of channels for a source and the policy for fanning it out. This is done by adding a channel "selector", which can be replicating or multiplexing, and then specifying the selection rules if it is a multiplexer. If no selector is specified, it defaults to replicating.
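The sample configuration in this section uses a replicating selector. For comparison, a multiplexing selector might look like the following sketch, where the header name "state" and its values are illustrative assumptions; each event is routed according to the value of that header:

```
# Hypothetical multiplexing selector: route by the "state" event header
a1.sources.source1.selector.type = multiplexing
a1.sources.source1.selector.header = state
a1.sources.source1.selector.mapping.CA = channel1
a1.sources.source1.selector.mapping.NY = channel2
# Events whose header value matches no mapping go to the default channel
a1.sources.source1.selector.default = channel1
```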


http://archive-primary.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.2/_images/UserGuide_image01.png



Below is a sample configuration with:
1 source
2 channels
2 sinks - one sink writes to HDFS and the other to a logger.

[cloudera@quickstart fanout]$ cat fanOutReplication.conf
# Name the components on this agent
a1.sources = source1
a1.sinks = sink1 sink2
a1.channels = channel1 channel2

# Describe/configure the source
a1.sources.source1.type = avro
a1.sources.source1.bind = localhost
a1.sources.source1.port = 44444
a1.sources.source1.selector.type = replicating


# Describe the sink1
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/FanoutChannel/
a1.sinks.sink1.hdfs.rollSize = 1024
a1.sinks.sink1.hdfs.filePrefix = Flume_events
a1.sinks.sink1.hdfs.batchSize = 100

# Describe the sink2
a1.sinks.sink2.type = logger

# Use a channel which buffers events in memory
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Channel2

a1.channels.channel2.type = memory
a1.channels.channel2.capacity = 1000
a1.channels.channel2.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.source1.channels = channel1 channel2
a1.sinks.sink1.channel = channel1
a1.sinks.sink2.channel = channel2
[cloudera@quickstart fanout]$

Commands to start Flume agents:
flume-ng agent -n a1 -c . -f fanOutReplication.conf

