日志数据收集策略
大约 5 分钟ELK日志收集技术日志收集数据收集
日志数据收集策略
日志收集概述
日志数据收集是ELK Stack的核心功能之一,通过合理设计和实施日志收集策略,可以有效监控系统状态、分析用户行为、排查问题和保障系统安全。一个完善的日志收集策略需要考虑数据源类型、收集方式、处理流程和存储管理等多个方面。
日志收集架构
1. 基本收集架构
[应用服务器] --> [Filebeat] --> [Logstash] --> [Elasticsearch] --> [Kibana]
2. 分布式收集架构
[应用服务器1] --> [Filebeat1] --\
[应用服务器2] --> [Filebeat2] ---\--> [Logstash] --> [Elasticsearch] --> [Kibana]
[应用服务器3] --> [Filebeat3] --/
3. 直接收集架构
[应用服务器] --> [Filebeat] --> [Elasticsearch] --> [Kibana]
数据源类型
1. 应用日志
Web服务器日志:
- Nginx访问日志:记录HTTP请求信息
- Nginx错误日志:记录服务器错误信息
- Apache访问日志:记录HTTP请求信息
- Apache错误日志:记录服务器错误信息
应用程序日志:
- Java应用日志:使用Log4j、Logback等框架生成
- Python应用日志:使用logging模块生成
- Node.js应用日志:使用winston、bunyan等库生成
数据库日志:
- MySQL慢查询日志:记录执行时间较长的SQL语句
- MySQL错误日志:记录数据库错误信息
- PostgreSQL日志:记录数据库操作信息
2. 系统日志
Linux系统日志:
- syslog:系统标准日志
- auth.log:认证日志
- kern.log:内核日志
- messages:通用系统消息
Windows事件日志:
- 应用程序日志:应用程序相关事件
- 系统日志:系统组件相关事件
- 安全日志:安全相关事件
3. 容器日志
Docker日志:
- 容器标准输出:应用直接输出到stdout/stderr
- 容器日志文件:容器内应用生成的日志文件
Kubernetes日志:
- Pod日志:Kubernetes Pod中的容器日志
- 节点日志:Kubernetes节点系统日志
收集策略设计
1. 收集频率策略
实时收集:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
scan_frequency: 1s # 高频扫描
定期收集:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
scan_frequency: 10s # 定期扫描
2. 数据过滤策略
按级别过滤:
filter {
if [level] == "ERROR" or [level] == "FATAL" {
# 处理错误日志
} else if [level] == "WARN" {
# 处理警告日志
} else {
# 丢弃其他级别日志
drop { }
}
}
按内容过滤:
filter {
if [message] =~ /^DEBUG/ {
drop { }
}
}
3. 数据丰富策略
添加主机信息:
processors:
- add_host_metadata: ~
添加云环境信息:
processors:
- add_cloud_metadata: ~
添加Docker信息:
processors:
- add_docker_metadata: ~
收集配置示例
1. Nginx日志收集
Filebeat配置:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/nginx/error.log
fields:
log_type: nginx
fields_under_root: true
multiline.pattern: '^\d{4}/\d{2}/\d{2}'
multiline.negate: true
multiline.match: after
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
output.logstash:
hosts: ["localhost:5044"]
Logstash配置:
input {
beats {
port => 5044
}
}
filter {
if [log_type] == "nginx" {
grok {
match => {
"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)"
}
tag_on_failure => ["_grokparsefailure_nginx"]
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
}
mutate {
remove_field => [ "timestamp" ]
convert => {
"response" => "integer"
"bytes" => "integer"
}
}
useragent {
source => "agent"
target => "user_agent"
}
geoip {
source => "clientip"
target => "geoip"
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
2. Java应用日志收集
Filebeat配置:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/myapp/*.log
fields:
log_type: java_app
fields_under_root: true
multiline.pattern: '^\d{4}-\d{2}-\d{2}'
multiline.negate: true
multiline.match: after
output.logstash:
hosts: ["localhost:5044"]
Logstash配置:
input {
beats {
port => 5044
}
}
filter {
if [log_type] == "java_app" {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:log_message}"
}
tag_on_failure => ["_grokparsefailure_java"]
}
date {
match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
target => "@timestamp"
}
mutate {
remove_field => [ "timestamp" ]
}
# 提取异常信息
if [log_message] =~ /Exception/ {
mutate {
add_tag => [ "exception" ]
}
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "java-app-%{+YYYY.MM.dd}"
}
}
3. 系统日志收集
Filebeat配置:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/syslog
- /var/log/auth.log
- /var/log/messages
fields:
log_type: system
fields_under_root: true
# 启用系统模块
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
output.logstash:
hosts: ["localhost:5044"]
Logstash配置:
input {
beats {
port => 5044
}
}
filter {
if [log_type] == "system" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"
}
tag_on_failure => ["_grokparsefailure_syslog"]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
target => "@timestamp"
}
mutate {
remove_field => [ "syslog_timestamp" ]
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
性能优化策略
1. 批量处理优化
Filebeat批量配置:
output.logstash:
bulk_max_size: 2048
flush_interval: 1s
worker: 2
Logstash批量配置:
output {
elasticsearch {
hosts => ["localhost:9200"]
flush_size => 1000
idle_flush_time => 5
}
}
2. 资源限制优化
Filebeat资源限制:
max_procs: 2
queue.mem:
events: 4096
flush.min_events: 2048
flush.timeout: 1s
Logstash资源限制:
pipeline.workers: 4
pipeline.batch.size: 250
pipeline.batch.delay: 50
3. 网络优化
连接池配置:
output.logstash:
loadbalance: true
hosts: ["logstash1:5044", "logstash2:5044", "logstash3:5044"]
安全收集策略
1. 数据传输安全
SSL/TLS配置:
output.logstash:
hosts: ["localhost:5044"]
ssl.enabled: true
ssl.certificate_authorities: ["/etc/filebeat/certs/logstash-ca.crt"]
ssl.certificate: "/etc/filebeat/certs/filebeat.crt"
ssl.key: "/etc/filebeat/certs/filebeat.key"
2. 数据过滤安全
敏感信息过滤:
filter {
# 移除敏感字段
mutate {
remove_field => [ "password", "credit_card", "ssn" ]
}
# 遮蔽敏感信息
mutate {
gsub => [
"message", "\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b", "XXXX-XXXX-XXXX-XXXX"
]
}
}
3. 访问控制
用户认证:
output.elasticsearch:
hosts: ["localhost:9200"]
username: "filebeat_internal"
password: "${FILEBEAT_PASSWORD}"
监控与维护
1. 收集监控
Filebeat监控:
monitoring.enabled: true
monitoring.elasticsearch:
hosts: ["localhost:9200"]
username: "beats_system"
password: "password"
Logstash监控:
input {
beats {
port => 5044
client_inactivity_timeout => 3600
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
# 监控输出
stdout {
codec => rubydebug
}
}
2. 日志轮转
系统日志轮转:
# /etc/logrotate.d/nginx
/var/log/nginx/*.log {
daily
missingok
rotate 52
compress
delaycompress
notifempty
create 644 nginx nginx
sharedscripts
postrotate
[ -f /var/run/nginx.pid ] && kill -USR1 `cat /var/run/nginx.pid`
endscript
}
3. 数据清理
Elasticsearch索引清理:
# 删除7天前的索引
curator --config /etc/curator/curator.yml /etc/curator/delete_indices.yml
# delete_indices.yml
actions:
1:
action: delete_indices
description: "Delete indices older than 7 days"
options:
ignore_empty_list: True
timeout_override:
continue_if_exception: False
disable_action: False
filters:
- filtertype: pattern
kind: prefix
value: logstash-
- filtertype: age
source: creation_date
direction: older
unit: days
unit_count: 7
最佳实践
1. 配置管理
模块化配置:
# 按应用类型组织配置
filebeat.config.inputs:
enabled: true
path: ${path.config}/inputs.d/*.yml
reload.enabled: true
reload.period: 10s
环境差异化:
# 根据环境加载不同配置
filebeat.inputs:
- type: log
paths:
- /var/log/${ENVIRONMENT}/*.log
2. 性能优化
合理设置扫描频率:
# 避免过于频繁的扫描
scan_frequency: 10s
优化批量大小:
# 根据网络和处理能力调整
bulk_max_size: 1024
3. 故障处理
错误重试:
# 配置重试机制
output.elasticsearch:
retry_max_interval: 60
retry_on_conflict: 5
数据备份:
# 配置多个输出目标
output:
elasticsearch:
hosts: ["localhost:9200"]
file:
path: "/backup/logs/%{+YYYY-MM-dd}.log"
总结
日志数据收集策略的设计和实施需要综合考虑数据源类型、收集方式、处理流程和存储管理等多个方面。通过合理配置Filebeat、Logstash等组件,可以构建高效、可靠的日志收集系统。在实际应用中,需要根据具体业务需求和系统环境进行相应的优化和调整,确保日志收集的完整性和及时性,为系统监控和问题排查提供有力支持。