Flume分布式集群搭建的示例,该示例搭建环境以之前几篇文章的操作环境为基础,而且测试例子使用KafkaSink,因此,若未搭建Kafka集群请先查阅:《分布式集群手操 – Kafka搭建 》
准备 下载可从官方网站选择合适的版本下载,本文以1.7.0为例
1 wget http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
解压此处不再重复说明。
配置 进入flume安装的根目录下的config 目录,重命名flume-env.sh.template 为flume-env.sh 并修改其内容,主要修改JAVA_HOME 为当前系统安装JDK的路径即可。
将Flume的bin 目录写入环境变量,运行flume-ng version
命令如下:
1 2 3 4 5 Flume 1.7.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707 Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016 From source with checksum 0d21b3ffdc55a07e1d08875872c00523
一个KafkaSink的测试例子 首先,我们以Flume数据采集集群汇总数据到处理中心,再按需发送至Kafka集群供下一步处理为原型,设计通过hdfs1、hdfs2两个agent进行日志采集,通过AvroSink汇总至hdfs3,然后再使用KafkaSink发送至Kafka消息队列
编写配置文件 hdfs1的agent配置文件 kafka-sink.conf 1 2 3 4 5 6 7 8 9 10 11 12 13 agent1.sources = s1 agent1.channels = c1 agent1.sinks = k1 agent1.sources.s1.type = exec agent1.sources.s1.command = tail -F /home/bigdata/accesslog/access.log agent1.sources.s1.channels = c1 agent1.channels.c1.type = memory agent1.channels.c1.capacity = 1000 agent1.channels.c1.transactionCapacity = 100 agent1.sinks.k1.type = avro agent1.sinks.k1.channel = c1 agent1.sinks.k1.hostname = hdfs3 agent1.sinks.k1.port = 4545
以上配置信息表明该agent 的名称为agent1,source 为exec tail -f <文件>
,可针对生产环境使用不同的source ,参见官方用户文档 ,使用内存缓冲通道,avro 的sink ,目标为hdfs3的4545端口
hdfs2的agent配置文件 该文件同hdfs1的agent 配置文件,只是agent 的名称不同而已,为agent2
hdfs3的agent配置文件 kafka-sink.conf 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 agent3.sources = s3 agent3.channels = c3 agent3.sinks = k3 agent3.sources.s3.type = avro agent3.sources.s3.channels = c3 agent3.sources.s3.bind = 0.0.0.0 agent3.sources.s3.port = 4545 agent3.channels.c3.type = memory agent3.channels.c3.capacity = 1000 agent3.channels.c3.transactionCapacity = 100 agent3.sinks.k3.channel = c3 agent3.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink agent3.sinks.k3.kafka.topic = mytopic agent3.sinks.k3.kafka.bootstrap.servers = hdfs1:9092,hdfs2:9092,hdfs3:9092 agent3.sinks.k3.kafka.flumeBatchSize = 30 agent3.sinks.k3.kafka.producer.acks = 1 agent3.sinks.k3.kafka.producer.linger.ms = 1 agent3.sinks.k3.kafka.producer.compression.type = snappy
以上配置文件指明hdfs3的agent 名称为agent3 ,source 为avro ,绑定端口4545,同样使用内存做为缓冲通道,使用KafkaSink ,指定了服务集群、BatchSize、Topic、压缩方式等
启动Kafka集群 在Kafka安装目录下运行命令
1 ./bin/kafka-server-start.sh config/server.properties &
创建指定的Topic
1 ./bin/kafka-topics.sh -zookeeper hdfs1:2181,hdfs2:2181,hdfs3:2181 -topic mytopic -replication-factor 1 -partitions 1 -create
创建消费者(用于查看Kafka消息是否存在)
1 ./bin/kafka-console-consumer.sh -zookeeper hdfs1:2181,hdfs2:2181,hdfs3:2181 - from-begining -topic mytopic
启动Flume 在hdfs1节点运行
1 flume-ng agent -n agent1 -c conf -f ~/services/apache-flume-1.7.0-bin/conf/kafka-sink.conf
在hdfs2节点运行
1 flume-ng agent -n agent2 -c conf -f ~/services/apache-flume-1.7.0-bin/conf/kafka-sink.conf
在hdfs3节点运行
1 flume-ng agent -n agent3 -c conf -f ~/services/apache-flume-1.7.0-bin/conf/kafka-sink.conf
然后启动模拟的access日志生成脚本,该脚本会不断在access.log
文件末尾附加模拟的访问日志记录(关于该脚本,可参见《Storm实时处理 – Nginx访问日志 》
此时可看到消费者控制台输出了大量访问日志信息,测试成功