Flume

Flume概述

  • 官网:http://flume.apache.org/

  • Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错能力。它使用简单的可扩展数据模型,允许在线分析应用程序。(多用于数据采集)

  • 数据从哪里来

  1. 爬虫
  2. 日志数据 flume
  3. 传统型数据库 sqoop

Flume架构

  1. source:数据源

    产生数据流,同时source将产生的数据流传输到channel

  2. channel:传输通道

    用于桥接source和sinks

  3. sinks:下沉

    从channel收集数据

  4. event:传输单元(事件)

    Flume数据传输的基本单元,以事件的形式将数据送往目的地


Flume安装配置

  • 修改配置文件
    1
    export JAVA_HOME=path

Flume的使用

  • 单channel/sink
  1. 准备配置文件flumejob_telnet.conf

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #smple.conf: A single-node Flume configuration

    # Name the components on this agent 定义变量方便调用 加s可以有多个此角色
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source 描述source角色 进行内容定制
    # 此配置属于tcp source 必须是netcat类型
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink 输出日志文件
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory(file) 使用内存 总大小1000 每次传输100
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel 一个source可以绑定多个channel
    # 一个sinks可以只能绑定一个channel 使用的是图二的模型
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  2. 通过命令执行

    1
    2
    3
    4
    5
    [root@bigdata211 flume]# bin/flume-ng agent \
    > --conf/ \ === 指定配置文件所在目录
    > --name a1 \ === 指定别名
    > --conf-file conf/flumejob_telnet.conf \ === 指定配置文件名
    > -Dflume.root.logger==INFO,console === 指定输出日志到控制台
  • 多channel/sink
  1. 准备配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
flumejob_1.conf:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

#将数据复制给多个channel
a1.sources.r1.selector.type = replicating

#设置sources
a1.sources.r1.type = exec
#设置需要监控的文件(通过command的tail -F查看日志命令)
a1.sources.r1.command = tail -F /tmp/root/hive.log
a1.sources.r1.shell = /bin/bash -c

#设置sinks
#设置数据不进行落地操作
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata211
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata211
a1.sinks.k2.port = 4142

#设置channels(通过内存传输)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

#指定sources、sinks通道
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
flumejob_2.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1

#设置sources
a2.sources.r1.type = avro
#端口抓取数据
a2.sources.r1.bind = bigdata211
a2.sources.r1.port = 4141

#设置sink,下沉到hdfs
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata211:9000/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
#最小副本数
a2.sinks.k1.hdfs.minBlockReplicas = 1

#设置channels(通过内存传输)
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#设置传输通道
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
flumejob_3.conf
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 设置sources
a3.sources.r1.type = avro
# 端口抓取数据
a3.sources.r1.bind = bigdata211
a3.sources.r1.port = 4142

# 设置sinks
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /root/flume2

# 设置channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 设置传输通道
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
  1. 通过命令执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    [root@bigdata211 flume]# bin/flume-ng agent \
    > --conf conf/ \
    > --name a1 \
    > --conf-file conf/flumejob_1.conf

    [root@bigdata211 flume]# bin/flume-ng agent \
    > --conf conf/ \
    > --name a1 \
    > --conf-file conf/flumejob_2.conf

    [root@bigdata211 flume]# bin/flume-ng agent \
    > --conf conf/ \
    > --name a1 \
    > --conf-file conf/flumejob_3.conf