摘要:本文主要向大家介绍了【云计算】大数据学习之flume使用示例,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
本文主要向大家介绍了【云计算】大数据学习之flume使用示例,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
资讯 安全 论坛 下载 读书 程序开发 数据库 系统 网络 电子书 微信学院 站长学院 QQ 考试
频道栏目
路由器| 交换机| 网络协议| 组网| 疑难| 其他| 云计算|
登录注册
首页 > 网络 > 云计算 > 正文
大数据学习之flume使用示例
2018-08-29 10:20:23
收藏 我要投稿
因为flume是面对各种场景的,所以当我们面对具体场景的时候,我们需要提供一套配置文件。
告诉他source用哪一种,channel用哪一种,sink用哪一种。我们知道source用哪一种之后还要告诉他文件在哪里。
sink也是,比如我们要往hdfs中存,我们要告诉他实现类用hdfs,接着hdfs的实现类需要参数。
配置文件解析:
1.我们首先需要给flume起一个名字,我们这里为agent1.
2.定义三大组件的名称:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
3.配置source组件用来读一个目录下的东西,
spooldir是一种实现类,这是flume内置的一种实现类,只要目录下有新的东西就会被读走。
而这种实现类它自己也需要参数,不然它不知道去哪读,所以spoolDir就是需要读的目录。
fileHeader为是否要加文件头,我们暂时用不到。
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/logs/
agent1.sources.source1.fileHeader = false
4.配置拦截器,这个source可以带拦截器,sink也可以带拦截器,拦截器可以不配,我们先别管这个
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
5.配置sink组件
sink1的类型为hdfs
filePrefix为文件名前缀,为什么它不直接使用文件名,因为,它是去读文件中的内容,每读一行封装成一个event,甚至一个文件中的内容最后会被写到两个文件中。为什么不把文件名定死?,因为这样的话所有的内容都会写到一个文件中。所以写文件也是会滚动的,写到一定的大小就会生成一个新的文件。不管生成什么文件,我们可以指定它的文件名的前缀,但是中间一定会出现随机码或者时间戳。我们甚至可以指定后缀。
maxOpenFiles的意思是,我们同时可以在hdfs上打开多少个文件去写。
batchSize是批处理的意思,我们如果一个event一个event的去写会很慢,所以我们可以指定一批包括多少个event,然后一批一批的处理。
我们要把数据写到文件中肯定会有格式,如果是文本文件的话,就是
fileType = DataStream
writeFormat = Text
rollSize是按照文件的大小滚动,1024000就是100k滚动一次。
rollCount是按照event的条数滚动,
rollInterval就是按照时间间隔滚动,60就是60秒滚动一次。
如果三个滚动都配置了,那么三个滚动都会起作用。
round是管理文件夹的滚动的。
首先要起作用的话就要让round = true
roundValue与roundUnit配合起作用,如果roundValue=10,roundUnit=minute
就是每10分钟滚动一次。
useLocalTimeStamp的意思就是当path中的通配符获取时间的时候,是否使用本地时间
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://marshal:9000/weblog/flume-collection/%Y-%m-%d/%H-%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 100
agent1.sinks.sink1.hdfs.rollInterval = 20
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 1
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
channel配置:
type为channel的类型,设置为memory内存
keep-alive为能保存多久
capacity为能保存多少个event
transactionCapacity为内部的事务控制,它可以记住你上次传过哪些。
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
接下来我们需要告诉他哪个source连哪个channel,连哪个sink
因为一个agent可以连多个source,多个channel,多个sink
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
接下来我们需要将上面的配置写入(从文件夹拿文件放入hdfs的采集方案)。
vi spoondir-hdfs.conf
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/logs/
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://marshal:9000/weblog/flume-collection/%Y-%m-%d/%H-%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 100
agent1.sinks.sink1.hdfs.rollInterval = 20
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 1
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
保存之后我们就可以启动flume,让他用这个配置文件产生一个实例。
我们启动之后可以看到我们采集过的文件的名字发生了变化:
只要有新的日志进去就会马上被采集走。
但是只要我们传的文件名相同,flume就会挂掉。
我们启动的时候加上-Dflume.root.logger=INFO.console可以查看更详细的信息。
×loading..
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号