This article describes how to deploy Logstash 7.4.0.
Logstash deployment
[toc]
I. Initializing the environment
Environment
- CentOS 7.5
- A three-node Elasticsearch (ES) cluster
- Docker 19.03.4
- Logstash 7.4.0 (Docker image)
- RabbitMQ
- Redis
Host configuration
No. | IP address | Alias | CPU | Memory | Disk |
---|---|---|---|---|---|
1 | X.X.X.211 | L&K-node | 8C | 16G | 200G |
II. Docker deployment
Installing Docker
[root@blog ~]# sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo # add the Docker CE repository
[root@blog ~]# sudo yum makecache fast # refresh the yum metadata cache
[root@blog ~]# sudo yum -y install docker-ce # install Docker CE
[root@blog ~]# sudo service docker start # start the Docker service
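The `service docker start` command starts Docker for the current boot only. The sketch below is one way to confirm that the client is installed before continuing. The `have_cmd` helper is a hypothetical name introduced here, not part of the original steps.

```shell
# Return success if the named command is available on PATH.
have_cmd() {
    command -v "$1" >/dev/null 2>&1
}

if have_cmd docker; then
    docker --version      # print the installed client version
else
    echo "docker not found on PATH"
fi
```

If Docker should also start after a reboot, `systemctl enable docker` is the usual way to do that on CentOS 7. The original procedure does not include this step, so treat it as optional.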
Pulling the base Docker image
- Pull the Logstash image
[root@blog ~]# docker pull logstash:7.4.0
- List the Logstash image
[root@blog ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
logstash 7.4.0 c2c1ac6b995b 5 weeks ago 889MB
Building a custom Docker image
Why build a custom image
- Later steps use the exec plugin in the output section of the Logstash configuration, so the plugin is installed into the image up front
- Some Logstash settings are common to every container (for example the base Logstash settings and the X-Pack configuration)
- Create the build directories
[root@blog ~]# mkdir -p logstash-dockerfile/setting
[root@blog ~]# mkdir -p logstash-dockerfile/pipeline
- Create and write logstash.yml
[root@blog ~]# cat > logstash-dockerfile/setting/logstash.yml <<eof
http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://[ES node 1 IP]:9200", "http://[ES node 2 IP]:9200", "http://[ES node 3 IP]:9200"]
xpack.monitoring.elasticsearch.username: remote_monitoring_user
xpack.monitoring.elasticsearch.password: [password of remote_monitoring_user]
eof
- Create and write the WeChat forwarding script
Note: the WeChat Work (企业微信) configuration itself is omitted here. The heredoc delimiter is quoted so that the shell does not expand the script's variables while writing the file.
[root@blog ~]# cat > logstash-dockerfile/pipeline/weixin.sh <<'eof'
#!/bin/bash
CorpID='[your CorpID]'
Secret='[your Secret]'
GURL="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$CorpID&corpsecret=$Secret"
Gtoken=$(/usr/bin/curl -s -G "$GURL" | awk -F\" '{print $10}' )
echo $Gtoken > /tmp/1.txt
PURL="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$Gtoken"
function body() {
local AppID=[AppID] # application (agent) ID
local PartyID=[PartyID] # department ID
local Msg=$(echo "$@" | cut -d" " -f3-)
printf '{\n'
printf '\t"toparty": "'"$PartyID"\"",\n"
printf '\t"msgtype": "text",\n'
printf '\t"agentid": "'"$AppID"\"",\n"
printf '\t"text": {\n'
printf '\t\t"content": "'"$Msg"\""\n"
printf '\t},\n'
printf '\t"safe":"0"\n'
printf '}\n'
}
/usr/bin/curl --data-ascii "$(body $1)" "$PURL"
eof
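The script reads the access token by position (the tenth field when splitting on double quotes), which depends on the order of the keys in the JSON reply. A minimal sketch of a key-based alternative follows. The function name `extract_token` and the sample reply are assumptions made for illustration, and the real response may carry additional fields.

```shell
# Print the value of the "access_token" key from a JSON string.
# This is a sed-based sketch, not a full JSON parser.
extract_token() {
    printf '%s\n' "$1" |
        sed -n 's/.*"access_token"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Illustrative reply only; the token value is made up.
reply='{"errcode":0,"errmsg":"ok","access_token":"abc123","expires_in":7200}'
extract_token "$reply"
```

An empty result suggests that the request failed, so a caller could check for that before sending the message.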
- Create and write the Dockerfile
[root@blog ~]# cat > logstash-dockerfile/Dockerfile <<eof
FROM logstash:7.4.0
COPY pipeline/weixin.sh /usr/share/logstash/pipeline/weixin.sh
COPY setting/logstash.yml /usr/share/logstash/config/logstash.yml
RUN logstash-plugin install logstash-output-exec
RUN rm -rf /usr/share/logstash/config/logstash-sample.conf
CMD ["-f", "/usr/share/logstash/pipeline/logstash.conf"]
eof
- Build the image
[root@blog ~]# cd logstash-dockerfile
[root@blog ~]# docker build .
Sending build context to Docker daemon 8.192kB
Step 1/6 : FROM logstash:7.4.0
---> c2c1ac6b995b
Step 2/6 : COPY pipeline/ /usr/share/logstash/pipeline/
---> Using cache
---> 407fe8bb1ab2
Step 3/6 : COPY setting/logstash.yml /usr/share/logstash/config/logstash.yml
---> 1f7f2d5787df
Step 4/6 : RUN logstash-plugin install logstash-output-exec
---> Running in 3ba85661fb01
Validating logstash-output-exec
Installing logstash-output-exec ## downloading the logstash-output-exec plugin is slow, so be patient
Installation successful
Removing intermediate container 3ba85661fb01
---> 459af4dff509
Step 5/6 : RUN rm -rf /usr/share/logstash/config/logstash-sample.conf
---> Running in 16fed9350b06
Removing intermediate container 16fed9350b06
---> 2398f44b5013
Step 6/6 : CMD ["-f", "/usr/share/logstash/pipeline/logstash.conf"]
---> Running in eacc20d78fcb
Removing intermediate container eacc20d78fcb
---> 024bed9a9a6c
Successfully built 024bed9a9a6c
[root@blog ~]# docker images | grep none | awk '{print $3}' # look up the ID of the newly built image
[root@blog ~]# docker tag [image ID from the previous step] logstash:latest # tag the newly built image
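Looking up the untagged image and tagging it afterwards works, but `docker build -t` can do both in one step. The sketch below wraps that call. The function name `build_logstash_image` and the `DOCKER` override variable are hypothetical and exist only so the command can be previewed without running Docker.

```shell
# Build the image in the given directory and tag it in one step.
# DOCKER is a hypothetical override used only for dry runs (e.g. DOCKER=echo).
build_logstash_image() {
    local tag="${1:-logstash:latest}"
    local dir="${2:-.}"
    ${DOCKER:-docker} build -t "$tag" "$dir"
}

# Dry run: print the docker arguments instead of executing them.
DOCKER=echo build_logstash_image logstash:latest logstash-dockerfile
```

Without `DOCKER=echo`, the same call runs the real build.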
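Configuring logstash.conf
Containers are split by business area, each with its own configuration, which is mounted at startup through a Docker volume. A logstash.conf consists of three sections: input, filter, and output.
Category | Business | Configuration path | Notes |
---|---|---|---|
System logs | filebeat | /opt/elk/logstash_conf/os_log/filebeat/ | Linux system logs |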
System logs | winlogbeat | /opt/elk/logstash_conf/os_log/winlogbeat/ | Windows system logs |
Other logs | mysql-slowlog | /opt/elk/logstash_conf/other_log/mysql_log/ | MySQL slow query log |
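To make the three-section layout concrete, the sketch below writes a skeleton pipeline file that can be used as a starting point in any of the directories above. The `write_skeleton_conf` helper name is an assumption for illustration, and the `stdin` and `stdout` plugins are placeholders rather than the inputs and outputs this deployment ends up using.

```shell
# Write a three-section skeleton pipeline to the given path.
write_skeleton_conf() {
    cat > "$1" <<'EOF'
input {
  stdin { }
}
filter {
  # parsing rules go here
}
output {
  stdout { codec => rubydebug }
}
EOF
}

conf_file="$(mktemp)"
write_skeleton_conf "$conf_file"
cat "$conf_file"
```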
Editing logstash.conf
INPUT section
- Source: RabbitMQ
input{
rabbitmq {
host => "[MQ IP address]"
vhost => "[MQ vhost]"
exchange => "[MQ exchange]"
queue => "[MQ queue]"
durable => "[durable setting: true or false]"
key => "[MQ key]"
user => "[MQ user]"
password => "[MQ password]"
type => "[label used in the output section to tell input sources apart]"
}
}
- Source: Redis
input{
redis {
host => "[redis IP address]"
port => [redis port]
data_type => "list"
password => "[redis password]"
key => "[key that identifies the channel]"
db => [redis db number]
}
}
- Source: a listening port (Beats)
input{
beats{
port=>"[listening port]"
}
}
FILTER section
Reference: "Use Logstash pipelines for parsing" (Elastic documentation), with parts of the configuration adapted to this environment.
- Linux system log rules
filter{
if [event][module] == "system" {
if [fileset][name] == "auth" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
pattern_definitions => {
"GREEDYMULTILINE"=> "(.|\n)*"
}
remove_field => [ "message","[agent][hostname]" ]
}
grok {
match => { "[log][file][path]" => "/var/log/%{LOGTYPE:log.type}%{GREEDYMULTILINE}" }
pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
pattern_definitions => { "LOGTYPE" => "[a-zA-Z0-9]+" }
}
date {
match => [ "[system][auth][timestamp]", "ISO8601" ,"MMM d HH:mm:ss", "MMM dd HH:mm:ss","MMM d yyyy HH:mm:ss" ]
remove_field => "[system][auth][timestamp]"
}
mutate {
rename => [ "[system][auth][hostname]" , "[system][hostname]" ]
rename => [ "[system][auth][pid]" , "[system][pid]" ]
rename => [ "[system][auth][message]" , "[system][message]" ]
rename => [ "[host][name]" , "[os][hostname]" ]
rename => [ "[host][ip][0]" , "[os][ip]" ]
remove_field => "[host]"
}
}
else if [fileset][name] == "syslog" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
remove_field => [ "message","[agent][hostname]" ]
}
grok {
match => { "[log][file][path]" => "/var/log/%{LOGTYPE:log.type}%{GREEDYMULTILINE}" }
pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
pattern_definitions => { "LOGTYPE" => "[a-zA-Z0-9]+" }
}
date {
match => [ "[system][syslog][timestamp]", "ISO8601" ,"MMM d HH:mm:ss","MMM dd HH:mm:ss","MMM d yyyy HH:mm:ss" ]
remove_field => "[system][syslog][timestamp]"
}
mutate {
rename => [ "[system][syslog][hostname]" , "[system][hostname]" ]
rename => [ "[system][syslog][pid]" , "[system][pid]" ]
rename => [ "[system][syslog][message]" , "[system][message]" ]
rename => [ "[host][name]" , "[os][hostname]" ]
rename => [ "[host][ip][0]" , "[os][ip]" ]
remove_field => "[host]"
}
}
}
}
- Windows system log rules
filter{
if [winlog][event_id] == 1000 {
grok{
match => { "message" => "(?<mudulename>[a-zA-Z0-9._-]+\.exe)"} # capture the executable name of the faulting module
}
}
grok {
match => { "[host][ip]" => "%{IPV4:[os][ip]}"}
}
mutate {
rename => [ "[host][name]" , "[os][hostname]" ]
remove_field => [ "[host]","[agent][hostname]"]
}
date {
match => [ "[event][created]", "ISO8601" ]
remove_field => "[event][created]"
}
}
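The executable-name capture in the Windows rule can be tried outside Logstash with `grep -E`, which understands the same character class. The sketch below is only an approximation of what grok does. The `extract_exe` helper and the sample message are made up for illustration, and real event ID 1000 messages may be worded differently.

```shell
# Print the first token that looks like an executable name (ends in ".exe").
extract_exe() {
    printf '%s\n' "$1" | grep -oE '[a-zA-Z0-9._-]+\.exe' | head -n 1
}

# Hypothetical sample message, not taken from a real event.
extract_exe 'Faulting application name: demo-service.exe, version: 1.0.0.0'
```

Like grok, this keeps only the first match, so a message that mentions several executables yields the first one.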
- MySQL slow query log rules
filter{
if [event][module] == "mysql" {
if [fileset][name] == "slowlog" {
grok {
match => { "message" => ["^# User@Host: %{USER:[mysql][slowlog][user]}(\[[^\]]+\])? @\s*\[%{IPV4:[mysql][slowlog][ip]}\](\s*Id:\s* %{NUMBER:[mysql][slowlog][id]})?\n# Query_time: %{NUMBER:[mysql][slowlog][query_time][sec]}\s* Lock_time: %{NUMBER:[mysql][slowlog][lock_time][sec]}\s* Rows_sent: %{NUMBER:[mysql][slowlog][rows_sent]}\s* Rows_examined: %{NUMBER:[mysql][slowlog][rows_examined]}\n(SET timestamp=%{NUMBER:[mysql][slowlog][timestamp]};\n)?%{GREEDYMULTILINE:[mysql][slowlog][query]}"] }
pattern_definitions => {
"GREEDYMULTILINE" => "(.|\n)*"
}
remove_field => "message"
}
date {
match => [ "[mysql][slowlog][timestamp]", "UNIX" ]
}
mutate {
gsub => ["[mysql][slowlog][query]", "\n# Time: [0-9]+ [0-9][0-9]:[0-9][0-9]:[0-9][0-9](\\.[0-9]+)?$", ""]
}
mutate {
rename => [ "[host][name]" , "[os][hostname]" ]
rename => [ "[host][ip][0]" , "[os][ip]" ]
remove_field => "[host]"
}
}
}
}
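The `UNIX` format in the date filter treats the captured `SET timestamp=` value as seconds since the epoch. To sanity-check such a value by hand, a conversion like the sketch below can help. It assumes GNU `date` (as shipped with CentOS), and `epoch_to_utc` is a hypothetical helper name.

```shell
# Convert epoch seconds to an ISO 8601 UTC timestamp (GNU date assumed).
epoch_to_utc() {
    date -u -d "@$1" '+%Y-%m-%dT%H:%M:%SZ'
}

epoch_to_utc 1572566400
```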
OUTPUT section
- Output to elasticsearch: send events to the ES cluster
output{
if [type] == "[type defined in the INPUT section]" { # route events by input source
elasticsearch {
hosts => ["[ES node 1 IP]:9200","[ES node 2 IP]:9200","[ES node 3 IP]:9200"]
index => "[index name]" # example: os_syslog.windows-%{+YYYY.MM.dd}
user => "elastic"
password => "[password of the elastic user]"
}
}
}
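- Output to exec: run a command
Purpose: this rule monitors Windows module status. When an event for an abnormally terminated program is matched, it sends a WeChat alert.
Depends on: the Windows filter rules.
Note: weixin.sh splits its argument on spaces (cut -d" " -f3-), so the alert text must not contain spaces, otherwise its first two words are dropped.
output{
if [winlog][event_id] == 1000 { # when [winlog][event_id] is 1000, run the WeChat alert script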
exec {command => "/bin/bash /usr/share/logstash/pipeline/weixin.sh '服务器模块故障\n故障时间(需要+8:00):%{@timestamp}\n主机名称:%{[os][hostname]}\n主机地址:%{[os][ip]}\n故障级别:%{[log][level]}\n故障信息:%{mudulename}'"}
}
}
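The alert text labels the fault time with "需要+8:00" (add 8 hours) because `@timestamp` is always in UTC, regardless of the container's `TZ` setting. A minimal sketch of that conversion follows. It assumes GNU `date`, and `utc_to_utc8` is a hypothetical helper name.

```shell
# Shift a UTC timestamp by +8 hours via epoch seconds (GNU date assumed).
utc_to_utc8() {
    local secs
    secs=$(date -u -d "$1" +%s) || return 1
    date -u -d "@$((secs + 8 * 3600))" '+%Y-%m-%d %H:%M:%S'
}

utc_to_utc8 '2019-11-01T00:00:00Z'
```

Going through epoch seconds avoids relying on a time zone database being installed where the conversion runs.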
- Output to stdout: print events to the screen
output{
stdout {
codec => rubydebug
}
}
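A missing closing brace is an easy mistake when assembling the input, filter, and output sections by hand. The rough check below counts opening and closing braces in a file. `brace_balance` is a hypothetical helper, and the count ignores quoting, so treat a non-zero result as a hint rather than proof.

```shell
# Print (number of '{') minus (number of '}') in the given file.
# 0 means the braces are balanced by count; quoting is not considered.
brace_balance() {
    local opens closes
    opens=$(grep -o '{' "$1" | wc -l)
    closes=$(grep -o '}' "$1" | wc -l)
    echo $((opens - closes))
}

# Example: an input block whose outer brace was never closed.
sample="$(mktemp)"
printf 'input{\nredis {\nhost => "x"\n}\n' > "$sample"
brace_balance "$sample"
```

For an authoritative check, Logstash itself can validate a configuration with the `--config.test_and_exit` option.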
Starting the container
[root@blog ~]# docker run -itd \
--name filebeat --hostname filebeat \
-e TZ=Asia/Shanghai \
-v /opt/elk/logstash_conf/os_log/filebeat/:/usr/share/logstash/pipeline/ \
-v /opt/elk/logstash_conf/logstash.yml:/usr/share/logstash/config/logstash.yml \
logstash:latest
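After the container starts, its log output and the Logstash monitoring API on port 9600 are two quick places to look. The sketch below assumes that `curl` is available inside the image. The `check_logstash` helper name and the `DOCKER` override are hypothetical, and the override exists only so the commands can be previewed.

```shell
# Show recent container logs, then query the Logstash API from inside the container.
# DOCKER is a hypothetical override for dry runs (e.g. DOCKER=echo).
check_logstash() {
    local name="${1:-filebeat}"
    ${DOCKER:-docker} logs --tail 20 "$name"
    ${DOCKER:-docker} exec "$name" curl -s 'http://localhost:9600/?pretty'
}

# Dry run: print the two docker invocations without executing them.
DOCKER=echo check_logstash filebeat
```

The API is queried from inside the container because the `docker run` command does not publish port 9600 to the host.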
Tip: if Docker reports "WARNING: IPv4 forwarding is disabled. Networking will not work.", enable IP forwarding:
[root@blog ~]# echo "net.ipv4.ip_forward=1" >> /etc/sysctl.conf # add the setting
[root@blog ~]# systemctl restart network # restart the network service
[root@blog ~]# sysctl net.ipv4.ip_forward # the setting is active when this returns 1
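The same check can be scripted by reading the kernel's current value from `/proc`. In the sketch below, `ip_forward_enabled` is a hypothetical helper, and its optional path argument exists only to make it easy to try against a test file.

```shell
# Succeed if IPv4 forwarding is currently enabled.
# The optional argument overrides the file to read (for testing).
ip_forward_enabled() {
    [ "$(cat "${1:-/proc/sys/net/ipv4/ip_forward}" 2>/dev/null)" = "1" ]
}

if ip_forward_enabled; then
    echo "IPv4 forwarding is enabled"
else
    echo "IPv4 forwarding is disabled"
fi
```

Running `sysctl -p` reloads `/etc/sysctl.conf`, which is another way to apply the new setting.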
Checking the container status
[root@blog ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6c7c85a91e93 logstash:latest "/usr/local/bin/dock…" 4 days ago Up 4 days 9600/tcp filebeat