以前查日志:SSH登录服务器,grep、tail、awk轮番上阵,10台服务器查一圈下来半小时过去了。现在:打开Kibana,输入关键词,所有服务器的日志一秒出结果。
一、为什么需要ELK?
先说说我们之前的"原始"日志管理:- # 服务器1
- ssh root@192.168.1.10
- tail -f /var/log/app/app.log | grep "ERROR"
- # 服务器2
- ssh root@192.168.1.11
- tail -f /var/log/app/app.log | grep "ERROR"
- # 服务器3...10
- # 开10个终端窗口...
复制代码 痛点:
- 10台服务器要开10个终端
- 日志量大的时候grep到眼花
- 想看历史日志?先下载再分析
- 跨服务追踪?基本不可能
- 老板问"昨天有多少报错"?数到明天也数不完
搭了ELK之后:
- 所有服务器日志集中一处
- 搜索秒出结果
- 自动生成图表
- 异常告警自动通知
二、ELK是什么?
ELK是三个开源项目的首字母:
组件作用一句话解释Elasticsearch存储+搜索日志数据库,支持全文搜索Logstash收集+处理从各处收集日志,清洗后存入ESKibana可视化Web界面,搜索和做图表现在还经常加个Filebeat,变成EFK或ELFK:- 应用日志 → Filebeat(轻量采集)→ Logstash(处理)→ Elasticsearch(存储)→ Kibana(展示)
复制代码 三、快速搭建(Docker Compose)
3.1 服务器要求
ELK比较吃资源,建议配置:
环境CPU内存磁盘测试2核4GB50GB生产8核+16GB+500GB+ SSD3.2 目录结构
- mkdir -p /data/elk/{elasticsearch,logstash,kibana}
- cd /data/elk
- # 目录结构
- elk/
- ├── docker-compose.yml
- ├── elasticsearch/
- │ └── data/
- ├── logstash/
- │ ├── config/
- │ │ └── logstash.yml
- │ └── pipeline/
- │ └── logstash.conf
- └── kibana/
- └── config/
- └── kibana.yml
复制代码 3.3 docker-compose.yml
- version: '3.8'
- services:
- elasticsearch:
- image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
- container_name: elasticsearch
- environment:
- - node.name=es01
- - cluster.name=elk-cluster
- - discovery.type=single-node
- - bootstrap.memory_lock=true
- - xpack.security.enabled=false
- - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
- ulimits:
- memlock:
- soft: -1
- hard: -1
- volumes:
- - ./elasticsearch/data:/usr/share/elasticsearch/data
- ports:
- - "9200:9200"
- networks:
- - elk
- logstash:
- image: docker.elastic.co/logstash/logstash:8.11.0
- container_name: logstash
- volumes:
- - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
- - ./logstash/pipeline:/usr/share/logstash/pipeline
- ports:
- - "5044:5044" # Beats输入
- - "5000:5000" # TCP输入
- - "9600:9600" # API
- environment:
- - "LS_JAVA_OPTS=-Xms512m -Xmx512m"
- depends_on:
- - elasticsearch
- networks:
- - elk
- kibana:
- image: docker.elastic.co/kibana/kibana:8.11.0
- container_name: kibana
- volumes:
- - ./kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml
- ports:
- - "5601:5601"
- environment:
- - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- depends_on:
- - elasticsearch
- networks:
- - elk
- networks:
- elk:
- driver: bridge
复制代码 3.4 配置文件
logstash/config/logstash.yml:- http.host: "0.0.0.0"
- xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
复制代码 logstash/pipeline/logstash.conf:- input {
- # 接收Filebeat发来的日志
- beats {
- port => 5044
- }
-
- # 也可以接收TCP发来的日志
- tcp {
- port => 5000
- codec => json
- }
- }
- filter {
- # 解析JSON格式日志
- if [message] =~ /^\{.*\}$/ {
- json {
- source => "message"
- }
- }
-
- # 解析时间戳
- date {
- match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601"]
- target => "@timestamp"
- }
-
- # 添加自定义字段
- mutate {
- add_field => { "env" => "production" }
- }
- }
- output {
- elasticsearch {
- hosts => ["elasticsearch:9200"]
- index => "app-logs-%{+YYYY.MM.dd}"
- }
-
- # 调试时可以输出到控制台
- # stdout { codec => rubydebug }
- }
复制代码 kibana/config/kibana.yml:- server.host: "0.0.0.0"
- server.name: kibana
- elasticsearch.hosts: ["http://elasticsearch:9200"]
- i18n.locale: "zh-CN"
复制代码 3.5 启动
- # 设置ES数据目录权限
- chown -R 1000:1000 elasticsearch/data
- # 调整系统参数(ES需要)
- sysctl -w vm.max_map_count=262144
- echo "vm.max_map_count=262144" >> /etc/sysctl.conf
- # 启动
- docker-compose up -d
- # 查看状态
- docker-compose ps
- # 查看日志
- docker-compose logs -f
复制代码 3.6 验证
- # ES健康检查
- curl http://localhost:9200/_cluster/health?pretty
- # Kibana访问
- # 浏览器打开 http://你的IP:5601
复制代码 四、配置日志收集
4.1 安装Filebeat
在需要收集日志的服务器上安装:- # CentOS
- rpm -ivh https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.11.0-x86_64.rpm
- # Ubuntu
- wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.11.0-amd64.deb
- dpkg -i filebeat-8.11.0-amd64.deb
- # Docker方式
- docker pull docker.elastic.co/beats/filebeat:8.11.0
复制代码 4.2 配置Filebeat
/etc/filebeat/filebeat.yml:- filebeat.inputs:
- # 收集应用日志
- - type: log
- enabled: true
- paths:
- - /var/log/app/*.log
- fields:
- app: my-app
- env: prod
- fields_under_root: true
-
- # 多行日志合并(Java堆栈)
- multiline.pattern: '^\d{4}-\d{2}-\d{2}'
- multiline.negate: true
- multiline.match: after
-
- # 收集Nginx日志
- - type: log
- enabled: true
- paths:
- - /var/log/nginx/access.log
- fields:
- app: nginx
- type: access
- fields_under_root: true
- # 收集系统日志
- - type: log
- enabled: true
- paths:
- - /var/log/messages
- - /var/log/secure
- fields:
- app: system
- fields_under_root: true
- # 输出到Logstash
- output.logstash:
- hosts: ["ELK服务器IP:5044"]
- # 如果直接输出到ES
- # output.elasticsearch:
- # hosts: ["ELK服务器IP:9200"]
- # index: "filebeat-%{+yyyy.MM.dd}"
- # 日志处理器
- processors:
- - add_host_metadata: ~
- - add_cloud_metadata: ~
复制代码 4.3 启动Filebeat
- # 测试配置
- filebeat test config
- # 启动
- systemctl enable filebeat
- systemctl start filebeat
- # 查看状态
- systemctl status filebeat
复制代码 五、Kibana使用指南
5.1 创建索引模式
- 打开Kibana:http://IP:5601
- 进入 Stack Management → Index Patterns
- 点击 Create index pattern
- 输入 app-logs-*
- 选择时间字段 @timestamp
5.2 日志搜索(Discover)
进入 Discover:- # 搜索包含ERROR的日志
- message: ERROR
- # 搜索特定应用
- app: my-app AND level: ERROR
- # 时间范围内的错误
- level: ERROR AND @timestamp >= "2024-01-01"
- # 组合查询
- (level: ERROR OR level: WARN) AND app: my-app AND NOT message: "expected error"
复制代码 5.3 常用搜索技巧
需求查询语法包含关键词message: "OutOfMemory"排除关键词NOT message: "health check"某个字段等于level: ERROR某个字段存在_exists_: traceId正则匹配message: /.*Exception.*/范围查询response_time: [100 TO 500]通配符message: *timeout*5.4 创建可视化图表
场景:统计每小时的错误数量
- 进入 Visualize → Create visualization
- 选择 Lens 或 Aggregation based → Line
- 配置:
- Y轴:Count
- X轴:@timestamp(Date Histogram,间隔1小时)
- 过滤:level: ERROR
场景:错误类型分布饼图
- 选择 Pie Chart
- Slice by:Terms → error_type
- Size by:Count
5.5 创建Dashboard
把多个可视化组合成仪表盘:
- 进入 Dashboard → Create dashboard
- 添加已创建的可视化
- 调整布局
- 保存
推荐的Dashboard组成:
- 日志总量趋势图
- 错误数量趋势图
- 错误类型分布
- 响应时间分布
- 各服务日志量对比
- 最近错误列表
六、Logstash高级配置
6.1 解析Nginx日志
- input {
- beats {
- port => 5044
- }
- }
- filter {
- if [fields][type] == "nginx-access" {
- grok {
- match => { "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:user_agent}"' }
- }
-
- date {
- match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
- target => "@timestamp"
- }
-
- geoip {
- source => "client_ip"
- target => "geoip"
- }
-
- useragent {
- source => "user_agent"
- target => "ua"
- }
-
- mutate {
- convert => {
- "status" => "integer"
- "bytes" => "integer"
- }
- remove_field => ["message", "timestamp"]
- }
- }
- }
- output {
- if [fields][type] == "nginx-access" {
- elasticsearch {
- hosts => ["elasticsearch:9200"]
- index => "nginx-access-%{+YYYY.MM.dd}"
- }
- }
- }
复制代码 6.2 解析Java日志
- filter {
- if [fields][app] == "java-app" {
- # 解析标准格式:2024-01-01 12:00:00.123 [thread] LEVEL class - message
- grok {
- match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{DATA:class} - %{GREEDYDATA:log_message}" }
- }
-
- date {
- match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS"]
- target => "@timestamp"
- }
-
- # 提取异常类型
- if [level] == "ERROR" {
- grok {
- match => { "log_message" => "%{DATA:exception_class}:%{GREEDYDATA:exception_message}" }
- tag_on_failure => []
- }
- }
- }
- }
复制代码 6.3 日志脱敏
- filter {
- # 脱敏手机号
- mutate {
- gsub => [
- "message", "1[3-9]\d{9}", "***phone***"
- ]
- }
-
- # 脱敏身份证
- mutate {
- gsub => [
- "message", "\d{17}[\dXx]", "***idcard***"
- ]
- }
-
- # 脱敏银行卡
- mutate {
- gsub => [
- "message", "\d{16,19}", "***bankcard***"
- ]
- }
- }
复制代码 七、告警配置
7.1 使用Elastalert
- # 安装
- pip install elastalert2
- # 配置 config.yaml
- rules_folder: /opt/elastalert/rules
- es_host: localhost
- es_port: 9200
复制代码 告警规则 rules/error_alert.yaml:- name: "应用错误告警"
- type: frequency
- index: app-logs-*
- # 5分钟内超过10次ERROR就告警
- num_events: 10
- timeframe:
- minutes: 5
- filter:
- - term:
- level: ERROR
- alert:
- - dingtalk
- dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=xxx"
- dingtalk_msgtype: "markdown"
复制代码 7.2 使用Kibana自带告警(需订阅)
进入 Stack Management → Rules and Connectors
八、异地服务器日志收集
公司有多个机房,服务器分布在不同网络,怎么把日志集中到一个ELK?
方案1:公网暴露端口(不推荐)
把Logstash的5044端口暴露到公网,安全风险大。
方案2:VPN
在各机房部署VPN客户端,打通到ELK服务器的网络。
缺点:
方案3:SD-WAN组网(我在用的)
用星空组网把所有服务器组到一个虚拟网络:- # 在ELK服务器
- curl -sSL https://down.starvpn.cn/linux.sh | bash
- xkcli login your_token && xkcli up
- # 虚拟IP: 192.168.188.10
- # 在各机房的服务器
- # 同样操作,获得虚拟IP
复制代码 Filebeat配置直接用虚拟IP:- output.logstash:
- hosts: ["192.168.188.10:5044"]
复制代码 效果:
- 配置简单,5分钟搞定
- 延迟低(P2P直连)
- 稳定,用了3个月没断过
- 安全,流量加密
九、性能优化
9.1 ES优化
索引生命周期管理(ILM):- PUT _ilm/policy/logs-policy
- {
- "policy": {
- "phases": {
- "hot": {
- "actions": {
- "rollover": {
- "max_size": "50GB",
- "max_age": "1d"
- }
- }
- },
- "warm": {
- "min_age": "7d",
- "actions": {
- "shrink": { "number_of_shards": 1 },
- "forcemerge": { "max_num_segments": 1 }
- }
- },
- "delete": {
- "min_age": "30d",
- "actions": { "delete": {} }
- }
- }
- }
- }
复制代码 9.2 Logstash优化
- # logstash.yml
- pipeline.workers: 4 # CPU核心数
- pipeline.batch.size: 1000 # 批处理大小
- pipeline.batch.delay: 50 # 批处理等待时间(ms)
复制代码 9.3 Filebeat优化
- # 批量发送output.logstash:
- hosts: ["192.168.188.10:5044"] bulk_max_size: 2048 # 队列设置queue.mem: events: 4096 flush.min_events: 2048 flush.timeout: 1s
复制代码 十、效果对比
指标传统方式ELK查日志耗时10-30分钟几秒跨服务器查询手动逐台查一次搜索全部历史日志需要下载直接查统计分析写脚本点几下告警无自动实际收益:
- 排查问题效率提升10倍
- 终于能回答"昨天有多少报错"了
- 定位问题从"猜"变成"查"
总结
ELK搭建不难,难的是:
- 日志格式要规范(JSON最好)
- 字段要统一(各应用约定好)
- 索引要管理(不然磁盘爆炸)
- 网络要打通(异地服务器用组网)
建议先从单个应用开始,跑顺了再推广到其他应用。
有问题评论区交流~
[code][/code]
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |