已解决的需求

  1. 时间顺序排序,与实际日志相符
  2. kibana根据ip、log筛选日志(对应字段:fileds.server, log.file.path)
  3. 引入kafka保证 机器宕机后日志不丢失,3个节点组成的集群来实现日志同步,备份
  4. 同城双活

架构图

ELK Demo采用的架构如下:

image-20220902093938379

如上图,图中 Logstash 多个的原因是考虑到程序是分布式架构的情况,每台机器都需要部署一个 Logstash,如果确实是单服务器的情况部署一个 Logstash 即可。

优点是搭建简单,易于上手。缺点是Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。

改进架构

架构图二:

image-20220902094000615

此种架构引入了消息队列机制,位于各个节点上的Logstash Agent先将数据/日志传递给Kafka(或者Redis),并将队列中消息或数据间接传递给Logstash,Logstash过滤、分析后将数据传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了Kafka(或者Redis),所以即使远端Logstash server因故障停止运行,数据将会先被存储下来,从而避免数据丢失。

架构图三:

image-20220902094313826

此种架构将收集端logstash替换为beats,更灵活,消耗资源更少,扩展性更强。同时可配置Logstash 和Elasticsearch 集群用于支持大集群系统的运维日志数据监控和查询。

当前架构

image-20220902094327500

集群节点分布图

image-20220902094338417

环境说明(Server 1, Server 2, Server 3均是需要日志收集服务的机器)

ELK版本均为7.6.0, Kafka版本为2.5.0所有机器的操作系统都是CentOS 7。

host vm-name hostname 资源 运行服务
10.240.211.189 yufeng-centos7-kafka1 kafka1 4核8G kafka
10.240.211.71 yufeng-centos7-kafka2 kafka2 4核8G kafka
10.240.211.76(Lab2 ESXI) yufeng-centos7-kafka3 kafka3 4核8G kafka
10.240.211.68 yufeng-centos7-dev yufeng 8核24G es, logstash, kibana
10.240.210.242 yufeng-centos7-dev2 dev2 4核8G es
10.240.210.193(Lab2 ESXI) yufeng-centos7-es3 es3 2核4G es
10.240.211.112 dbgshell lxca, filebeat
10.240.210.243 dbgshell lxca, filebeat

正如架构补充部分所说,引入Kafka,利用消息队列机制将filebeat(生产者)从服务器上采集的日志存储下来,然后logstash作为消费者将日志解析并存储到 ElasticSearch中,然后通过 Kibana提供日志可视化、筛选等功能。

此处的Kafka集群由 **三个节点(Lab1 2个节点,Lab2 1个节点)**组成,每个节点是一个broker,能够实现一个节点宕机,其他节点依然能够正常提供服务。而 **这里采取的Lab1 2个节点,Lab2 1个节点(Kafka,ES)**的目的是为了实现 同城双活

ES双活需要考虑的问题:

  • 长距离传输导致的数据传送的延时,以及 网络抖动增加“误选举”的概率?
    • 测试节点所在机房邻近,延时忽略不计
  • 3个节点,当某个节点挂了以后,重新恢复连接时,从leader节点上同步数据时 可能发生的流量风暴
    • 当前的日志量可以承受,需测试更大的规模。

参考:

img

img

工作流程概述

  1. 配置jdk
  2. 根据环境说明中的所运行的软件下载安装软件,并配置filebeat.yml, logstash配置文件kafka, es
  3. 依次启动es集群, kibana, kafka集群,创建topic,启动filebeat,启动logstash

具体配置

Kafka

config/server.properties

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# kafka1
broker.id=0				# 当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://10.240.211.189:9092
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
zookeeper.connect=10.240.211.189:2181,10.240.211.71:2181,10.240.211.76:2181

# kafka2
broker.id=1
listeners=PLAINTEXT://10.240.211.71:9092
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
zookeeper.connect=10.240.211.189:2181,10.240.211.71:2181,10.240.211.76:2181

# kafka3 也一样 就broker.id = 2
broker.id=2
listeners=PLAINTEXT://10.240.211.76:9092
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
zookeeper.connect=10.240.211.189:2181,10.240.211.71:2181,10.240.211.76:2181

config/zookeeper.properties

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
dataDir=/tmp/zookeeper

# 客户端连接server的端口,即对外服务端口,默认为2181
clientPort=2181

# 单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制
maxClientCnxns=0

admin.enableServer=false

# server列表 2888为选举端口,3888为心跳端口
# 0表示服务器的编号 对应 dataDir 下面的 myid 文件
server.0=10.240.211.189:2888:3888
server.1=10.240.211.71:2888:3888
server.2=10.240.211.76:2888:3888

# ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置的。例如,session的最小超时时间是2*tickTime
tickTime=2000

# Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。Leader允许F在 initLimit时间内完成这个工作。通常情况下,我们不用太在意这个参数的设置。如果ZK集群的数据量确实很大了,F在启动的时候,从Leader上同步数据的时间也会相应变长,因此在这种情况下,有必要适当调大这个参数
initLimit=10

# 在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果L发出心跳包在syncLimit之后,还没有从F那里收到响应,那么就认为这个F已经不在线了。注意:不要把这个参数设置得过大,否则可能会掩盖一些问题
syncLimit=5

在上一步 dataDir 指定的目录下,创建 myid 文件。 直接将 kafkabroker.id 写入对应即可

启动集群所有节点

1
2
3
4
5
6
7
8
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties

# 开放端口,
iptables -A INPUT -ptcp --dport 3888 -j ACCEPT
iptables -A INPUT -ptcp --dport 2888 -j ACCEPT
iptables-save 	#保存

创建topic

1
2
3
4
5
6
7
8
./bin/kafka-topics.sh --create --bootstrap-server 10.240.211.189:9092,10.240.211.71:9092,10.240.211.76:9092 --replication-factor 3 --partitions 1 --topic pim2_log

./bin/kafka-topics.sh --create --bootstrap-server 10.240.211.189:9092,10.240.211.71:9092,10.240.211.76:9092 --replication-factor 3 --partitions 1 --topic lxca2_log

# 查看topic是否正常
./bin/kafka-topics.sh --describe --bootstrap-server 10.240.211.189:9092,10.240.211.71:9092,10.240.211.76:9092 --topic pim2_log

./bin/kafka-topics.sh --describe --bootstrap-server 10.240.211.189:9092,10.240.211.71:9092,10.240.211.76:9092 --topic lxca2_log

ES

elasticsearch.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
cluster.name: elk-application  #ELK的集群名称,名称相同即属于是同一个集群
node.name: node-1 #本机在集群内的节点名称 要在集群中唯一
path.data: /elk/data #数据存放目录
path.logs: /elk/logs #日志保存目录
network.host: 10.240.211.68 #监听的IP地址
http.port: 9200 #服务监听的端口
discovery.seed_hosts: ["10.240.211.68:9300", "10.240.210.242:9300", "10.240.210.193:9300"] # 提供集群中符合主机要求的节点的列表 服务发现种子主机
cluster.initial_master_nodes: ["10.240.211.68", "10.240.210.242", "10.240.210.193"] # 可以成为master节点的机器 初始主节点
#开启 xpack 功能,如果要禁止使用密码,请将以下内容注释,直接启动不需要设置密码
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12

各节点修改 config/jvm.options 文件的 -Xms4g-Xmx4g 为服务器的内存一半,我的服务器时 8G 内存,所以这里改成了 4G 。当然,这个值最大不要超过 32G

生成TLS核身份认证

1
bin/elasticsearch-certutil cert -out config/elastic-certificates.p12 -pass ""

生成 TLS 和身份验证,将会在config下生成elastic-certificates.p12文件,将此文件传到其他节点的config目录,注意文件权限

创建 Elasticsearch 集群密码

交互式设置密码,此处密码全部设置为 123456.

1
bin/elasticsearch-setup-passwords interactive

kibana页面登录账号密码为:elastic/123456

Kibana

config/kibana.yml

1
2
3
4
5
server.port: 5601
server.host: "10.240.211.68"
elasticsearch.hosts: ["10.240.211.68:9200"]
elasticsearch.username: "kibana"
elasticsearch.password: "123456"

Logstash

lxca_log.conf 处理topic为lxca2_log的日志

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
input{
    kafka {
        codec => json
        auto_offset_reset => "earliest"
        topics => "lxca2_log"					# 与kafka topic对应
        consumer_threads => 1
        bootstrap_servers => "10.240.211.189:9092,10.240.211.71:9092,10.240.211.76:9092"
    }
}

filter {
    grok {
        match => {"message" => "%{MONTHDAY:day}\.%{MONTH:month}\.%{YEAR:year}\s+%{TIME:time}\s+%{TZ:timezone},\s+\[%{DATA:threadName}\],\s+%{LOGLEVEL:loglevel},\s+%{JAVACLASS:logclass}\:%{WORD:method}\s+%{GREEDYDATA:details}"}
    }
    mutate {
        add_field => {"timestamp" => "%{year}-%{month}-%{day} %{time}"}
    }
    date {
        match => [ "timestamp" , "yyy-MMM-dd HH:mm:ss.SSS" ]
        target => "@timestamp"
        locale => "en"
        timezone => "+00:00"
    }
    mutate {
        remove_field => ["timezone", "timestamp", "day", "month", "year", "time"]
    }
}

output{
    #if [type] == "lxca-log" {
    if [fields][topic] == "lxca2_log" {		# 与filebeat.yml对应
        elasticsearch{
            hosts => ["10.240.211.68:9200","10.240.210.242:9200", "10.240.210.193:9200"]
            user => "elastic"				# 前面创建的账号密码
            password => "123456"
            index =>"lxca-%{+YYYY.MM.dd}.log"
        }
    }
    #}
}

pim_log.conf 处理topic为pim2_log的日志

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# pim_log.conf 	处理topic为pim2_log的日志
input{
    kafka {
        codec => json
        auto_offset_reset => "earliest"
        topics => "pim2_log"
        consumer_threads => 1
        bootstrap_servers => "10.240.211.189:9092,10.240.211.71:9092,10.240.211.76:9092"
    }
}

filter {
    grok {
        match => {"message" => "%{MONTHDAY:day}\.%{MONTH:month}\.%{YEAR:year}\s+%{TIME:time}\s+%{TZ:timezone},\s+\[%{DATA:threadName}\],\s+%{LOGLEVEL:loglevel},\s+%{JAVACLASS:logclass}\:%{WORD:method}\s+%{GREEDYDATA:details}"}
    }
    mutate {
        add_field => {"timestamp" => "%{year}-%{month}-%{day} %{time}"}
    }
    date {
        match => [ "timestamp" , "yyy-MMM-dd HH:mm:ss.SSS" ]
        target => "@timestamp"
        #remove_field => ["timezone"]
        locale => "en"
        timezone => "+00:00"
    }
    mutate {
        remove_field => ["timezone", "timestamp", "day", "month", "year", "time"]
    }
}

output{
    if [fields][topic] == "pim2_log" {
        elasticsearch{
            hosts => ["10.240.211.68:9200","10.240.210.242:9200", "10.240.210.193:9200"]
            user => "elastic"
            password => "123456"
            index =>"pim-%{+YYYY.MM.dd}.log"
        }
    }
}

Filebeat

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /opt/lenovo/lxca/data/logs/pim-*.log
    #- /opt/cfc/test-filebeat.log
    #- /var/log/test-filebeat.log
    #- c:\programdata\elasticsearch\logs\*
  fields:
    topic: pim2_log				# 对应于输出到kafka的topic
    server: 10.240.211.112		# 这个字段用来区分是哪台机器

- type: log
  enabled: true

  paths:
    - /opt/lenovo/lxca/data/logs/lxca*.log

  fields:
    topic: lxca2_log
    server: 10.240.211.112

output.kafka:
  hosts: ["10.240.211.189:9092","10.240.211.71:9092", "10.240.211.76:9092"]
  topic: "%{[fields.topic]}"		# 输出到kafka对应topic
  partition.round_robin: # 开启kafka的partition分区
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 100000000

效果

http://10.240.211.68:5601/

登录kibana,elastic/123456

image-20220902102246026

  1. 按实际日志时间倒序排序;

  2. 根据server ip区分机器(fields.server),根据log.file.path区分log;

  3. 实时性,依赖kafka与ELK

  4. 同城双活(2机房,主机房2个节点,第二机房1个节点)

    image-20220902102226116

简单使用:创建Index Pattern(可以理解为对es的index进行分组筛选),然后切换到Discover页面根据需要做filter

联调步骤

为了能够节省调试时间,建议大家从数据源头开始做调试,

第一步:查看filebeat的log确认 pim-*.log日志收集是否正常

第二步:查看filebeat的日志文件有没有自身配置错误信息以及转发数据到kafka队列是否正常

此步骤可以在kafka服务器端使用命令行方式消费kafka中的消息进行双边确认

第三步:确认kafka服务本身正常的情况下,查看logstash日志,如果logstash和其它服务共用节点时

需要注意该节点的内存资源是否充足,否则logstash会因为内存不足直接启动失败

第四步:如果前面三步验证都没问题的情况下,elasticsearch里面现在生产了少量的日志数据,但只要有数据存入elasticsearch,就会产生logstash out插件中预定义好的index,此时可能通过elasticsearch 提供的查询API验证elasticsearch是否正常接收了来自logstash转发过来的日志数据

第五步:确认elasticsearch里面有数据以后,就可以使用浏览器打开es_server_ip:5601页面进行创建index模式

如果elasticsearch中没有数据时,是无法在kibana页面中创建index模式

NOTE

  1. 架构中各节点端口的开放,(9092,9200,9300等),ssh, telnet, 检查防火墙

  2. 一个logstash实例同时启动多个配置文件,善用 pipelines.yml利用pipeline可以将各个配置文件相互隔离开来,对于不同的log,我们就可以分别编写一个conf来处理,相互之间不干扰,可扩展性高,不易出错。

    pipelines.yml(实现一个logstash两个pipeline的关键

    参考:https://www.elastic.co/guide/en/logstash/7.6/pipeline-to-pipeline.html

    1
    2
    3
    4
    5
    6
    7
    
    - pipeline.id: pim_log-processing
      pipeline.workers: 4
      pipeline.batch.size: 256
      path.config: "/home/yufeng/logstash-7.6.0/config/conf/pim_log.conf"
    - pipeline.id: lxca_log-processing
      pipeline.workers: 4
      path.config: "/home/yufeng/logstash-7.6.0/config/conf/lxca_log.conf"
    
  3. ES用户拥有的可创建文件描述的权限太低 以及 filebeat.yml格式

  4. kafka 集群优化

TODO

Service方式启动Filebeat

  • filebeat一段时间以后自动关闭,可以通过添加为servie自动重启
    • 解决方法:https://www.jianshu.com/p/6720fe6d24fb

Logstash单调故障问题——利用Kafka Consumer Group实现高可用

参考: https://www.cnblogs.com/caoweixiong/p/12691458.html

img

Consumer Group: 是个逻辑上的概念,为一组consumer的集合,同一个topic的数据会广播给不同的group,同一个group中只有一个consumer能拿到这个数据。

也就是说对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个consumer消费,

基于这一点我们只需要启动多个logstsh,并将这些logstash分配在同一个组里边就可以实现logstash的高可用了。

Kafka Partition数目的确定?

  • 推荐partition的数量一定要大于同时运行的consumer的数量。

  • 建议partition的数量大于集群broker的数量,这样消息数据就可以均匀的分布在各个broker中。

如果partition少的话,某些消费者无法消费数据,那么,Topic为什么要设置多个Partition呢,这是因为kafka是基于文件存储的,通过配置多个partition可以将消息内容分散存储到多个broker上,这样可以避免文件尺寸达到单机磁盘的上限。同时,将一个topic切分成任意多个partitions,可以保证消息存储、消息消费的效率,因为越多的partitions可以容纳更多的consumer,可有效提升Kafka的吞吐率。因此,将Topic切分成多个partitions的好处是可以将大量的消息分成多批数据同时写到不同节点上,将写请求分担负载到各个集群节点。

Topic为什么要设置多个Partition呢

  1. 可有效提升Kafka的吞吐率
  2. 写请求分担负载到各个集群节点

Log topic的划分?(颗粒度)

项目log的格式 grok正则化解析式编写?

每个log文件的(Exception)异常情况处理,需要根据loglevel级别划分,并编写对应的grok解析式。

根据request api找到对应的response