
flume_plugin_regex

ouyangzhe edited this page Feb 17, 2017 · 1 revision

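For an introduction to Flume's principles, architecture, and core components, see "Flume-ng的原理和使用" (Principles and Usage of Flume-ng). This page builds a Flume instance that uses the Datahub Sink to parse a web server's access log file and upload the results to a Datahub topic.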

The log file to upload has the following format:

# cat access_log_data
- - - [16/Jan/2017:07:37:38 +0000] "GET /vendor/bootstrap.min.css HTTP/1.1" 200 106006 "http://localhost/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36" "-"
- - - [16/Jan/2017:07:37:38 +0000] "GET /vendor/prettify.css HTTP/1.1" 200 1482 "http://localhost/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36" "-"
- - - [16/Jan/2017:07:37:38 +0000] "GET /css/style.css HTTP/1.1" 200 9819 "http://localhost/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36" "-"

The following steps create a Datahub topic and write the request time and request information from each log line into the topic as one record.

Create the Datahub Topic

An example statement for creating the topic with the datahub console:

ct test_project test_topic 1 1 (string access_time, string request);

Flume Configuration File

In the conf/ folder of the Flume installation directory, create a file named datahub_regex.conf with the following content:

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/access_log_data

# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = test_project
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer = REGEX
a1.sinks.k1.serializer.regex = ^.*\\[(.*)]\ \"(.*)\"\ \\d{3}.*$
a1.sinks.k1.serializer.fieldnames = access_time,request
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

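Here the serializer configuration specifies the regular expression used for matching. It captures the request time and the request content, that is, the contents of the first pair of square brackets and of the first quoted string. The two capture groups correspond, in order, to the field names access_time and request listed in serializer.fieldnames.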
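To make the regular expression easier to check, here is a minimal Python sketch that applies it to one of the sample log lines. It is not the sink's implementation: the sink runs on Java's regex engine, and this sketch assumes that Python's `re` module behaves the same way for this particular pattern. It also assumes that the configuration file is read with Java properties-file escaping (so `\\[` becomes `\[` and `\ ` becomes a plain space). The helper names `unescape_properties_value` and `parse_line` are hypothetical and exist only for this illustration.

```python
import re

# The regex value exactly as written in datahub_regex.conf.
CONF_VALUE = r'^.*\\[(.*)]\ \"(.*)\"\ \\d{3}.*$'

# Field names from a1.sinks.k1.serializer.fieldnames.
FIELD_NAMES = ["access_time", "request"]


def unescape_properties_value(value: str) -> str:
    """Simplified model of Java .properties unescaping (assumption).

    A backslash followed by t, n, r, or f becomes the control character;
    a backslash followed by any other character yields that character.
    Unicode escapes (\\uXXXX) are not handled in this sketch.
    """
    simple = {"t": "\t", "n": "\n", "r": "\r", "f": "\f"}
    out = []
    i = 0
    while i < len(value):
        ch = value[i]
        if ch == "\\" and i + 1 < len(value):
            nxt = value[i + 1]
            out.append(simple.get(nxt, nxt))
            i += 2
        else:
            out.append(ch)
            i += 1
    return "".join(out)


# The pattern the regex engine actually sees after unescaping.
EFFECTIVE_REGEX = unescape_properties_value(CONF_VALUE)


def parse_line(line: str):
    """Return a dict of field name -> captured text, or None if no match.

    Returning None for a non-matching line is a choice made for this
    sketch, not a statement about how the sink treats such lines.
    """
    match = re.match(EFFECTIVE_REGEX, line)
    if match is None:
        return None
    return dict(zip(FIELD_NAMES, match.groups()))


SAMPLE = (
    '- - - [16/Jan/2017:07:37:38 +0000] '
    '"GET /vendor/bootstrap.min.css HTTP/1.1" 200 106006 '
    '"http://localhost/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 '
    'Safari/537.36" "-"'
)

if __name__ == "__main__":
    print(EFFECTIVE_REGEX)
    print(parse_line(SAMPLE))
```

Although `(.*)` is greedy, the second group still stops at the end of the request string, because the pattern requires a closing quote, a space, and a three-digit status code right after it, and that sequence occurs only once in each sample line.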

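Start Flume

Once the configuration is complete, start Flume, specifying the agent name and the path to the configuration file. Adding the **-Dflume.root.logger=INFO,console** option prints the log to the console in real time.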

$ cd {YOUR_FLUME_DIRECTORY}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_regex.conf -Dflume.root.logger=INFO,console

When the write succeeds, the log shows the following:

...
Write success. Sink: k1, Event count: 3
...