MySQL问题排查-故障恢复实战
大约 6 分钟
MySQL问题排查-故障恢复实战
业务场景引入
MySQL故障恢复是运维工作中最关键的技能:
- 服务器宕机:硬件故障导致MySQL服务不可用
- 数据损坏:磁盘故障造成数据文件损坏
- 误操作恢复:DROP TABLE、DELETE等误操作数据恢复
- 主从切换:主库故障时的快速故障转移
故障类型分析
故障分级体系
故障响应时间
故障级别 | 响应时间 | 恢复时间 | 处理策略 |
---|---|---|---|
P0 | 5分钟 | 30分钟 | 立即处理 |
P1 | 15分钟 | 2小时 | 优先处理 |
P2 | 1小时 | 4小时 | 正常处理 |
服务启动失败恢复
启动问题诊断
#!/bin/bash
# MySQL启动失败诊断
ERROR_LOG="/var/log/mysql/error.log"
DATA_DIR="/var/lib/mysql"
diagnose_startup_failure() {
echo "=== MySQL启动失败诊断 ==="
# 1. 检查服务状态
echo "1. 服务状态:"
systemctl status mysql
# 2. 检查进程
echo -e "\n2. MySQL进程:"
ps aux | grep mysqld | grep -v grep
# 3. 检查错误日志
echo -e "\n3. 错误日志 (最近20行):"
if [ -f "$ERROR_LOG" ]; then
tail -20 "$ERROR_LOG"
else
echo "错误日志文件不存在"
fi
# 4. 检查磁盘空间
echo -e "\n4. 磁盘空间:"
df -h $DATA_DIR
# 5. 检查文件权限
echo -e "\n5. 数据目录权限:"
ls -la $DATA_DIR/ | head -10
}
# 修复常见问题
fix_startup_issues() {
echo "=== 修复启动问题 ==="
# 1. 修复权限
echo "1. 修复数据目录权限:"
chown -R mysql:mysql $DATA_DIR
chmod 750 $DATA_DIR
# 2. 清理PID文件
echo "2. 清理PID文件:"
rm -f /var/run/mysqld/mysqld.pid
# 3. 创建socket目录
echo "3. 检查socket目录:"
mkdir -p /var/run/mysqld
chown mysql:mysql /var/run/mysqld
echo "基础修复完成,请尝试重启MySQL"
}
# InnoDB恢复
innodb_recovery() {
echo "=== InnoDB恢复模式 ==="
# 备份配置
cp /etc/mysql/my.cnf /etc/mysql/my.cnf.backup
# 添加恢复参数
echo "# InnoDB恢复模式" >> /etc/mysql/my.cnf
echo "innodb_force_recovery = 1" >> /etc/mysql/my.cnf
echo "已启用InnoDB恢复模式"
echo "请尝试启动MySQL,成功后导出数据并重建"
}
case "${1:-diagnose}" in
"diagnose") diagnose_startup_failure ;;
"fix") fix_startup_issues ;;
"innodb") innodb_recovery ;;
*) echo "Usage: $0 {diagnose|fix|innodb}" ;;
esac
数据文件修复
#!/bin/bash
# 数据文件损坏修复
DATA_DIR="/var/lib/mysql"
# 检查表空间
check_tablespace() {
echo "=== 检查表空间文件 ==="
if [ -f "$DATA_DIR/ibdata1" ]; then
echo "✓ 系统表空间存在"
ls -lh "$DATA_DIR/ibdata1"
else
echo "✗ 系统表空间文件缺失"
fi
echo -e "\nRedo日志文件:"
ls -lh "$DATA_DIR/ib_logfile"* 2>/dev/null || echo "Redo日志不存在"
echo -e "\n独立表空间文件数量:"
find "$DATA_DIR" -name "*.ibd" | wc -l
}
# 修复损坏表
repair_tables() {
local database=${1:-ecommerce}
echo "=== 修复数据库 $database ==="
# 检查并修复表
mysql -e "
SELECT CONCAT('REPAIR TABLE ', table_schema, '.', table_name, ';') as repair_sql
FROM information_schema.tables
WHERE table_schema = '$database'
AND engine = 'MyISAM'
" | grep -v repair_sql | mysql
# 检查InnoDB表
mysql -e "CHECK TABLE $database.*"
}
case "${1:-check}" in
"check") check_tablespace ;;
"repair") repair_tables "$2" ;;
*) echo "Usage: $0 {check|repair}" ;;
esac
主从故障切换
自动切换脚本
#!/usr/bin/env python3
import mysql.connector
import time
import logging
class MySQLFailover:
def __init__(self, master_config, slave_configs):
self.master_config = master_config
self.slave_configs = slave_configs
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def check_master_health(self):
"""检查主库健康"""
try:
conn = mysql.connector.connect(
**self.master_config,
connection_timeout=5
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
cursor.close()
conn.close()
return result[0] == 1
except:
return False
def get_slave_lag(self, slave_config):
"""获取从库延迟"""
try:
conn = mysql.connector.connect(**slave_config)
cursor = conn.cursor()
cursor.execute("SHOW SLAVE STATUS")
result = cursor.fetchone()
# Seconds_Behind_Master
lag = result[32] if result and result[32] is not None else float('inf')
cursor.close()
conn.close()
return lag
except:
return float('inf')
def select_best_slave(self):
"""选择最佳从库"""
best_slave = None
min_lag = float('inf')
for slave_config in self.slave_configs:
lag = self.get_slave_lag(slave_config)
self.logger.info(f"从库 {slave_config['host']} 延迟: {lag}秒")
if lag < min_lag:
min_lag = lag
best_slave = slave_config
return best_slave
def promote_slave(self, slave_config):
"""提升从库为主库"""
try:
conn = mysql.connector.connect(**slave_config)
cursor = conn.cursor()
# 停止复制
cursor.execute("STOP SLAVE")
cursor.execute("RESET SLAVE ALL")
# 设置为可写
cursor.execute("SET GLOBAL read_only = 0")
cursor.close()
conn.close()
self.logger.info(f"从库 {slave_config['host']} 已提升为主库")
return True
except Exception as e:
self.logger.error(f"提升失败: {e}")
return False
def execute_failover(self):
"""执行故障切换"""
self.logger.info("开始故障切换...")
# 选择最佳从库
best_slave = self.select_best_slave()
if not best_slave:
self.logger.error("无可用从库")
return False
# 提升为主库
if self.promote_slave(best_slave):
self.logger.info("故障切换成功")
return True
return False
def monitor(self):
"""持续监控"""
failed_checks = 0
while True:
if self.check_master_health():
failed_checks = 0
else:
failed_checks += 1
self.logger.warning(f"主库检查失败 {failed_checks}/3")
if failed_checks >= 3:
self.logger.critical("执行故障切换")
if self.execute_failover():
break
time.sleep(10)
# 使用示例
master_config = {'host': '192.168.1.10', 'user': 'root', 'password': 'password'}
slave_configs = [
{'host': '192.168.1.11', 'user': 'root', 'password': 'password'},
{'host': '192.168.1.12', 'user': 'root', 'password': 'password'}
]
failover = MySQLFailover(master_config, slave_configs)
failover.monitor()
误操作恢复
binlog数据恢复
#!/bin/bash
# 误操作数据恢复
DATA_DIR="/var/lib/mysql"
RECOVERY_DIR="/backup/recovery"
recover_from_binlog() {
local start_time=$1
local end_time=$2
local database=$3
echo "=== 从binlog恢复数据 ==="
echo "时间范围: $start_time 到 $end_time"
echo "数据库: $database"
mkdir -p $RECOVERY_DIR
# 获取binlog文件列表
mysql -e "SHOW BINARY LOGS" | awk '{print $1}' | grep -v "Log_name" > /tmp/binlogs.txt
# 生成恢复SQL
recovery_sql="$RECOVERY_DIR/recovery_$(date +%Y%m%d_%H%M%S).sql"
while read binlog_file; do
echo "处理 $binlog_file..."
mysqlbinlog \
--start-datetime="$start_time" \
--stop-datetime="$end_time" \
--database="$database" \
"$DATA_DIR/$binlog_file" >> "$recovery_sql" 2>/dev/null
done < /tmp/binlogs.txt
rm -f /tmp/binlogs.txt
echo "恢复SQL已生成: $recovery_sql"
echo "请检查内容后执行: mysql < $recovery_sql"
}
# 全备+binlog恢复
recover_full_backup() {
local backup_file=$1
local incident_time=$2
local database=$3
echo "=== 全备+binlog恢复 ==="
# 创建临时数据库
temp_db="${database}_recovery"
mysql -e "CREATE DATABASE IF NOT EXISTS $temp_db"
# 恢复全备
if [[ "$backup_file" == *.gz ]]; then
zcat "$backup_file" | sed "s/$database/$temp_db/g" | mysql
else
sed "s/$database/$temp_db/g" "$backup_file" | mysql
fi
echo "全备已恢复到临时库: $temp_db"
# 应用binlog
recover_from_binlog "$(date -d 'yesterday' '+%Y-%m-%d %H:%M:%S')" "$incident_time" "$temp_db"
echo "请验证临时库数据后手动合并到生产库"
}
case "${1:-help}" in
"binlog")
if [ $# -lt 4 ]; then
echo "Usage: $0 binlog <start_time> <end_time> <database>"
exit 1
fi
recover_from_binlog "$2" "$3" "$4"
;;
"full")
if [ $# -lt 4 ]; then
echo "Usage: $0 full <backup_file> <incident_time> <database>"
exit 1
fi
recover_full_backup "$2" "$3" "$4"
;;
*)
echo "MySQL误操作恢复工具"
echo "Usage: $0 {binlog|full}"
echo ""
echo "binlog - 从binlog恢复指定时间段数据"
echo "full - 从全备+binlog恢复"
;;
esac
快速数据导出
#!/bin/bash
# 应急数据导出
emergency_export() {
local database=${1:-ecommerce}
local export_dir="/backup/emergency_$(date +%Y%m%d_%H%M%S)"
echo "=== 应急数据导出 ==="
echo "数据库: $database"
echo "导出目录: $export_dir"
mkdir -p "$export_dir"
# 获取表列表
mysql -e "SELECT table_name FROM information_schema.tables WHERE table_schema = '$database'" \
| grep -v table_name > /tmp/tables.txt
# 逐表导出
while read table; do
if [ -n "$table" ]; then
echo "导出表: $table"
# 尝试mysqldump
mysqldump --single-transaction "$database" "$table" > "$export_dir/${table}.sql" 2>/dev/null
if [ $? -eq 0 ]; then
echo " ✓ $table SQL导出成功"
else
echo " ✗ $table SQL导出失败,尝试CSV"
# 尝试CSV导出
mysql -e "SELECT * FROM $database.$table" > "$export_dir/${table}.csv" 2>/dev/null
if [ $? -eq 0 ]; then
echo " ✓ $table CSV导出成功"
else
echo " ✗ $table 导出完全失败"
fi
fi
fi
done < /tmp/tables.txt
rm -f /tmp/tables.txt
echo "应急导出完成: $export_dir"
ls -lh "$export_dir"
}
emergency_export "$1"
监控与告警
故障检测脚本
#!/usr/bin/env python3
import mysql.connector
import time
import smtplib
from email.mime.text import MIMEText
class MySQLMonitor:
def __init__(self, db_config, alert_config):
self.db_config = db_config
self.alert_config = alert_config
def check_mysql_status(self):
"""检查MySQL状态"""
try:
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.close()
conn.close()
return True
except:
return False
def send_alert(self, message):
"""发送告警"""
try:
msg = MIMEText(message)
msg['Subject'] = 'MySQL Alert'
msg['From'] = self.alert_config['from']
msg['To'] = self.alert_config['to']
smtp = smtplib.SMTP(self.alert_config['smtp_server'])
smtp.send_message(msg)
smtp.quit()
print(f"告警已发送: {message}")
except Exception as e:
print(f"告警发送失败: {e}")
def monitor(self):
"""持续监控"""
while True:
if not self.check_mysql_status():
self.send_alert("MySQL服务异常,请立即检查!")
time.sleep(60) # 每分钟检查一次
# 配置
db_config = {'host': 'localhost', 'user': 'monitor', 'password': 'password'}
alert_config = {
'smtp_server': 'smtp.company.com',
'from': 'alert@company.com',
'to': 'admin@company.com'
}
monitor = MySQLMonitor(db_config, alert_config)
monitor.monitor()
总结与最佳实践
故障恢复原则
- 快速响应:建立标准化应急流程
- 准确诊断:通过日志快速定位问题
- 优先恢复:先恢复服务,再处理数据
- 充分验证:确保恢复后数据完整性
- 经验总结:建立故障知识库
关键准备工作
- 完备备份:定期全备+实时binlog
- 监控体系:实时监控+自动告警
- 应急预案:标准化处理流程
- 工具准备:自动化恢复脚本
- 团队培训:定期故障演练
MySQL故障恢复需要平时充分准备,关键时刻才能快速准确地处理问题。