Flume浅度学习指南

栏目: 服务器 · 发布时间: 6年前

内容简介:cloudera 公司开源的,贡献给Apache基金会

flume简介

cloudera 公司开源的,贡献给Apache基金会

http://flume.apache.org/

http://archive.cloudera.com/c...

只能运行在 linux 系统上

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

flume用来高效的收集、聚合、移动大量的日志数据

有一个基于流式的简单的有弹性的传输模型

有一个健壮的可容错的机制

使用简单,可以扩展的数据模型运行使用到在线实时分析应用中

简单体现在flume-agent的配置及传输模型简单

在线实时分析应用中

flume日志的实时采集-》sparkStreaming/storm/Flink =》mysql/redis=》实时分析的结果进行报表展示

数据(日志)的移动传输工具:

日志=》系统运行日志、web服务器的访问日志、客户端的用户行为日志、软件的运行操作日志

可以将数据从数据源中采集并移动到另外一个目的地:

数据源=》系统本地日志文件中的数据、jms、avro端口、kafka、系统本地目录下... 
目的地=》hdfs、hive、hbase、kafka、系统本地一个文件中...

如何将linux本地的一个日志文件中的日志数据采集到hdfs上

  • 脚本+hdfs命令 =》【周期性】上传

    #!/bin/sh
        HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2    
        $HADOOP_HOME/bin/hdfs -put /.../xx.log  /hdfs

    针对项目初期数据量较少时可以使用 , 没有容灾性及稳定性

  • 采用flume日志采集框架=》【实时】采集一个日志文件中实时追加的日志数据并写入到目的地

    针对不同的应用场景定义并启动对应的flume-agent实例/进程

    source  -- 定义从哪里采集数据  
        exec类型的source可以借助Linux的 shell 命令实现实时读取一个日志文件中动态追加的日志数据 
        avro类型 
        ……
    channel  -- 定义了source采集的数据临时存储地   
        memory 基于内存的管道容器 
        file 基于磁盘 
    sink  -- 定义将数据最终写入的-目的地  
        hdfs类型的sink将数据最终写入到hdfs上  
        hive类型将数据最终写入到hive表 
        kafka类型将数据最终写入到kafka分布式消息队列中  
        ……

flume-agent实例的模型

每个flume-agent实例至少由以下三个功能模块组成 
    source模块  
        用于监控数据源并进行数据的实时采集,是实时产生数据流的模块
        数据源=》系统本地的一个日志文件中、kafka、jms、系统本地的一个目录下、avro端口  。。。 
        source将采集到的数据提交到channel中
    channel模块  
        用于连接source和sink的管道容器  
        类似一个队列(FIFO)
    sink模块  
        从channel中拉取take(剪切)数据并最终将数据写入到目的地
        目的地=》hdfs、hive、hbase、kafka、avro端口...  
            
event事件: 
    event事件是flume传输日志数据时基本单元,在flume-agent内部数据都是以事件形式存在 
        source将采集到的数据封装成一个个的event事件,将事件提交到channel
        sink从channel消费事件并将事件中封装的数据最终写入到目的地  
    event事件的数据结构:header + body      
        header 
            是一个map集合类型 
            内部的key-value为该事件的元数据信息,主要用来区分不同的事件 
        body 
            是一个字节数组类型
            body为我们真正要传输的数据

flume的安装使用

flume-ng-1.6.0-cdh5.14.2

安装 
    1、上次解压flume的安装包 
        $ tar zxvf  /opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/
        $ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2  修改目录名称-可选  
    2、修改flume配置文件 
        $ mv conf/flume-env.sh.template conf/flume-env.sh  修改后环境配置文件才能生效
        $ vi conf/flume-env.sh 
            export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112
    3、针对不同的场景需求配置对应的 java 属性配置文件并启动flume-agent进程  
        
        如何启动一个flume-agent进程  
        $ bin/flume-ng agent  \
        --name或-n 当前flume-agent实例的别名  \
        --conf或-c 当前flume框架的配置文件目录 \
        --conf-file,-f 与当前要启动的flume-agent进程相匹配的java属性配置文件的本地路径  
            
        Usage: bin/flume-ng <command> [options]...

案例

案例一:

flume官方简单案例

定义一个flume-agent去监听读取某台服务器上的某个端口中的数据,并将监听读取到的数据最终写入到flume框架自己的日志文件中
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = centos01
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
提交测试: 
    $ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties &  
确定目标服务器的端口是否已经成功被flume-agent代理进程监听  
    $ netstat -antp |grep 44444     --查看端口信息 
    $ ps -ef | grep flume  -- 查看进程信息  
    
安装一个telnet工具并连接服务器端口写入数据  
    $ sudo yum -y install telnet
    
    发送消息数据  
    
检查flume的日志文件中的数据 
    $ tail -f logs/flume.log

案例二:

要求使用flume实时监控读取系统本地一个日志文件中动态追加的日志数据并实时写入到hdfs上的某个目录下

# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
#依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停
a2.sources.r2.type = exec
# tail -F 文件名  即使没有这个-F后面指定的文件,命令也不会停止,容错能力强
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log




# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100




#声明a2的sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog
a2.sinks.k2.hdfs.filePrefix = nginxData





# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

报错首先找logs目录

报错:找不到类

缺少jar包

[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/     
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/

案例三:

案例二的优化:

解决生成的文件过多过小的问题(希望文件的大小=128M) 
将日志文件按照日期分目录存储(按照天分目录存储)  
将生成的日志文件的格式改为Text文本格式

修改案例二的flume-agent属性文件

# 声明当前flume-agent的别名及当前的flume-agent实例包含的模块的别名和个数
a2.sources = s2
a2.channels = c2
a2.sinks = k2

# 定义source模块中的s2的类型及与此类型相关的延伸属性 
# exec类型的source可以借助执行一条linux shell命令实现读取linux系统上某个文件中的日志数据,其中 cat是一次性读取,tail可以实现实时读取新增加的数据  
# shell属性用来声明要执行的命令的运行环境
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /opt/nginx/access.log 
a2.sources.s2.shell = /bin/sh -c


# 定义channel模块中的c2的类型及与此类型相关的延伸属性  
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100


# 定义sink模块中的k2的类型及与此类型相关的延伸属性 
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d
#启用根据时间生成路径中的转义字符的具体的时间值
a2.sinks.k2.hdfs.round = true
#表示使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true

#设置文件的前缀
a2.sinks.k2.hdfs.filePrefix = NgnixLog


#设置解决文件过多过小问题
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1


#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值 
#从性能上考虑,最优的是batchSize=transactionCapacity 
a2.sinks.k2.hdfs.batchSize = 100

# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法) 
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 
a2.sinks.k2.hdfs.writeFormat = Text 


# 将a2中的source及sink模块绑定到对应的channel模块上 
# 一个source模块可以同时绑定多个channel模块,但是一个sink模块只能绑定一个唯一的channel
a2.sources.s2.channels = c2
a2.sinks.k2.channel = c2

案例四:

利用flume监控某个目录下的日志文件,当某个目录下出现符合要求的文件名称的文件时,则对文件中的日志数据进行读取,并将数据最终写入到hdfs上

目录
    /opt/data/logs
        nginx-access.log.2018120309 
        nginx-access.log.2018120310
# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
# includePattern 用正则表达式指定要包含的文件
# ignorePattern  用正则表达式指定要忽略的文件
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /home/chen/mylogs
# 由于每次读完会给读完的文件增加.COMPLETED从而形成新文件,需要忽略这些文件
a2.sources.r2.ignorePattern = ^.*\.COMPLETED$
# includePattern和ignorePattern会同时生效
a2.sources.r2.includePattern =     ^.*$




# Use a channel
# file类型更安全
# memory类型效率更高
a2.channels.c2.type = file
a2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data




#声明a2的sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d
#启用根据时间生成转义字符的具体的时间值
a2.sinks.k2.hdfs.round = true
#使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true

a2.sinks.k2.hdfs.filePrefix = nginxData


#设置解决文件过多过小问题
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1

#批量写入到hdfs上文件中的最大event数量
#batchSize的值需要小于等于transactionCapacity的值 
#从性能上考虑,最优的是batchSize=transactionCapacity 
a2.sinks.k2.hdfs.batchSize = 100


# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 写入到hdfs上的文件的格式(序列化方法) 
# 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 
a2.sinks.k2.hdfs.writeFormat = Text


# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

案列五:

需求:            
Nginx服务器集群 -- 10台  
    每台服务器上都有一个access.log日志文件  
    需要将每台服务器上的日志文件中追加的日志数据实时读取并写入到hdfs上 
    
    
    思路1: 
        每台Nginx服务器上启动一个flume-agent 
            source - exec  
            channel - mem 
            sink - hdfs  
        多个flume-agent同时写入数据到hfds上不利于hdfs的稳定性 

    思路2: 
        每台Nginx服务器上启动一个flume-agent 
            source - exec  
            channel - mem 
            sink - avro   
                type = avro 
                hostname = 主机名
                port =  端口号 
            将数据统一写入到某台服务器某个端口中 
            
        启动一个负责对汇总后的数据统一写入到目的地的flum-agent 
            source - avro   
                type = avro
                bind = 
                port = 
            channel - mem 
            sink - hdfs

nginxs2flume.properties

# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
#依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停
a2.sources.r2.type = exec
# tail -F 文件名  即使没有这个-F后面指定的文件,命令也不会停止,容错能力强
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log




# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100




#声明a2的sink
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = centos01
a2.sinks.k2.port = 6666


# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

flume2hdfs.properties

# example.conf: A single-node Flume configuration
#同一台Linux上可开启多个flume-agent,但agent别名要区分
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = centos01
a3.sources.r3.port = 6666




# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100




#声明a3的sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/test
a3.sinks.k3.hdfs.writeFormat = Text 

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

以上所述就是小编给大家介绍的《Flume浅度学习指南》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Learn Python the Hard Way

Learn Python the Hard Way

Zed A. Shaw / Addison-Wesley Professional / 2013-10-11 / USD 39.99

Master Python and become a programmer-even if you never thought you could! This breakthrough book and CD can help practically anyone get started in programming. It's called "The Hard Way," but it's re......一起来看看 《Learn Python the Hard Way》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具