一文搞懂ELK

摘要

Logstash概述

  • Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的存储库中。

  • Logstash数据传输原理

1
2
3
1.数据采集与输入:Logstash支持各种输入选择,能够以连续的流式传输方式,轻松地从日志、指标、Web应用以及数据存储中采集数据。
2.实时解析和数据转换:通过Logstash过滤器解析各个事件,识别已命名的字段来构建结构,并将它们转换成通用格式,最终将数据从源端传输到存储库中。
3.存储与数据导出:Logstash提供多种输出选择,可以将数据发送到指定的地方。

  • Logstash通过管道完成数据的采集与处理,管道配置中包含input、output和filter(可选)插件,input和output用来配置输入和输出数据源、filter用来对数据进行过滤或预处理。

Logstash下载安装

1
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.17.3-linux-x86_64.tar.gz
  • 下载完成后解压到/usr/local/logstash目录下,解压命令如下:

1
2
mkdir /usr/local/logstash
tar -zxvf logstash-8.17.3-linux-x86_64.tar.gz -C /usr/local/logstash
  • elasticsearch和kibana都不能用root用户启动,为了统一管理,logstash也使用这个用户(非必要)

  • 创建用户elastic,并设置密码,这一步我们在安装elasticsearch的时候已经配置过了,这里就不再赘述了

1
2
useradd elastic
passwd elastic
  • 修改logstash安装目录的用户权限

1
chown -R elastic:elastic /usr/local/logstash
  • 切换到elastic用户下执行命令

1
2
3
4
# 切换用户
su - elastic
# 进入logstash安装目录
cd /usr/local/logstash/logstash-8.17.3/
  • 运行一个简单的测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# -e: 直接把配置放在命令中,这样可以有效快速进行测试
# input { stdin { } }: 输入插件,从标准输入中读取数据
# output { stdout {} }: 输出插件,将数据输出到标准输出
# 这里只包含了 input 和 output 两个部分,实际使用中还需要添加 filter 部分
./bin/logstash -e 'input { stdin { } } output { stdout {} }'
# 输出:
。。。。。。。 # 输出启动信息
The stdin plugin is now waiting for input: # 输出提示信息,提示你输入内容
hello world # 我输入内容
# logstash输出内容
{
"message" => "hello world",
"@version" => "1",
"event" => {
"original" => "hello world"
},
"host" => {
"hostname" => "ip-10-250-0-239.cn-northwest-1.compute.internal"
},
"@timestamp" => 2025-03-27T06:48:12.810617855Z
}
  • 也可以通过配置文件启动logstash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# vim test.conf
input {
stdin { }
}

output {
stdout { codec => rubydebug }
}

# codec => rubydebug : 是指定了一个编码器(Codec)为 rubydebug。
# 编码器负责如何格式化输出的数据。
# Rubydebug 编码器是一种特别适合开发和调试目的的编码器,因为它能以相当易读的形式展示复杂的结构化数据,比如嵌套的哈希表或者数组。
# 除了 Rubydebug 编码器,Logstash 还提供了其他几种编码器,比如 json、plain、multiline 等等。
# 具体可以参考官方文档:https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
1
./bin/logstash -f ./test.conf

Logstash插件

Input Plugins

https://www.elastic.co/guide/en/logstash/8.17/input-plugins.html

  • 一个 Pipeline可以有多个input插件

1
2
3
4
5
- Stdin / File
- Beats / Log4J /Elasticsearch / JDBC / Kafka /Rabbitmq /Redis
- JMX/ HTTP / Websocket / UDP / TCP
- Google Cloud Storage / S3
- Github / Twitter

Filter Plugins

https://www.elastic.co/guide/en/logstash/8.17/filter-plugins.html

  • Filter Plugin可以对Logstash Event进行各种处理,例如解析,删除字段,类型转换

1
2
3
4
5
6
7
8
9
10
11
- Date: 日期解析
- Dissect: 分割符解析
- Grok: 正则匹配解析
- Mutate: 对字段做各种操作
- Convert : 类型转换
- Gsub : 字符串替换
- Split / Join /Merge: 字符串切割,数组合并字符串,数组合并数组
- Rename: 字段重命名
- Update / Replace: 字段内容更新替换
- Remove_field: 字段删除
- Ruby: 利用Ruby 代码来动态修改Event

Output Plugins

https://www.elastic.co/guide/en/logstash/8.17/output-plugins.html

  • 将Event发送到特定的目的地,是 Pipeline 的最后一个阶段。常见 Output Plugins:

1
2
3
4
- Elasticsearch
- Email / Pageduty
- Influxdb / Kafka / Mongodb / Opentsdb / Zabbix
- Http / TCP / Websocket

Codec Plugins

https://www.elastic.co/guide/en/logstash/8.17/codec-plugins.html

  • 将原始数据decode成Event;将Event encode成目标数据,内置的Codec Plugins:

1
2
3
- Line / Multiline
- JSON / Avro / Cef (ArcSight Common Event Format)
- Dots / Rubydebug

FileBeat概述

  • Beats 是一个免费且开放的平台,集合了多种单一用途的数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。

  • FileBeat 专门用于转发和收集日志数据的轻量级采集工具。它可以作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据,并将收集到的日志转发到Elasticsearch或者Logstash。

logstash vs FileBeat

  • Logstash是在jvm上运行的,资源消耗比较大。而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级。

  • Logstash 和Filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少

  • Logstash 具有Filter功能,能过滤分析日志

  • 一般结构都是Filebeat采集日志,然后发送到消息队列、Redis、MQ中,然后Logstash去获取,利用Filter功能过滤分析,然后存储到Elasticsearch中

  • FileBeat和Logstash配合,实现背压机制。当将数据发送到Logstash或 Elasticsearch时,Filebeat使用背压敏感协议,以应对更多的数据量。如果Logstash正在忙于处理数据,则会告诉Filebeat 减慢读取速度。一旦拥堵得到解决,Filebeat就会恢复到原来的步伐并继续传输数据。

FileBeat下载和安装

1
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.17.3-linux-x86_64.tar.gz
  • 下载完成后解压到/usr/local/filebeat目录下,解压命令如下:

1
2
mkdir /usr/local/filebeat
tar -zxvf filebeat-8.17.3-linux-x86_64.tar.gz -C /usr/local/filebeat
  • 修改logstash安装目录的用户权限

1
chown -R elastic:elastic /usr/local/filebeat
  • 切换到elastic用户下执行命令

1
2
3
4
# 切换用户
su - elastic
# 进入 filebeat 安装目录
cd /usr/local/filebeat/filebeat-8.17.3-linux-x86_64/

经典的ELK架构

  • Filebeat日志收集:Filebeat作为轻量级的日志收集代理,部署在客户端上,消耗资源少,能够高效地收集日志数据。

  • Logstash数据处理:Logstash作为数据处理管道,负责将Filebeat收集的日志数据进行过滤、转换等操作,然后发送到Elasticsearch进行存储。

  • Elasticsearch存储与搜索:Elasticsearch是一个基于Lucene的分布式搜索和分析引擎,提供强大的数据存储和搜索能力。

  • Kibana可视化:Kibana为Elasticsearch提供Web可视化界面,允许用户通过图表、仪表盘等方式直观地查看和分析日志数据。

  • 适用场景:经典的ELK架构主要适用于数据量较小的开发环境。然而,由于缺少消息队列的缓冲机制,当Logstash或Elasticsearch出现故障时,可能存在数据丢失的风险。

一个经典ELK架构示例

FileBeat采集Nginx服务器日志并发送到Logstash

  • 创建配置文件filebeat-nginx.yml,将其保存到Filebeat安装目录下的conf目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 因为Nginx的access.log日志都是以IP地址开头的,所以我们需要修改下匹配字段。
# 不以ip地址开头的行追加到上一行
filebeat.inputs: # 输入源配置
- type: log # 日志类型
enabled: true # 是否启用
paths: # 采集路径
- /var/log/nginx/access*.*
exclude_files: [".gz$"] # 排除以.gz 结尾的文件
multiline.pattern: '^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s' # 匹配以IP地址开头,并紧跟一个空白字符
multiline.negate: true # 不以ip地址开头的行追加到上一行
multiline.match: after # 追加到上一行的后面
# multiline: 多行日志配置,这里实际上不需要配置多行设置,因为nginx的access日志都是单行日志,这里只做演示
# pattern: 正则表达式
# negate: false,匹配pattern的行合并到上一行;true,不匹配pattern的行合并到上一行
# match: after,合并到上一行的末尾;before,合并到上一行的开头
scan_frequency: 10s # 每 10 秒扫描一次日志文件
clean_removed: true # 在日志文件被删除后是否从其内部状态中移除该文件的记录
clean_inactive: 2h # 在日志文件处于非活动状态多长时间后将其从状态数据库中移,设置为比 ignore_older + scan_frequency 更大的值
ignore_older: 1h # 忽略那些比当前时间早于 1 小时的日志文件
output.logstash: # 输出配置,这里是发送到Logstash
enabled: true # 是否启用
hosts: ["10.250.0.239:5044"] # Logstash服务地址
  • 为FileBeat的用户分配日志目录的读取权限

1
2
# 使用 setfacl命令为elastic用户分配日志目录的读取权限
sudo setfacl -d -m u:elastic:r-x -R /var/log/nginx/
  • 启动FileBeat

1
2
3
./filebeat -e -c conf/filebeat-nginx.yml
# 后台启动
nohup ./filebeat -e -c conf/filebeat-nginx.yml &
  • 在启动filebeat时可能会遇到如下错误

1
Exiting: error loading config file: config file ("conf/filebeat-nginx.yml") can only be writable by the owner but the permissions are "-rw-rw-r--" (to fix the permissions use: 'chmod go-w /usr/local/filebeat/filebeat-8.17.3-linux-x86_64/conf/filebeat-nginx.yml')
  • 因为安全原因不要其他用户写的权限,去掉写的权限就可以了

1
chmod go-w conf/filebeat-nginx.yml

配置Logstash接收FileBeat收集的数据并打印

  • 创建配置文件logstash-nginx.conf,将其保存到Logstash安装目录下的config目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 进入logstash安装目录
# vim config/logstash-nginx.conf
# 配置从FileBeat接收数据
input { # 输入源配置
beats { # 从FileBeat接收数据
port => 5044 # 监听5044端口
}
}

output { # 输出配置
stdout { # 打印到控制台
codec => rubydebug # 指定编码器
}
}
  • 测试logstash配置是否正确

1
bin/logstash -f config/logstash-nginx.conf --config.test_and_exit
  • 启动logstash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# reload.automatic:修改配置文件时自动重新加载
bin/logstash -f config/logstash-nginx.conf --config.reload.automatic

# 此时在控制台会看到如下输出:
{
"input" => {
"type" => "log"
},
"@version" => "1",
"message" => "1.119.161.30 - elastic [27/Mar/2025:08:52:41 +0000] \"GET /_cluster/stats HTTP/1.1\" 200 7445 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36\" \"-\"",
"log" => {
"file" => {
"path" => "/var/log/nginx/access.log"
},
"offset" => 42985
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"@timestamp" => 2025-03-27T08:52:42.947Z,
"ecs" => {
"version" => "8.0.0"
},
"agent" => {
"id" => "4dbe185b-900d-4990-b841-c13bf9618fc6",
"name" => "ip-10-250-0-239.cn-northwest-1.compute.internal",
"ephemeral_id" => "6bb04808-7da7-4acb-95fd-f3f915651457",
"type" => "filebeat",
"version" => "8.17.3"
},
"event" => {
"original" => "1.119.161.30 - elastic [27/Mar/2025:08:52:41 +0000] \"GET /_cluster/stats HTTP/1.1\" 200 7445 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36\" \"-\""
},
"host" => {
"name" => "ip-10-250-0-239.cn-northwest-1.compute.internal"
}
}

利用Logstash过滤器解析日志

  • 从打印结果看到包含了大量的无关数据,此时可以利用Logstash过滤器解析日志,这里使用Grok插件

  • Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。

  • Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。

  • GROK模式参考

  • grok模式的语法是:

1
2
3
4
%{SYNTAX:SEMANTIC}
# SYNTAX(语法)指的是Grok模式名称,SEMANTIC(语义)是给模式匹配到的文本字段名。例如:
%{NUMBER:duration} %{IP:client}
# duration表示:匹配一个数字,client表示匹配一个IP地址。
  • 匹配nginx日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# nginx access日志格式
1.119.161.30 - elastic [27/Mar/2025:03:05:36 +0000] "GET /_cluster/stats HTTP/1.1" 200 7446 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" "-"

# grok
%{IP:client_ip} - %{USER:remote_user} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{INT:http_status} %{INT:body_bytes_sent} \"%{NOTSPACE:referrer}\" \"%{GREEDYDATA:agent}\" %{QUOTEDSTRING:x_forwarded_for}
# 解释每个部分的作用:
# %{IP:client_ip}:匹配客户端 IP 地址。
# -:匹配一个破折号(-),表示匿名用户或未认证用户。
# %{USER:remote_user}:匹配远程用户名, - 表示匿名用户。
# \[%{HTTPDATE:timestamp}\]:匹配请求的时间戳,格式为 [27/Mar/2025:03:05:36 +0000]。
# \"%{WORD:verb} : 匹配请求方法 (GET)
# %{URIPATHPARAM:request} : 匹配请求路径 (/_cluster/stats)
# HTTP/%{NUMBER:httpversion}\\\":匹配 HTTP 版本 (HTTP/1.1)。
# %{INT:http_status}:匹配 HTTP 响应状态码 (200)。
# %{INT:body_bytes_sent}:匹配响应体的字节数 (7446)。
# \"%{NOTSPACE:referrer}\":匹配引用页 (Referer) 字段,这里为空 ("-")。
# \"%{GREEDYDATA:agent}\":匹配用户代理 (User-Agent) 字段。GREEDYDATA 会匹配尽可能多的数据,直到遇到下一个分隔符。
# %{QUOTEDSTRING:x_forwarded_for}:匹配 X-Forwarded-For 头字段,这里为空 ("-")。

# 匹配结果
{
"request": "/_cluster/stats",
"agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"body_bytes_sent": "7446",
"verb": "GET",
"x_forwarded_for": "\"-\"",
"remote_user": "elastic",
"referrer": "-",
"client_ip": "1.119.161.30",
"httpversion": "1.1",
"http_status": "200",
"timestamp": "27/Mar/2025:03:05:36 +0000"
}
  • 匹配Grok模式是个非常繁琐的过程,我们可以使用Kibana来进行可视化的Grok调试

  • grok配置

1
2
3
4
5
grok {
match => {
"message" => "%{IP:client_ip} - %{USER:remote_user} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{INT:http_status} %{INT:body_bytes_sent} \"%{NOTSPACE:referrer}\" \"%{GREEDYDATA:agent}\" %{QUOTEDSTRING:x_forwarded_for}"
}
}

使用mutate插件过滤掉不需要的字段

  • 除了nginx日志本身的格式外,logstash还会打印许多我们不需要的字段,此时可以使用mutate插件来过滤掉不需要的字段。

1
2
3
4
mutate {
enable_metric => "false" # 禁用指标上报
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] # 移除不需要的字段
}

使用Date插件对时间进行格式转换

1
2
3
4
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] # 时间格式转换
target => "timestamp" # 目标字段
}

完整的Logstash配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 进入logstash安装目录
# vim config/logstsh-nginx.conf
# 配置从FileBeat接收数据
input { # 输入源配置
beats { # 从FileBeat接收数据
port => 5044 # 监听5044端口
}
}

filter { # 过滤器配置
grok {
match => {
"message" => "%{IP:client_ip} - %{USER:remote_user} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{INT:http_status} %{INT:body_bytes_sent} \"%{NOTSPACE:referrer}\" \"%{GREEDYDATA:agent}\" %{QUOTEDSTRING:x_forwarded_for}"
}
}

mutate {
enable_metric => "false" # 禁用指标上报
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] # 移除不需要的字段
}

date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] # 时间格式转换
target => "timestamp" # 目标字段
}
}

output { # 输出配置
stdout { # 打印到控制台
codec => rubydebug # 指定编码器
}
}
  • 重新启动logstash后得到如下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"event" => {
"original" => "1.119.161.30 - elastic [27/Mar/2025:09:49:41 +0000] \"GET /_cluster/stats HTTP/1.1\" 200 7445 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36\" \"-\""
},
"client_ip" => "1.119.161.30",
"request" => "/_cluster/stats",
"http_status" => "200",
"@timestamp" => 2025-03-27T09:49:45.751Z,
"body_bytes_sent" => "7445",
"x_forwarded_for" => "\"-\"",
"timestamp" => 2025-03-27T09:49:41.000Z,
"referrer" => "-",
"remote_user" => "elastic",
"httpversion" => "1.1",
"verb" => "GET"
}

将结果输出到ES

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 在输出配置中添加ES输出配置
output { # 输出配置
stdout { # 打印到控制台
codec => rubydebug # 指定编码器
}
elasticsearch { # 输出到ES
index => "nginx_access_log_%{+YYYY-MM}" # 索引名,按月分隔
hosts => ["https://10.250.0.239:9200"] # ES地址
user => "elastic" # ES用户名
password => "123456" # ES密码
ssl => true # 是否启用SSL,因为这里是https访问ES
ssl_certificate_verification => false # 禁用证书验证
}
}
  • 如果需要开启证书校验,可以通过如下方法进行配置

    • 获取 Elasticsearch 的 SSL 证书:
    1
    2
    # 从 Elasticsearch 的配置文件或证书文件路径中获取证书(通常是 .pem 或 .crt 文件)。
    /usr/local/elasticsearch/elasticsearch-8.17.3/config/certs/http_ca.crt
    • 导入证书到 Java 信任库:注意这里要导入到启动Logstash的JAVA进程的信任库
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    # logstash的jdk路径,默认在logstash的安装目录下
    cd /usr/local/logstash/logstash-8.17.3/jdk

    bin/keytool -importcert -alias es-cert -keystore /usr/local/logstash/logstash-8.17.3/logstash.keystore -file /usr/local/elasticsearch/elasticsearch-8.17.3/config/certs/http_ca.crt
    # 输出
    Enter keystore password: # 输入密码 这里是123456
    Re-enter new password:
    Owner: CN=Elasticsearch security auto-configuration HTTP CA
    Issuer: CN=Elasticsearch security auto-configuration HTTP CA
    Serial number: 89d5c501a2efd5d45a6ee5e08daa16bd605d8c28
    Valid from: Thu Mar 20 08:53:26 UTC 2025 until: Sun Mar 19 08:53:26 UTC 2028
    Certificate fingerprints:
    SHA1: DE:73:0C:EC:46:59:69:83:52:7C:C4:CC:6B:65:EC:B6:31:BE:10:22
    SHA256: 3F:14:1C:16:DF:C6:E1:65:89:4B:C1:67:20:84:B2:20:DC:DD:22:FF:E0:21:16:D5:1A:C1:80:03:CF:AA:5A:1D
    Signature algorithm name: SHA256withRSA
    Subject Public Key Algorithm: 4096-bit RSA key
    Version: 3

    Extensions:

    #1: ObjectId: 2.5.29.35 Criticality=false
    AuthorityKeyIdentifier [
    KeyIdentifier [
    0000: 25 EF BA 81 AE 5E 14 1C 7E FF A1 87 12 F8 D0 2E %....^..........
    0010: 3A D7 54 5F :.T_
    ]
    ]

    #2: ObjectId: 2.5.29.19 Criticality=true
    BasicConstraints:[
    CA:true
    PathLen: no limit
    ]

    #3: ObjectId: 2.5.29.14 Criticality=false
    SubjectKeyIdentifier [
    KeyIdentifier [
    0000: 25 EF BA 81 AE 5E 14 1C 7E FF A1 87 12 F8 D0 2E %....^..........
    0010: 3A D7 54 5F :.T_
    ]
    ]

    Trust this certificate? [no]: yes # yes确认
    Certificate was added to keystore
    • 配置 Logstash 使用信任库

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    output {
    elasticsearch {
    index => "nginx_access_log_%{+YYYY-MM}" # 索引名,按月分隔
    hosts => ["https://10.250.0.239:9200"] # ES地址
    user => "elastic" # ES用户名
    password => "123456" # ES密码
    ssl => true # 是否启用SSL,因为这里是https访问ES
    ssl_certificate_verification => true # 启用证书校验
    ssl_truststore_path => "/usr/local/logstash/logstash-8.17.3/logstash.keystore" # 指定信任库路径
    ssl_truststore_password => "123456" # 信任库密码
    }
    }

整合消息队列+Nginx的ELK架构

  • 消息队列:引入消息队列作为缓冲机制,确保即使在Logstash或Elasticsearch出现故障时,日志数据也不会丢失。消息队列能够均衡网络传输,降低数据丢失的可能性。

  • Nginx:Nginx作为高性能的Web和反向代理服务器,可以进一步优化整个系统的性能和可用性。它可以在负载均衡、缓存等方面发挥作用,提升用户访问体验。

  • 扩展性:由于引入了消息队列和Nginx等组件,整个架构的扩展性得到增强。可以根据实际需求动态调整各组件的资源分配和部署规模。

  • 适用场景:整合消息队列+Nginx的架构主要适用于生产环境,特别是需要处理大数据量的场景。它能够确保数据的安全性和完整性,同时提供高性能的日志处理和可视化分析服务。

  • 总结来说就是

    • filebeat将采集的日志发送到Redis\RocketMQ\Kafka等
    • Logstash再从Redis\RocketMQ\Kafka中读取日志数据,并进行解析后发送到ES中。
    • ES集群使用nginx进行负载均衡,以实现高可用和高性能。具体实现方法查看:linux下安装Elasticsearch集群

基于Redis的ELK架构示例

filebeat将采集的日志发送到Redis

  • 创建配置文件filebeat-nginx-to-redis.yml,将其保存到Filebeat安装目录下的conf目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 因为Nginx的access.log日志都是以IP地址开头的,所以我们需要修改下匹配字段。
# 不以ip地址开头的行追加到上一行
filebeat.inputs: # 输入源配置
- type: log # 日志类型
enabled: true # 是否启用
paths: # 采集路径
- /var/log/nginx/access*.*
exclude_files: [".gz$"] # 排除以.gz 结尾的文件
multiline.pattern: '^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s' # 匹配以IP地址开头,并紧跟一个空白字符
multiline.negate: true # 不以ip地址开头的行追加到上一行
multiline.match: after # 追加到上一行的后面
# multiline: 多行日志配置,这里实际上不需要配置多行设置,因为nginx的access日志都是单行日志,这里只做演示
# pattern: 正则表达式
# negate: false,匹配pattern的行合并到上一行;true,不匹配pattern的行合并到上一行
# match: after,合并到上一行的末尾;before,合并到上一行的开头
scan_frequency: 10s # 每 10 秒扫描一次日志文件
clean_removed: true # 在日志文件被删除后是否从其内部状态中移除该文件的记录
clean_inactive: 2h # 在日志文件处于非活动状态多长时间后将其从状态数据库中移,设置为比 ignore_older + scan_frequency 更大的值
ignore_older: 1h # 忽略那些比当前时间早于 1 小时的日志文件

output.redis: # Redis输出配置
hosts: ["10.250.0.214:6379"] # Redis服务器地址
password: "123456" # Redis密码
key: "filebeat_nginx" # Redis的key,类型为 list
db: 3 # Redis数据库
timeout: 3 # Redis连接超时时间,单位秒
  • 启动Filebeat,并查看日志输出。

1
./filebeat -e -c conf/filebeat-nginx-to-redis.yml
  • 查看Redis中的数据

1
2
3
4
5
6
7
8
9
10
11
$ redis-cli -h 10.250.0.214 -p 6379 -a 123456
10.250.0.214:6379> select 3
OK
10.250.0.214:6379[3]> keys filebeat*
1) "filebeat_nginx"
10.250.0.214:6379[3]> type filebeat_nginx
list
10.250.0.214:6379[3]> LLEN filebeat_nginx
(integer) 14
10.250.0.214:6379[3]> LINDEX filebeat_nginx 1
"{\"@timestamp\":\"2025-03-28T09:15:23.402Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.17.3\"},\"host\":{\"name\":\"ip-10-250-0-239.cn-northwest-1.compute.internal\"},\"agent\":{\"version\":\"8.17.3\",\"ephemeral_id\":\"c0a521fe-690b-46b1-aadc-bbe16ef1db9a\",\"id\":\"4dbe185b-900d-4990-b841-c13bf9618fc6\",\"name\":\"ip-10-250-0-239.cn-northwest-1.compute.internal\",\"type\":\"filebeat\"},\"log\":{\"offset\":46745,\"file\":{\"path\":\"/var/log/nginx/access.log\"}},\"message\":\"1.119.161.30 - elastic [28/Mar/2025:09:15:13 +0000] \\\"GET /_cluster/stats HTTP/1.1\\\" 200 7457 \\\"-\\\" \\\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36\\\" \\\"-\\\"\",\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"8.0.0\"}}"

Logstash从Redis中读取日志并写入Elasticsearch

  • 创建配置文件logstash-nginx-redis-to-es.conf,将其保存到Logstash安装目录下的conf目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 进入logstash安装目录
# vim logstash-nginx-redis-to-es.conf
# 配置从 redis 接收数据
input { # 输入源配置
redis { # Redis输入配置
host => '10.250.0.214' # Redis服务器地址
port => "6379" # Redis服务器端口
password => "123456" # Redis密码
db => "3" # Redis数据库
data_type => 'list' # 数据类型为list
key => "filebeat_nginx" # Redis的key
}
}

filter { # 过滤器配置
grok {
match => {
"message" => "%{IP:client_ip} - %{USER:remote_user} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{INT:http_status} %{INT:body_bytes_sent} \"%{NOTSPACE:referrer}\" \"%{GREEDYDATA:agent}\" %{QUOTEDSTRING:x_forwarded_for}"
}
}

mutate {
enable_metric => "false" # 禁用指标上报
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] # 移除不需要的字段
}

date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] # 时间格式转换
target => "timestamp" # 目标字段
}
}

# 在输出配置中添加ES输出配置
output { # 输出配置
stdout { # 打印到控制台
codec => rubydebug # 指定编码器
}
elasticsearch { # 输出到ES
index => "redis_nginx_access_log_%{+YYYY-MM}" # 索引名,按月分隔
hosts => ["https://10.250.0.239:9200"] # ES地址
user => "elastic" # ES用户名
password => "123456" # ES密码
ssl => true # 是否启用SSL,因为这里是https访问ES
ssl_certificate_verification => false # 禁用证书验证
}
}
  • 测试logstash配置是否正确

1
bin/logstash -f config/logstash-nginx-redis-to-es.conf --config.test_and_exit
  • 启动logstash

1
2
# reload.automatic:修改配置文件时自动重新加载
bin/logstash -f config/logstash-nginx-redis-to-es.conf --config.reload.automatic