分布式集群手操 – Flume搭建 Flume+Kafka的例子

Flume分布式集群搭建的示例,该示例搭建环境以之前几篇文章的操作环境为基础,而且测试例子使用KafkaSink,因此,若未搭建Kafka集群请先查阅:《分布式集群手操 – Kafka搭建

准备

下载可从官方网站选择合适的版本下载,本文以1.7.0为例

1
wget http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

解压此处不再重复说明。

配置

进入flume安装的根目录下的config目录,重命名flume-env.sh.templateflume-env.sh并修改其内容,主要修改JAVA_HOME为当前系统安装JDK的路径即可。

将Flume的bin目录写入环境变量,运行flume-ng version命令如下:

1
2
3
4
5
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

一个KafkaSink的测试例子

首先,我们以Flume数据采集集群汇总数据到处理中心,再按需发送至Kafka集群供下一步处理为原型,设计通过hdfs1、hdfs2两个agent进行日志采集,通过AvroSink汇总至hdfs3,然后再使用KafkaSink发送至Kafka消息队列

编写配置文件

hdfs1的agent配置文件

kafka-sink.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.s1.type = exec
agent1.sources.s1.command = tail -F /home/bigdata/accesslog/access.log
agent1.sources.s1.channels = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = hdfs3
agent1.sinks.k1.port = 4545

以上配置信息表明该agent的名称为agent1,sourceexec tail -f <文件>,可针对生产环境使用不同的source,参见官方用户文档,使用内存缓冲通道,avrosink,目标为hdfs3的4545端口

hdfs2的agent配置文件

该文件同hdfs1的agent配置文件,只是agent的名称不同而已,为agent2

hdfs3的agent配置文件

kafka-sink.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
agent3.sources = s3
agent3.channels = c3
agent3.sinks = k3
agent3.sources.s3.type = avro
agent3.sources.s3.channels = c3
agent3.sources.s3.bind = 0.0.0.0
agent3.sources.s3.port = 4545
agent3.channels.c3.type = memory
agent3.channels.c3.capacity = 1000
agent3.channels.c3.transactionCapacity = 100
agent3.sinks.k3.channel = c3
agent3.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
agent3.sinks.k3.kafka.topic = mytopic
agent3.sinks.k3.kafka.bootstrap.servers = hdfs1:9092,hdfs2:9092,hdfs3:9092
agent3.sinks.k3.kafka.flumeBatchSize = 30
agent3.sinks.k3.kafka.producer.acks = 1
agent3.sinks.k3.kafka.producer.linger.ms = 1
agent3.sinks.k3.kafka.producer.compression.type = snappy

以上配置文件指明hdfs3的agent名称为agent3sourceavro,绑定端口4545,同样使用内存做为缓冲通道,使用KafkaSink,指定了服务集群、BatchSize、Topic、压缩方式等

启动Kafka集群

在Kafka安装目录下运行命令

1
./bin/kafka-server-start.sh config/server.properties &

创建指定的Topic

1
./bin/kafka-topics.sh -zookeeper hdfs1:2181,hdfs2:2181,hdfs3:2181 -topic mytopic -replication-factor 1 -partitions 1 -create

创建消费者(用于查看Kafka消息是否存在)

1
./bin/kafka-console-consumer.sh -zookeeper hdfs1:2181,hdfs2:2181,hdfs3:2181 - from-begining -topic mytopic

启动Flume

在hdfs1节点运行

1
flume-ng agent -n agent1 -c conf -f ~/services/apache-flume-1.7.0-bin/conf/kafka-sink.conf

在hdfs2节点运行

1
flume-ng agent -n agent2 -c conf -f ~/services/apache-flume-1.7.0-bin/conf/kafka-sink.conf

在hdfs3节点运行

1
flume-ng agent -n agent3 -c conf -f ~/services/apache-flume-1.7.0-bin/conf/kafka-sink.conf

然后启动模拟的access日志生成脚本,该脚本会不断在access.log文件末尾附加模拟的访问日志记录(关于该脚本,可参见《Storm实时处理 – Nginx访问日志

此时可看到消费者控制台输出了大量访问日志信息,测试成功


分布式集群手操 – Flume搭建 Flume+Kafka的例子
https://vicasong.github.io/big-data/flume-distribute-install/
作者
Vica
发布于
2016年10月20日
许可协议