This article describes how to deploy Logstash 7.4.0 with Docker.

Logstash deployment

[toc]

1. Initialize the environment

Environment

  • CentOS 7.5
  • A three-node Elasticsearch cluster
  • Docker 19.03.4
  • Logstash 7.4.0 (Docker image)
  • RabbitMQ
  • Redis

Host configuration

| No. | IP address | Alias    | CPU | Memory | Disk |
| --- | ---------- | -------- | --- | ------ | ---- |
| 1   | X.X.X.211  | L&K-node | 8C  | 16G    | 200G |

2. Docker deployment

Install Docker

[root@blog ~]# sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo # add the Docker-CE repository
[root@blog ~]# sudo yum makecache fast # refresh the package metadata cache
[root@blog ~]# sudo yum -y install docker-ce # install Docker-CE
[root@blog ~]# sudo service docker start # start the Docker service

Pull the base Docker image

  • Pull the Logstash image
[root@blog ~]# docker pull logstash:7.4.0
  • Verify the image
[root@blog ~]# docker images 
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
logstash            7.4.0               c2c1ac6b995b        5 weeks ago         889MB

Build a custom Docker image

Why customize the image?

  1. A later step uses the exec plugin in the output section of the Logstash config, so bake logstash-output-exec into the image up front.
  2. Logstash needs some common bootstrap configuration (e.g. logstash.yml and the X-Pack settings).
  • Create the build directories
[root@blog ~]# mkdir -p logstash-dockerfile/setting
[root@blog ~]# mkdir -p logstash-dockerfile/pipeline
  • Create logstash.yml
[root@blog ~]# cat > logstash-dockerfile/setting/logstash.yml <<eof
http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://[ES node 1 IP]:9200", "http://[ES node 2 IP]:9200", "http://[ES node 3 IP]:9200"]
xpack.monitoring.elasticsearch.username: remote_monitoring_user
xpack.monitoring.elasticsearch.password: [password for remote_monitoring_user]
eof
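As a sanity check of the `cat > file <<eof` pattern used above (note the `>` redirect; a plain `cat file <<eof` would try to read the file instead of writing it), here is a minimal, self-contained sketch against a temporary path:

```shell
# Write a two-line YAML fragment via a heredoc, then verify it landed on disk.
tmp=$(mktemp)
cat > "$tmp" <<eof
http.host: "0.0.0.0"
xpack.monitoring.enabled: true
eof
grep -q '^xpack.monitoring.enabled: true$' "$tmp" && echo "heredoc written OK"
rm -f "$tmp"
```

The same shape applies to every config file written in this article.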
  • Create the WeChat forwarding script

Note: the WeChat Work (企业微信) application setup itself is omitted here.

[root@blog ~]# cat > logstash-dockerfile/pipeline/weixin.sh <<'eof'  # quote eof so the $variables below are written literally
#!/bin/bash

CropID='[your CorpID]'
Secret='[your Secret]'

GURL="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$CropID&corpsecret=$Secret"
Gtoken=$(/usr/bin/curl -s -G $GURL | awk -F\" '{print $10}')   # crude extraction of access_token from the JSON response
echo $Gtoken > /tmp/1.txt   # debug: record the token

PURL="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$Gtoken"

function body() {
        local AppID=[your AppID]      # application ID
        local PartyID=[your PartyID]  # department ID
        local Msg=$(echo "$@" | cut -d" " -f3-)   # note: drops the first two space-separated words
        printf '{\n'
        printf '\t"toparty": "'"$PartyID"\"",\n"
        printf '\t"msgtype": "text",\n'
        printf '\t"agentid": "'"$AppID"\"",\n"
        printf '\t"text": {\n'
        printf '\t\t"content": "'"$Msg"\""\n"
        printf '\t},\n'
        printf '\t"safe":"0"\n'
        printf '}\n'
}
/usr/bin/curl --data-ascii "$(body $1)" $PURL
eof
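To preview the JSON that `body()` emits without touching the WeChat API, here is a simplified, flattened sketch with dummy values (AppID 1000001 and PartyID 2 are hypothetical; the real script also pipes the message through `cut -d" " -f3-`, which drops the first two space-separated words — omitted here):

```shell
# Simplified body(): same JSON fields as the script above, dummy credentials.
body() {
    local AppID=1000001   # hypothetical application ID
    local PartyID=2       # hypothetical department ID
    printf '{"toparty":"%s","msgtype":"text","agentid":"%s","text":{"content":"%s"},"safe":"0"}\n' \
        "$PartyID" "$AppID" "$*"
}
body "disk usage over 90 percent"
# → {"toparty":"2","msgtype":"text","agentid":"1000001","text":{"content":"disk usage over 90 percent"},"safe":"0"}
```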
  • Create the Dockerfile
[root@blog ~]# cat > logstash-dockerfile/Dockerfile <<eof
FROM logstash:7.4.0
COPY pipeline/ /usr/share/logstash/pipeline/
COPY setting/logstash.yml /usr/share/logstash/config/logstash.yml
RUN logstash-plugin install logstash-output-exec
RUN rm -rf /usr/share/logstash/config/logstash-sample.conf
CMD ["-f", "/usr/share/logstash/pipeline/logstash.conf"]
eof
  • Build the image
[root@blog ~]# cd logstash-dockerfile
[root@blog logstash-dockerfile]# docker build .
Sending build context to Docker daemon  8.192kB
Step 1/6 : FROM logstash:7.4.0
 ---> c2c1ac6b995b
Step 2/6 : COPY pipeline/ /usr/share/logstash/pipeline/
 ---> Using cache
 ---> 407fe8bb1ab2
Step 3/6 : COPY setting/logstash.yml /usr/share/logstash/config/logstash.yml
 ---> 1f7f2d5787df
Step 4/6 : RUN logstash-plugin install logstash-output-exec
 ---> Running in 3ba85661fb01
Validating logstash-output-exec
Installing logstash-output-exec  ## downloading the logstash-output-exec plugin can be slow; be patient
Installation successful
Removing intermediate container 3ba85661fb01
 ---> 459af4dff509
Step 5/6 : RUN rm -rf /usr/share/logstash/config/logstash-sample.conf
 ---> Running in 16fed9350b06
Removing intermediate container 16fed9350b06
 ---> 2398f44b5013
Step 6/6 : CMD ["-f", "/usr/share/logstash/pipeline/logstash.conf"]
 ---> Running in eacc20d78fcb
Removing intermediate container eacc20d78fcb
 ---> 024bed9a9a6c
Successfully built 024bed9a9a6c

[root@blog ~]# docker images | grep none | awk '{print $3}' # find the ID of the newly built image
[root@blog ~]# docker tag [image ID from the previous step] logstash:latest  # tag the new image

logstash.conf configuration

Create a separate container configuration per service and mount it into the container with a Docker volume at startup. logstash.conf consists of three sections: input, filter, and output.

| Category    | Service       | Config path                                 | Notes               |
| ----------- | ------------- | ------------------------------------------- | ------------------- |
| System logs | filebeat      | /opt/elk/logstash_conf/os_log/filebeat/     | Linux system logs   |
| System logs | winlogbeat    | /opt/elk/logstash_conf/os_log/filebeat      | Windows system logs |
| Other logs  | mysql-slowlog | /opt/elk/logstash_conf/other_log/mysql_log/ | MySQL slow log      |
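The three-section layout can be sketched and checked quickly with a throwaway file (the stdin/stdout plugins here are placeholders, not the production inputs used later):

```shell
# Write a minimal three-section logstash.conf skeleton and count its sections.
conf=$(mktemp)
cat > "$conf" <<'eof'
input  { stdin { } }
filter { mutate { add_tag => ["demo"] } }
output { stdout { codec => rubydebug } }
eof
grep -cE '^(input|filter|output)' "$conf"
# → 3
rm -f "$conf"
```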

Edit logstash.conf

INPUT section

  • Source: RabbitMQ
input{
    rabbitmq {
        host => "[MQ IP address]"
        vhost => "[MQ vhost]"
        exchange => "[MQ exchange]"
        queue => "[MQ queue]"
        durable => [true or false]
        key => "[MQ routing key]"
        user => "[MQ user]"
        password => "[MQ password]"
        type => "[used in the output section to tell input sources apart]"
        }
}
  • Source: Redis
input{
     redis {
       host =>  "[Redis IP address]"
       port => [Redis port]
       data_type => "list"
       password => "[Redis password]"
       key => "[key identifying the channel]"
       db => [Redis db number]
  }
}
  • Source: listening port (Beats)
input{
	beats{
		port => "[listening port]"
	}
}

FILTER section

Reference: "Use Logstash pipelines for parsing" (Elastic documentation); some settings were adapted to this environment.

  • Linux system log rules
filter{
  if [event][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:[system][auth][groupadd][name]}, GID=%{NUMBER:[system][auth][groupadd][gid]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"
        }
        remove_field => [ "message","[agent][hostname]" ]
      }
      grok {
        match => { "[log][file][path]" => "/var/log/%{LOGTYPE:log.type}%{GREEDYMULTILINE}" }
        pattern_definitions => {
          "GREEDYMULTILINE" => "(.|\n)*"
          "LOGTYPE" => "[a-zA-Z0-9]+"
        }
      }
      date {
        match => [ "[system][auth][timestamp]", "ISO8601" ,"MMM  d HH:mm:ss", "MMM dd HH:mm:ss","MMM d yyyy HH:mm:ss" ]
        remove_field => "[system][auth][timestamp]"
      }
      mutate {
        rename => [ "[system][auth][hostname]" , "[system][hostname]" ]
        rename => [ "[system][auth][pid]" , "[system][pid]" ]
        rename => [ "[system][auth][message]" , "[system][message]" ]
        rename => [ "[host][name]" , "[os][hostname]" ]
        rename => [ "[host][ip][0]" , "[os][ip]" ]
	remove_field => "[host]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => [ "message","[agent][hostname]" ]
      }
      grok {
        match => { "[log][file][path]" => "/var/log/%{LOGTYPE:log.type}%{GREEDYMULTILINE}" }
        pattern_definitions => {
          "GREEDYMULTILINE" => "(.|\n)*"
          "LOGTYPE" => "[a-zA-Z0-9]+"
        }
      }
      date {
           match => [ "[system][syslog][timestamp]", "ISO8601" ,"MMM  d HH:mm:ss","MMM dd HH:mm:ss","MMM d yyyy HH:mm:ss" ]
	   remove_field => "[system][syslog][timestamp]"
      }
      mutate {
        rename => [ "[system][syslog][hostname]" , "[system][hostname]" ]
        rename => [ "[system][syslog][pid]" , "[system][pid]" ]
        rename => [ "[system][syslog][message]" , "[system][message]" ]
        rename => [ "[host][name]" , "[os][hostname]" ]
        rename => [ "[host][ip][0]" , "[os][ip]" ]
	remove_field => "[host]"
      }
    }
  }
}
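As a rough illustration of what the first sshd grok rule extracts, the ERE below is a hand-simplified approximation (not the grok pattern itself) applied to a sample auth.log line:

```shell
# Sample sshd line; grep out the source-IP/port portion the grok rule captures.
line='Apr  1 12:00:00 web-01 sshd[1234]: Accepted password for root from 10.0.0.5 port 22 ssh2'
echo "$line" | grep -oE 'from [0-9.]+ port [0-9]+'
# → from 10.0.0.5 port 22
```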
  • Windows system log rules
filter{

        if [winlog][event_id] == 1000 {
          grok{
          match => { "message" =>  "(?<mudulename>[a-zA-Z0-9._-]+\.exe)"}  # capture the faulting module name ("mudulename" spelling kept to match the output section)
         }
	}
        grok {
	  match => { "[host][ip]" => "%{IPV4:[os][ip]}"}

	}
    
	mutate {
		rename => [ "[host][name]" , "[os][hostname]" ]
		remove_field => [ "[host]","[agent][hostname]"]
	}


        date {
           match => [ "[event][created]", "ISO8601" ]
	   remove_field => "[event][created]"
	}          
}
  • MySQL slow log rules
filter{
 if [event][module] == "mysql" {
    if [fileset][name] == "slowlog" {
      grok {
        match => { "message" => ["^# User@Host: %{USER:[mysql][slowlog][user]}(\[[^\]]+\])? @\s*\[%{IPV4:[mysql][slowlog][ip]}\](\s*Id:\s* %{NUMBER:[mysql][slowlog][id]})?\n# Query_time: %{NUMBER:[mysql][slowlog][query_time][sec]}\s* Lock_time: %{NUMBER:[mysql][slowlog][lock_time][sec]}\s* Rows_sent: %{NUMBER:[mysql][slowlog][rows_sent]}\s* Rows_examined: %{NUMBER:[mysql][slowlog][rows_examined]}\n(SET timestamp=%{NUMBER:[mysql][slowlog][timestamp]};\n)?%{GREEDYMULTILINE:[mysql][slowlog][query]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE" => "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[mysql][slowlog][timestamp]", "UNIX" ]
      }
      mutate {
        gsub => ["[mysql][slowlog][query]", "\n# Time: [0-9]+ [0-9][0-9]:[0-9][0-9]:[0-9][0-9](\\.[0-9]+)?$", ""]
      }
      mutate {
        rename => [ "[host][name]" , "[os][hostname]" ]
        rename => [ "[host][ip][0]" , "[os][ip]" ]
	remove_field => "[host]"
      }
    }
  }
}
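For a feel of what the slow-log grok rule captures, here is a sample `# Query_time` header with a quick shell extraction of the field the rule stores as [mysql][slowlog][query_time][sec] (the header values are made up):

```shell
# Sample MySQL slow-log header; pull out the Query_time value.
header='# Query_time: 3.000124  Lock_time: 0.000056 Rows_sent: 1  Rows_examined: 10000'
echo "$header" | grep -oE 'Query_time: [0-9.]+' | awk '{print $2}'
# → 3.000124
```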

OUTPUT section

  • Output to Elasticsearch: ship data to the ES cluster
output{
    if [type] == "[type defined in the INPUT section]" {  # the elasticsearch output has no type option, so route by the type set on the input with a conditional
        elasticsearch {
            hosts => ["[ES node 1 IP]:9200","[ES node 2 IP]:9200","[ES node 3 IP]:9200"]
            index => "[index name]"  # example: os_syslog.windows-%{+YYYY.MM.dd}
            user => "elastic"
            password => "[password for the elastic user]"
        }
    }
}
  • Output to exec: run a command

Purpose: this rule was written for Windows module status monitoring; when a program that exited abnormally is matched, it sends a WeChat alert.
Dependency: must be used together with the Windows filter rules above.

output{	
    if [winlog][event_id] == 1000 {  # when [winlog][event_id] is 1000, run the WeChat alert script
        exec {command => "/bin/bash /usr/share/logstash/pipeline/weixin.sh 'Server module failure\nFailure time (UTC, add 8:00):%{@timestamp}\nHostname:%{[os][hostname]}\nHost address:%{[os][ip]}\nSeverity:%{[log][level]}\nDetail:%{mudulename}'"}
        }
}
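Logstash expands the `%{field}` references in the exec command before running it. To preview the argument weixin.sh would receive, the sed sketch below substitutes sample event values into a shortened template (web-01, 10.0.0.8, and svchost.exe are hypothetical):

```shell
# Emulate Logstash field interpolation on a shortened alert template.
template='Host:%{[os][hostname]} IP:%{[os][ip]} Module:%{mudulename}'
echo "$template" \
  | sed -e 's/%{\[os\]\[hostname\]}/web-01/' \
        -e 's/%{\[os\]\[ip\]}/10.0.0.8/' \
        -e 's/%{mudulename}/svchost.exe/'
# → Host:web-01 IP:10.0.0.8 Module:svchost.exe
```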
  • Output to stdout: print to the screen
output{	
    stdout { 
        codec => rubydebug 
    }
}

Start the container

[root@blog ~]# docker run  -itd \
--name filebeat --hostname filebeat \
-e TZ=Asia/Shanghai \
 -v /opt/elk/logstash_conf/os_log/filebeat/:/usr/share/logstash/pipeline/  \
-v /opt/elk/logstash_conf/logstash.yml:/usr/share/logstash/config/logstash.yml \
logstash:latest

Tip: if Docker reports WARNING: IPv4 forwarding is disabled. Networking will not work.

[root@blog ~]# echo "net.ipv4.ip_forward=1" >> /etc/sysctl.conf # add the setting
[root@blog ~]# systemctl restart network     # restart the network service
[root@blog ~]# sysctl net.ipv4.ip_forward     # the change is active when this returns 1

Check the container status

[root@blog ~]# docker ps
CONTAINER ID  IMAGE            COMMAND                  CREATED        STATUS         PORTS        NAMES
6c7c85a91e93  logstash:latest  "/usr/local/bin/dock…"   4 days ago     Up 4 days      9600/tcp     filebeat