MySQL问题排查-数据不一致处理
大约 6 分钟
MySQL问题排查-数据不一致处理
业务场景引入
数据不一致是分布式MySQL环境中的常见问题:
- 主从数据不一致:从库数据滞后或与主库不同步
- 应用数据不一致:缓存与数据库数据不匹配
- 事务数据不一致:并发操作导致的数据错乱
- 跨库数据不一致:分库分表环境下的数据同步问题
主从复制不一致
主从状态检查
-- 主库状态
SHOW MASTER STATUS;
-- 从库状态
SHOW SLAVE STATUS\G
-- 关键指标检查
SELECT
'Slave_IO_Running' as metric,
'Yes' as expected_value,
(SELECT Slave_IO_Running FROM (SHOW SLAVE STATUS) s) as actual_value
UNION ALL
SELECT
'Slave_SQL_Running',
'Yes',
(SELECT Slave_SQL_Running FROM (SHOW SLAVE STATUS) s)
UNION ALL
SELECT
'Seconds_Behind_Master',
'< 60',
COALESCE((SELECT Seconds_Behind_Master FROM (SHOW SLAVE STATUS) s), 'NULL');
数据一致性检查
#!/usr/bin/env python3
import mysql.connector
import hashlib
class MySQLConsistencyChecker:
def __init__(self, master_config, slave_config):
self.master_config = master_config
self.slave_config = slave_config
def get_table_checksum(self, config, table):
"""计算表数据校验和"""
conn = mysql.connector.connect(**config)
cursor = conn.cursor()
try:
# 获取行数和校验和
cursor.execute(f"""
SELECT
COUNT(*) as row_count,
COALESCE(SUM(CRC32(CONCAT_WS('|', *))), 0) as checksum
FROM {table}
""")
result = cursor.fetchone()
return {'row_count': result[0], 'checksum': result[1]}
finally:
cursor.close()
conn.close()
def check_table_consistency(self, database, table):
"""检查单表一致性"""
print(f"检查表 {database}.{table}...")
# 使用数据库名
master_config = {**self.master_config, 'database': database}
slave_config = {**self.slave_config, 'database': database}
master_checksum = self.get_table_checksum(master_config, table)
slave_checksum = self.get_table_checksum(slave_config, table)
consistent = (master_checksum['row_count'] == slave_checksum['row_count'] and
master_checksum['checksum'] == slave_checksum['checksum'])
if consistent:
print(f" ✓ 数据一致")
else:
print(f" ✗ 数据不一致:")
print(f" 主库行数: {master_checksum['row_count']}, 从库行数: {slave_checksum['row_count']}")
print(f" 主库校验和: {master_checksum['checksum']}, 从库校验和: {slave_checksum['checksum']}")
return consistent
# 使用示例
checker = MySQLConsistencyChecker(
master_config={'host': '192.168.1.10', 'user': 'root', 'password': 'password'},
slave_config={'host': '192.168.1.11', 'user': 'root', 'password': 'password'}
)
# 检查表一致性
checker.check_table_consistency('ecommerce', 'users')
checker.check_table_consistency('ecommerce', 'orders')
主从同步修复
#!/bin/bash
# 主从复制修复脚本
MASTER_HOST="192.168.1.10"
SLAVE_HOST="192.168.1.11"
MYSQL_USER="root"
MYSQL_PASS="password"
# 检查复制状态
check_replication_status() {
echo "=== 检查复制状态 ==="
mysql -h${SLAVE_HOST} -u${MYSQL_USER} -p${MYSQL_PASS} -e "
SELECT
Slave_IO_Running,
Slave_SQL_Running,
Seconds_Behind_Master,
Last_Error
FROM (SHOW SLAVE STATUS) s;
"
}
# 修复复制错误
fix_replication() {
echo "=== 修复复制 ==="
# 停止复制
mysql -h${SLAVE_HOST} -u${MYSQL_USER} -p${MYSQL_PASS} -e "STOP SLAVE;"
# 获取主库状态
MASTER_STATUS=$(mysql -h${MASTER_HOST} -u${MYSQL_USER} -p${MYSQL_PASS} -e "SHOW MASTER STATUS")
MASTER_FILE=$(echo "$MASTER_STATUS" | tail -1 | awk '{print $1}')
MASTER_POS=$(echo "$MASTER_STATUS" | tail -1 | awk '{print $2}')
# 重新配置复制
mysql -h${SLAVE_HOST} -u${MYSQL_USER} -p${MYSQL_PASS} << EOF
CHANGE MASTER TO
MASTER_HOST='${MASTER_HOST}',
MASTER_USER='repl',
MASTER_PASSWORD='repl_password',
MASTER_LOG_FILE='${MASTER_FILE}',
MASTER_LOG_POS=${MASTER_POS};
START SLAVE;
EOF
echo "复制已重新配置"
# 检查结果
sleep 5
check_replication_status
}
# 跳过复制错误
skip_replication_error() {
echo "=== 跳过复制错误 ==="
mysql -h${SLAVE_HOST} -u${MYSQL_USER} -p${MYSQL_PASS} << EOF
STOP SLAVE;
SET GLOBAL SQL_SLAVE_SKIP_COUNTER = 1;
START SLAVE;
EOF
echo "已跳过一个复制错误"
check_replication_status
}
case "${1:-check}" in
"check")
check_replication_status
;;
"fix")
fix_replication
;;
"skip")
skip_replication_error
;;
*)
echo "Usage: $0 {check|fix|skip}"
;;
esac
事务一致性问题
并发事务检查
-- 检查活跃事务
SELECT
trx_id,
trx_state,
trx_started,
TIMESTAMPDIFF(SECOND, trx_started, NOW()) as duration_sec,
trx_rows_locked,
trx_mysql_thread_id,
LEFT(trx_query, 100) as query_snippet
FROM information_schema.innodb_trx
ORDER BY trx_started;
-- 检查锁等待
SELECT
r.trx_id as waiting_trx,
b.trx_id as blocking_trx,
w.requesting_lock_id,
w.blocking_lock_id
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx r ON w.requesting_trx_id = r.trx_id
JOIN information_schema.innodb_trx b ON w.blocking_trx_id = b.trx_id;
-- 检查死锁
SHOW ENGINE INNODB STATUS;
数据完整性验证
#!/usr/bin/env python3
import mysql.connector
class DataIntegrityChecker:
def __init__(self, db_config):
self.db_config = db_config
def check_foreign_keys(self):
"""检查外键完整性"""
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
# 检查订单表的用户外键
cursor.execute("""
SELECT COUNT(*) as orphan_orders
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE o.user_id IS NOT NULL AND u.id IS NULL
""")
orphan_count = cursor.fetchone()[0]
if orphan_count > 0:
print(f"发现 {orphan_count} 个孤儿订单记录")
else:
print("外键完整性检查通过")
cursor.close()
conn.close()
return orphan_count == 0
def check_business_constraints(self):
"""检查业务约束"""
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
violations = []
# 检查账户余额不能为负
cursor.execute("SELECT COUNT(*) FROM accounts WHERE balance < 0")
negative_balance = cursor.fetchone()[0]
if negative_balance > 0:
violations.append(f"负余额账户: {negative_balance}")
# 检查订单金额必须大于0
cursor.execute("SELECT COUNT(*) FROM orders WHERE total_amount <= 0")
invalid_amount = cursor.fetchone()[0]
if invalid_amount > 0:
violations.append(f"无效金额订单: {invalid_amount}")
# 检查邮箱格式
cursor.execute("SELECT COUNT(*) FROM users WHERE email NOT REGEXP '^[^@]+@[^@]+\.[^@]+$'")
invalid_email = cursor.fetchone()[0]
if invalid_email > 0:
violations.append(f"无效邮箱格式: {invalid_email}")
cursor.close()
conn.close()
if violations:
print("业务约束违规:")
for violation in violations:
print(f" - {violation}")
else:
print("业务约束检查通过")
return len(violations) == 0
# 使用示例
checker = DataIntegrityChecker({
'host': 'localhost',
'user': 'app_user',
'password': 'password',
'database': 'ecommerce'
})
checker.check_foreign_keys()
checker.check_business_constraints()
缓存一致性问题
缓存同步检查
#!/usr/bin/env python3
import mysql.connector
import redis
import json
class CacheConsistencyChecker:
def __init__(self, db_config, redis_config):
self.db_config = db_config
self.redis_client = redis.Redis(**redis_config)
def check_user_cache(self, user_id):
"""检查用户缓存一致性"""
# 获取数据库数据
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
db_user = cursor.fetchone()
cursor.close()
conn.close()
# 获取缓存数据
cache_key = f"user:{user_id}"
cached_data = self.redis_client.get(cache_key)
if not cached_data:
if db_user:
print(f"用户 {user_id}: 缓存缺失")
return False
else:
print(f"用户 {user_id}: 数据库和缓存都不存在")
return True
try:
cache_user = json.loads(cached_data.decode())
except:
print(f"用户 {user_id}: 缓存数据格式错误")
return False
if not db_user:
print(f"用户 {user_id}: 数据库记录不存在但缓存存在")
return False
# 比较关键字段
if (db_user['username'] != cache_user.get('username') or
db_user['email'] != cache_user.get('email')):
print(f"用户 {user_id}: 数据不一致")
print(f" 数据库: {db_user['username']}, {db_user['email']}")
print(f" 缓存: {cache_user.get('username')}, {cache_user.get('email')}")
return False
print(f"用户 {user_id}: 数据一致")
return True
def sync_user_cache(self, user_id):
"""同步用户缓存"""
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user_data = cursor.fetchone()
cursor.close()
conn.close()
cache_key = f"user:{user_id}"
if user_data:
# 更新缓存
cache_data = json.dumps(user_data, default=str)
self.redis_client.setex(cache_key, 3600, cache_data)
print(f"用户 {user_id} 缓存已同步")
else:
# 删除缓存
self.redis_client.delete(cache_key)
print(f"用户 {user_id} 缓存已删除")
# 使用示例
cache_checker = CacheConsistencyChecker(
db_config={'host': 'localhost', 'user': 'app_user', 'password': 'password', 'database': 'ecommerce'},
redis_config={'host': 'localhost', 'port': 6379, 'db': 0}
)
# 检查用户缓存
for user_id in range(1, 11):
if not cache_checker.check_user_cache(user_id):
cache_checker.sync_user_cache(user_id)
修复策略
自动修复脚本
#!/bin/bash
# 数据一致性自动修复
DB_HOST="localhost"
DB_USER="admin"
DB_PASS="password"
# 修复外键约束违规
fix_foreign_key_violations() {
echo "=== 修复外键约束违规 ==="
# 删除孤儿订单
mysql -h${DB_HOST} -u${DB_USER} -p${DB_PASS} << EOF
DELETE FROM orders
WHERE user_id NOT IN (SELECT id FROM users);
EOF
echo "孤儿订单已删除"
}
# 修复业务约束违规
fix_business_constraints() {
echo "=== 修复业务约束违规 ==="
# 修复负余额账户
mysql -h${DB_HOST} -u${DB_USER} -p${DB_PASS} << EOF
UPDATE accounts SET balance = 0 WHERE balance < 0;
EOF
# 删除无效订单
mysql -h${DB_HOST} -u${DB_USER} -p${DB_PASS} << EOF
DELETE FROM orders WHERE total_amount <= 0;
EOF
echo "业务约束违规已修复"
}
# 清理缓存
clear_inconsistent_cache() {
echo "=== 清理不一致缓存 ==="
# 使用Redis CLI清理用户缓存
redis-cli EVAL "
local keys = redis.call('keys', 'user:*')
for i=1,#keys do
redis.call('del', keys[i])
end
return #keys
" 0
echo "用户缓存已清理"
}
case "${1:-all}" in
"fk")
fix_foreign_key_violations
;;
"business")
fix_business_constraints
;;
"cache")
clear_inconsistent_cache
;;
"all")
fix_foreign_key_violations
fix_business_constraints
clear_inconsistent_cache
;;
*)
echo "Usage: $0 {fk|business|cache|all}"
;;
esac
总结与最佳实践
数据一致性保障策略
主从复制监控
- 定期检查复制状态
- 监控复制延迟
- 自动修复复制异常
事务管理
- 合理设置隔离级别
- 避免长时间事务
- 处理死锁和锁超时
缓存策略
- 实施缓存更新策略
- 设置合理的缓存过期时间
- 定期检查缓存一致性
数据验证
- 定期执行数据完整性检查
- 建立业务约束验证
- 自动修复数据不一致
数据不一致问题需要从多个层面进行预防和处理,建立完善的监控和自动修复机制。