【云计算】大数据学习之flume使用示例
小标 2018-12-24 来源 : 阅读 886 评论 0

摘要:本文主要向大家介绍了【云计算】大数据学习之flume使用示例,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】大数据学习之flume使用示例,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


资讯 安全 论坛 下载 读书 程序开发 数据库 系统 网络 电子书 微信学院 站长学院 QQ 考试

频道栏目

路由器| 交换机| 网络协议| 组网| 疑难| 其他| 云计算|

登录注册

首页 > 网络 > 云计算 >  正文

大数据学习之flume使用示例

2018-08-29 10:20:23             

收藏   我要投稿

因为flume是面对各种场景的,所以当我们面对具体场景的时候,我们需要提供一套配置文件。

告诉他source用哪一种,channel用哪一种,sink用哪一种。我们知道source用哪一种之后还要告诉他文件在哪里。

sink也是,比如我们要往hdfs中存,我们要告诉他实现类用hdfs,接着hdfs的实现类需要参数。

配置文件解析:

1.我们首先需要给flume起一个名字,我们这里为agent1.

2.定义三大组件的名称:

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

3.配置source组件用来读一个目录下的东西,

spooldir是一种实现类,这是flume内置的一种实现类,只要目录下有新的东西就会被读走。

而这种实现类它自己也需要参数,不然它不知道去哪读,所以spoolDir就是需要读的目录。

fileHeader为是否要加文件头,我们暂时用不到。

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/logs/
agent1.sources.source1.fileHeader = false

4.配置拦截器,这个source可以带拦截器,sink也可以带拦截器,拦截器可以不配,我们先别管这个

agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

5.配置sink组件

sink1的类型为hdfs

filePrefix为文件名前缀,为什么它不直接使用文件名,因为,它是去读文件中的内容,每读一行封装成一个event,甚至一个文件中的内容最后会被写到两个文件中。为什么不把文件名定死?,因为这样的话所有的内容都会写到一个文件中。所以写文件也是会滚动的,写到一定的大小就会生成一个新的文件。不管生成什么文件,我们可以指定它的文件名的前缀,但是中间一定会出现随机码或者时间戳。我们甚至可以指定后缀。

maxOpenFiles的意思是,我们同时可以在hdfs上打开多少个文件去写。

batchSize是批处理的意思,我们如果一个event一个event的去写会很慢,所以我们可以指定一批包括多少个event,然后一批一批的处理。

我们要把数据写到文件中肯定会有格式,如果是文本文件的话,就是

fileType = DataStream

writeFormat = Text

rollSize是按照文件的大小滚动,1024000就是100k滚动一次。

rollCount是按照event的条数滚动,

rollInterval就是按照时间间隔滚动,60就是60秒滚动一次。

如果三个滚动都配置了,那么三个滚动都会起作用。

round是管理文件夹的滚动的。

首先要起作用的话就要让round = true

roundValue与roundUnit配合起作用,如果roundValue=10,roundUnit=minute

就是每10分钟滚动一次。

useLocalTimeStamp的意思就是当path中的通配符获取时间的时候,是否使用本地时间

agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://marshal:9000/weblog/flume-collection/%Y-%m-%d/%H-%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 100
agent1.sinks.sink1.hdfs.rollInterval = 20
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 1
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

channel配置:

type为channel的类型,设置为memory内存

keep-alive为能保存多久

capacity为能保存多少个event

transactionCapacity为内部的事务控制,它可以记住你上次传过哪些。

agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

接下来我们需要告诉他哪个source连哪个channel,连哪个sink

因为一个agent可以连多个source,多个channel,多个sink

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

接下来我们需要将上面的配置写入(从文件夹拿文件放入hdfs的采集方案)。

vi spoondir-hdfs.conf

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/logs/
agent1.sources.source1.fileHeader = false

agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://marshal:9000/weblog/flume-collection/%Y-%m-%d/%H-%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 100
agent1.sinks.sink1.hdfs.rollInterval = 20
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 1
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

保存之后我们就可以启动flume,让他用这个配置文件产生一个实例。

我们启动之后可以看到我们采集过的文件的名字发生了变化:

只要有新的日志进去就会马上被采集走。

但是只要我们传的文件名相同,flume就会挂掉。

我们启动的时候加上-Dflume.root.logger=INFO.console可以查看更详细的信息。

×loading..

   

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 1
看完这篇文章有何感觉?已经有1人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程