fluentd是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和HTTP请求内容。数据被收集后按照用户配置的解析规则,形成一系列event。每一个event包含如下内容:
tag = xxxtime = xxxrecord = { "key1": "value1", "key2": "value2"}其中:
fluentd支持多种数据的解析过滤和输出操作。其中常用的有:
接下来以官网介绍为基础,穿插自己的理解,介绍下fluentd的使用方法。
官网安装步骤链接:https://docs.fluentd.org/installation/install-by-rpm
下面是精简的在CentOS下的安装步骤。打开shell,执行如下命令:
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh systemctl start td-agent可以安装并启动fluentd。
编辑fluentd配置文件的方法:
vim /etc/td-agent/td-agent.conf
默认来说fluentd使用td-agent用户启动。如果需要修改fluentd的用户,需要执行:
vim /usr/lib/systemd/system/td-agent.service
文件内容如下所示:
[Unit]Description=td-agent: Fluentd based data collector for Treasure DataDocumentation=https://docs.treasuredata.com/articles/td-agentAfter=network-online.targetWants=network-online.target [Service]User=td-agentGroup=td-agentLimitNOFILE=65536Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.soEnvironment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/Environment=FLUENT_CONF=/etc/td-agent/td-agent.confEnvironment=FLUENT_PLUGIN=/etc/td-agent/pluginEnvironment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sockEnvironment=TD_AGENT_LOG_FILE=/var/log/td-agent/td-agent.logEnvironment=TD_AGENT_OPTIONS=EnvironmentFile=-/etc/sysconfig/td-agentPIDFile=/var/run/td-agent/td-agent.pidRuntimeDirectory=td-agentType=forkingExecStart=/opt/td-agent/embedded/bin/fluentd --log $TD_AGENT_LOG_FILE --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONSExecStop=/bin/kill -TERM ${MAINPID}ExecReload=/bin/kill -HUP ${MAINPID}Restart=alwaysTimeoutStopSec=120 [Install]WantedBy=multi-user.target修改Service部分User和Group配置项可以更改fluentd进程的用户和组。
在shell中运行:
/opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf
观察输出,如果有错误会给出对应提示。
fluentd以tag值为基准,决定数据的流经哪些处理器。
数据的流向为:source -> parser -> filter -> output
增量读取 日志文件 。需要提供一个用于标记已经读取到位置的文件(position file)所在的路径。
tail针对日志滚动的支持:
tail方式采用跟踪文件inode的方式进行。比如日志名为app.log,如果日志发生滚动,被重命名为app.log.1。文件重命名的时候inode是不会改变的。因此发生滚动时写入到旧文件末尾的日志也可以被收集到。tail会跟踪旧文件的inode一段时间(rotate_wait配置),这段时间过去之后,tail不再监听app.log.1,开始监听新的app.log文件。
tail方式的示例配置:
<source> @type tail path /var/log/httpd-access.log pos_file /var/log/td-agent/httpd-access.log.pos tag apache.access <parse> @type apache2 </parse></source>注意:如果文件发生修改会输出全量文件内容。
tag:数据源的tag值。*号可以扩展为path(/替换为.)。例如
path /path/to/filetag foo.*tag会被扩展为foo.path.to.file
path:配置读取的路径。可以使用*或者是strftime。例如:
path /path/to/%Y/%m/%d/*
如果今天是2020年1月2日,fluentd会读取/path/to/2020/01/02目录下的内容。
也可以配置多个路径,使用逗号分隔:
path /path/to/a/*,/path/to/b/c.log
exclude_path:排除部分目录或文件,使用数组格式配置。
path /path/to/*exclude_path ["/path/to/*.gz", "/path/to/*.zip"]refresh_interval:多长时间刷新一次文件监听列表,配合*使用才有意义。
pos_file:位置文件地址。这个文件保存了监听的日志文件已经读取到第几行。该项一定要配置。
注意,不要再多个source之间共用pos file,否则会出现问题。
pos_file_compaction_interval:pos file文件压缩时间间隔。用于压缩pos file中不再监听的记录,不可解析的记录以及重复的记录。
parse标签:用于指定log的解析器(必须的配置项)。
例如:
# json<parse> @type json</parse> # regexp<parse> @type regexp expression ^(?<name>[^ ]*) (?<user>[^ ]*) (?<age>\d*)$</parse>path_key:如果配置此项,监控文件的path会在event中,此项的key为path_key。
例如:
path /path/to/access.logpath_key tailed_path生成的数据如下所示:
{"tailed_path":"/path/to/access.log","k1":"v1",...,"kN":"vN"}
rotate_wait:日志发生滚动的时候,可能会有部分日志仍然输出在旧的日志文件,此时需要保持监听旧日志文件一段时间,这个时间配置就是rotate_wait。
周期性执行命令,抽取命令输出为event。
示例配置:
<source> @type exec command cmd arg arg <parse> keys k1,k2,k3 </parse> <extract> tag_key k1 time_key k2 time_format %Y-%m-%d %H:%M:%S </extract> run_interval 10s</source>以上命令的含义为每10秒钟执行cmd arg arg命令,提取命令执行结果,以空白字符分隔三个字段的值为k1,k2,k3。其中k1的值作为tag,k2作为时间字段,使用%Y-%m-%d %H:%M:%S格式。
一个例子,周期获取系统的平均负载。配置方法如下:
<source> @type exec tag system.loadavg command cat /proc/loadavg | cut -d ' ' -f 1,2,3 run_interval 1m <parse> @type tsv keys avg1,avg5,avg15 delimiter " " </parse></source>输出的日志格式为:
2018-06-29 17:27:35.115878527 +0900 system.loadavg: {"avg1":"0.30","avg5":"0.20","avg15":"0.05"}
连接rsyslog。可以作为rsyslog的接收端。
一个配置的例子:
<source> @type syslog port 5140 bind 0.0.0.0 tag system</source>fluentd打开5140端口监听rsyslog发来的log。
rsyslog配置文件/etc/rsyslog.conf设置为:
# Send log messages to Fluentd*.* @127.0.0.1:5140fluentd解析到的event格式如下:
tag = "#{@tag}.#{facility}.#{priority}"time = 1353436518,record = { "host": "host", "ident": "ident", "pid": "12345", "message": "text"}专用于测试的数据源。周期产生假数据。
配置举例:
<source> @type dummy dummy {"hello":"world"}</source>dummy常用参数:
用于接收其他fluentd forward过来的event。
示例配置:
<source> @type forward port 24224 bind 0.0.0.0</source>输出event为文件。默认每天输出一个日志文件。
示例配置:
<match pattern> @type file path /var/log/fluent/myapp compress gzip <buffer> timekey 1d timekey_use_utc true timekey_wait 10m </buffer></match>包含的参数类型:
path /path/to/${tag}/${key1}/file.%Y%m%d<buffer tag,time,key1> # buffer parameters</buffer>注意:buffer标签后面的内容为buffer chunk key。Buffer根据这些key分段。
.log。将event转发到其他的fluentd节点。如果配置了多个fluentd节点,会使用负载均衡和支持容错的方式发送。如果需要发送多份数据,需要使用copy。
配置示例:
<match pattern> @type forward send_timeout 60s recover_wait 10s hard_timeout 60s <server> name myserver1 host 192.168.1.3 port 24224 weight 60 </server> <server> name myserver2 host 192.168.1.4 port 24224 weight 60 </server> ... <secondary> @type file path /var/log/fluent/forward-failed </secondary></match>server标签内可以配置如下字段:
多路输出(复制event到多个输出端)
示例配置
<match pattern> @type copy <store> @type file path /var/log/fluent/myapp1 ... </store> <store> ... </store> <store> ... </store></match>其中每一个store是一路输出。
重要参数:
<match app.**> @type copy <store> @type plugin1 </store> <store> @type plugin2 </store></match>假如plugin1出现错误,plugin2也不会执行。如果在plugin1的store添加上ignore_error参数,如下所示:
<match app.**> @type copy <store ignore_error> @type plugin1 </store> <store> @type plugin2 </store></match>上述情况plugin2的运行不受影响。通常为不重要的store添加ignore_error参数。
通过http请求的方式发送event。
payload的格式由format标签决定。
示例配置:
<match pattern> @type http endpoint http://logserver.com:9000/api open_timeout 2 <format> @type json </format> <buffer> flush_interval 10s </buffer></match>该例子使用http方式将event发送到http://logserver.com:9000/api,使用post方式,连接超时时间为2秒。输出格式为json,每10秒钟输出一次。
注意:
如果使用JSON的方式发送,HTTP请求的content-type为application/x-ndjson (newline-delimited JSONs)。如果用spring mvc接收会提示不支持。可以使用HTTPServletRequest接收request body。
标准输出的模式,如果使用后台模式运行fluentd,输出到fluentd的日志。多用于debug的时候。
配置方法:
<match pattern> @type stdout</match>输出event到elasticsearch。
示例配置:
<match my.logs> @type elasticsearch host localhost port 9200 logstash_format true</match>可选参数:
logstash-%Y.%m.%d),默认不启用logstash把event输出到kafka。
示例配置如下:
<match pattern> @type kafka2 # list of seed brokers brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port> use_event_time true # buffer settings <buffer topic> @type file path /var/log/td-agent/buffer/td flush_interval 3s </buffer> # data type settings <format> @type json </format> # topic settings topic_key topic default_topic messages # producer settings required_acks -1 compression_codec gzip</match>重要的参数为:
event通过REST方式写入到HDFS。
core-site.xml
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://10.180.210.172:9000</value> </property></configuration>hdfs-site.xml
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.http.address</name> <value>0.0.0.0:50070</value> </property> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property> <property> <name>dfs.support.append</name> <value>true</value> </property> <property> <name>dfs.support.broken.append</name> <value>true</value> </property></configuration>最后执行$HADOOP_HOME/sbin/httpfs.sh start命令启动webhdfs支持。
注意:此时webhdfs的端口号为50070。
示例配置:
<match access.**> @type webhdfs host namenode.your.cluster.local port 50070 path "/path/on/hdfs/access.log.%Y%m%d_%H.#{Socket.gethostname}.log" <buffer> flush_interval 10s </buffer></match>注意:需要保证HDFS的目标目录具有写入权限。debug过程发现fluentd请求webhdfs没有使用user proxy,HDFS认为操作的用户为dr.who,无法创建文件。为了解决这个问题,设置HDFS目标目录的权限为777。
重要参数:
\%Y: year including the century (at least 4 digits)\%m: month of the year (01..12)\%d: Day of the month (01..31)\%H: Hour of the day, 24-hour clock (00..23)\%M: Minute of the hour (00..59)\%S: Second of the minute (00..60)输出参数:
使用正则 表达式 命名分组的方式从日志(一行或多行)中提取信息。可以通过time_key指定event的time字段的名字。名字为time字段名的分组内容会被抽取为event时间。
一个在线测试正则表达式的工具:http://fluentular.herokuapp.com/
基本配置格式:
<parse> @type regexp expression /.../</parse>正则表达式可以添加额外的参数:
忽略大小写:/.../i
多行匹配:/.../m。注意,此时.匹配新行
同时使用忽略大小写和多行匹配:/.../im
一个例子,示例配置如下:
<parse> @type regexp expression /^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$/ time_key logtime time_format %Y-%m-%d %H:%M:%S %z types id:integer</parse>如下的数据:
[2013-02-28 12:00:00 +0900] alice engineer 1
会被解析为:
time:1362020400 (2013-02-28 12:00:00 +0900) record:{ "name" : "alice", "title": "engineer", "id" : 1}record_transformer用来修改event的结构,增加或修改字段。
一个record_transformer的例子:
<filter foo.bar> @type record_transformer <record> hostname "#{Socket.gethostname}" tag ${tag} </record></filter>这个filter匹配tag为foo.bar的source。event增加了两个新的字段:hostname和tag。
其中hostname这里使用了ruby表达式。tag使用了字符串插值。
如果数据为:
{"message":"hello world!"}
会被转换为:
{"message":"hello world!", "hostname":"db001.internal.example.com", "tag":"foo.bar"}
可以通过添加enable_ruby配置,在${}中使用ruby表达式。
例如:
<filter foo.bar> @type record_transformer enable_ruby <record> avg ${record["total"] / record["count"]} </record></filter>如下输入:
{"total":100, "count":10}
会被转换为:
{"total":100, "count":10, "avg":"10"}
注意,可以启用auto_typecast true配置实现自动类型转换。
修改字段的例子:
<filter foo.bar> @type record_transformer <record> message yay, ${record["message"]} </record></filter>如下输入:
{"message":"hello world!"}
会被修改为:
{"message":"yay, hello world!"}
可以在表达式中配置tag_parts变量,引用tag的第n部分。如下所示:
<filter web.*> @type record_transformer <record> service_name ${tag_parts[1]} </record></filter>如果遇到tag为web.auth的数据:
{"user_id":1, "status":"ok"}
会被转换为:
{"user_id":1, "status":"ok", "service_name":"auth"}
record标签的语法为:
<record> NEW_FIELD NEW_VALUE</record>表达式中可以配置如下变量:
record["count"]#{Socket.gethostname}作用一样.分隔,获取tag的第N部分例如tag为debug.my.app,tag_parts[1]返回my。tag_prefix和tag_suffix的结果如下:
tag_prefix[0] = debug tag_suffix[0] = debug.my.apptag_prefix[1] = debug.my tag_suffix[1] = my.apptag_prefix[2] = debug.my.app tag_suffix[2] = app<match>和<filter>标签可以使用通配符和扩展。
tag以.为分隔符,分隔为多个部分。
fluentd支持的通配符和扩展有:*:只匹配一个部分。比如a.*匹配a.b,但是不匹配a或a.b.c。**:匹配0个或多个部分。比如a.**匹配a,a.b和a.b.c。{X,Y,Z}:匹配X或Y或Z。#{expression}:使用嵌入的ruby表达式。有一些快捷变量可以直接使用,例如#{hostname}和#{worker_id}。${..}:使用变量值,tag,record
可以使用如下的方式指定默认值。例如:#{ENV["FOOBAR"] || use_default}。如果FOOBAR环境变量不存在,则使用use_default这个值。
注意:match标签的匹配过程是有顺序的。比如说下面的例子:
<match **> @type blackhole_plugin</match> <match myapp.access> @type file path /var/log/fluent/access</match>因为上面的match总是能被匹配到,下面的match永远没有机会执行。
buffer为fluentd很关键的配置,意为缓冲区。可以决定收集的数据存入什么介质,多长时间输出一次等。
buffer标签必须配置在match标签内(即在输出端配置)。
buffer具有一个@type属性,用来配置buffer的储存介质:
<buffer> @type file</buffer>@type有两个值:
buffer标签后面可以跟随chunk keys,用来决定buffer以record的什么字段来分段存放。例如:
<buffer ARGUMENT_CHUNK_KEYS> # ...</buffer>注意:
buffer如果使用time作为chunk key,可以按照时间对buffer进行分段。其中:
官网的例子如下:
<match tag.**> # ... <buffer time> timekey 1h # chunks per hours ("3600" also available) timekey_wait 5m # 5mins delay for flush ("300" also available) </buffer></match> # Time chunk key: events will be separated for hours (by timekey 3600) 11:59:30 web.access {"key1":"yay","key2":100} ------> CHUNK_A 12:00:01 web.access {"key1":"foo","key2":200} --| |---> CHUNK_B12:00:25 ssh.login {"key1":"yay","key2":100} --|部分经常用到的配置参数:
可以通过@include 配置文件路径方式,引用其他配置文件片段到fluentd主配置文件中。
配置文件路径可以使用绝对路径或相对路径。相对路径的基准路径为fluentd主配置文件所在的路径。
@include可以出现在主配置文件的任何位置。
通过配置fluentd logging driver的方式实现。
该driver发送的log信息包含:
| 字段 | 描述 |
|---|---|
| container_id | 64字符的container id |
| container_name | container名字 |
| source | stdout或stderr |
| log | container的log |
修改/etc/docker/daemon.json,增加如下内容:
{ "log-driver": "fluentd", "log-opts": { "fluentd-address": "fluentdhost:24224" }}然后重启docker daemon使配置生效。
也可以通过添加--log-driver和--log-opt参数的方式指定某个container使用fluentd logging driver。如下所示:
docker run --log-driver=fluentd --log-opt fluentd-address=fluentdhost:24224
可以通过在--log-opt后指定tag的方式,确定source的tag。
Docker官网参考链接:https://docs.docker.com/config/containers/logging/fluentd/
采集/root/my.txt文件(内容格式为key value),并发送到http://localhost:9090/。
fluentd的配置文件如下:
<source> @type tail path /root/my.txt pos_file /root/my.txt.pos tag my <parse> @type regexp expression /(?<key>\w+)\s(?<value>\w+)/ </parse></source> <match my> @type http endpoint http://localhost:9090/ open_timeout 2 http_method post <format> @type json </format> <buffer> flush_interval 3s </buffer></match>提取用户操作记录,打印到fluentd日志。
<source> @type tail # 这里使用HISTFILE环境变量,如果没有设置,使用默认值/root/.bash_history path "#{ENV["HISTFILE"] || /root/.bash_history}" pos_file /root/.bash_history.pos tag history <parse> @type none </parse></source> <filter history> @type record_transformer <record> hostname ${hostname} </record></filter> <match history> @type stdout</match>收集用户操作记录转发到另一个fluentd节点,同时将数据发送到Kafka和存入HDFS。
数据流为:fluentd采集端 -> fluentd收集端 -> kafka和HDFS
示例用户操作记录数据为:
root pts/1 2020-03-26 10:59 (10.180.206.1):root 2020-03-26 11:00:09 130 tail -f /var/log/command.his.log
采集节点的配置:
<source> @type tail path /var/log/command.his.log pos_file /var/log/command.his.log.pos tag history <parse> @type regexp # 使用正则解析日志文件 expression /^(?<who_user>\w+)\s(?<pts>\S+)\s(?<who_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2})\s\((?<remote_ip>\d+\.\d+\.\d+\.\d+)\):(?<user>\w+)\s(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s(?<res>\d+)\s(?<command>.+)$/ time_key time </parse></source><filter history> @type record_transformer <record> # event内容增加hostname这一行 hostname ${hostname} </record></filter> <match history> @type forward send_timeout 60s recover_wait 10s hard_timeout 60s <buffer> # 1秒钟向另一个fluentd节点转发一次 flush_interval 1s </buffer> <server> name myserver1 host 10.180.210.172 port 24225 weight 60 </server></match>fluentd收集节点的配置:
<source> @type forward port 24225 bind 0.0.0.0 tag remote</source> <match remote> # 使用copy方式,分两路输出 @type copy <store> @type kafka2 brokers 10.180.210.172:9092 use_event_time true <buffer topic> @type file path /var/log/td-agent/buffer/td flush_interval 3s </buffer> <format> @type json </format> default_topic history required_acks -1 </store> <store> @type webhdfs host 10.180.210.172 port 50070 path "/history/access.log.%Y%m%d_%H.#{Socket.gethostname}.log" <buffer> flush_interval 60s </buffer> </store></match>
免责声明:本文系网络转载或改编,未找到原创作者,版权归原作者所有。如涉及版权,请联系删