MySQL Optimization: Handling Large Data Volumes
Business Scenarios
In big-data scenarios, MySQL faces a number of challenges:
- Data import: bulk-loading tens of millions of order rows every day
- Historical data processing: purging log data older than one year
- Report generation: analyzing hundreds of millions of user-behavior rows
- Data migration: online schema changes on very large tables
This article gives a systematic overview of optimization strategies for handling large data volumes in MySQL.
Query Optimization for Large Data Volumes
Paginated Query Optimization
-- ❌ Traditional pagination (deep offsets perform poorly)
SELECT * FROM orders
ORDER BY order_date DESC
LIMIT 1000000, 20;
-- ✅ Cursor-based (keyset) pagination
-- 1. Seek on the primary key
SELECT * FROM orders
WHERE order_id > 1000000
ORDER BY order_id
LIMIT 20;
-- 2. Covering index + deferred join
SELECT o.* FROM orders o
INNER JOIN (
SELECT order_id FROM orders
ORDER BY order_date DESC
LIMIT 1000000, 20
) t ON o.order_id = t.order_id;
-- 3. Keyset pagination on a composite sort key
SELECT * FROM orders
WHERE (order_date, order_id) < ('2024-03-15 10:30:00', 12345)
ORDER BY order_date DESC, order_id DESC
LIMIT 20;
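Note that the row-constructor comparison above is not always turned into an efficient index range scan, especially on older MySQL versions. If EXPLAIN shows a poor plan, the same condition can be written in an equivalent expanded form; a sketch using the same literal values:
SELECT * FROM orders
WHERE order_date < '2024-03-15 10:30:00'
   OR (order_date = '2024-03-15 10:30:00' AND order_id < 12345)
ORDER BY order_date DESC, order_id DESC
LIMIT 20;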
Large-Table JOIN Optimization
-- ❌ Inefficient join between large tables
SELECT u.username, COUNT(o.order_id) as order_count
FROM users u
LEFT JOIN orders o ON u.user_id = o.user_id
WHERE u.registration_date >= '2024-01-01'
GROUP BY u.user_id, u.username;
-- ✅ Optimize step by step
-- 1. Filter down to a small result set first
CREATE TEMPORARY TABLE temp_users AS
SELECT user_id, username
FROM users
WHERE registration_date >= '2024-01-01';
CREATE INDEX idx_temp_user_id ON temp_users(user_id);
-- 2. Then perform the join
SELECT tu.username, COALESCE(oc.order_count, 0) as order_count
FROM temp_users tu
LEFT JOIN (
SELECT user_id, COUNT(*) as order_count
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY user_id
) oc ON tu.user_id = oc.user_id;
-- ✅ Use a partitioned table (a partitioning sketch follows this query)
-- Query against an orders table partitioned by month
SELECT
DATE_FORMAT(order_date, '%Y-%m') as month,
COUNT(*) as order_count,
SUM(total_amount) as revenue
FROM orders
WHERE order_date >= '2024-01-01'
AND order_date < '2024-04-01' -- partition pruning
GROUP BY DATE_FORMAT(order_date, '%Y-%m');
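The query above assumes an orders table partitioned by month. A minimal sketch of such a definition follows; the column list and partition names are illustrative, and the partitioning column has to be part of every unique key, which is why order_date is added to the primary key:
CREATE TABLE orders_partitioned (
    order_id BIGINT NOT NULL AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    order_date DATETIME NOT NULL,
    PRIMARY KEY (order_id, order_date)  -- partition key must appear in every unique key
) ENGINE=InnoDB
PARTITION BY RANGE (TO_DAYS(order_date)) (
    PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);
Partition pruning only applies when the WHERE clause filters on order_date directly, as in the query above.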
Aggregate Query Optimization
-- Optimizing statistical queries over large data sets
-- 1. Use a pre-aggregation table
CREATE TABLE daily_order_stats (
stat_date DATE PRIMARY KEY,
order_count INT NOT NULL,
total_revenue DECIMAL(15,2) NOT NULL,
avg_order_amount DECIMAL(10,2) NOT NULL,
unique_customers INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB;
-- Refresh the pre-aggregation table from a scheduled job
INSERT INTO daily_order_stats (stat_date, order_count, total_revenue, avg_order_amount, unique_customers)
SELECT
DATE(order_date) as stat_date,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_amount,
COUNT(DISTINCT user_id) as unique_customers
FROM orders
WHERE order_date >= DATE_SUB(CURDATE(), INTERVAL 1 DAY)
  AND order_date < CURDATE() -- sargable range instead of DATE(order_date) = ...
GROUP BY DATE(order_date)
ON DUPLICATE KEY UPDATE
order_count = VALUES(order_count),
total_revenue = VALUES(total_revenue),
avg_order_amount = VALUES(avg_order_amount),
unique_customers = VALUES(unique_customers),
updated_at = CURRENT_TIMESTAMP;
-- 2. Use incremental aggregation
-- Incremental refresh of the monthly statistics
INSERT INTO monthly_stats (`year_month`, total_orders, total_revenue)
SELECT
DATE_FORMAT(stat_date, '%Y-%m') as `year_month`,
SUM(order_count) as total_orders,
SUM(total_revenue) as total_revenue
FROM daily_order_stats
WHERE stat_date >= DATE_FORMAT(CURDATE(), '%Y-%m-01')
GROUP BY DATE_FORMAT(stat_date, '%Y-%m')
ON DUPLICATE KEY UPDATE
total_orders = VALUES(total_orders),
total_revenue = VALUES(total_revenue);
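The incremental refresh above relies on a monthly_stats table with a unique key on the month so that ON DUPLICATE KEY UPDATE can upsert. A minimal sketch of such a table (the exact columns are assumed, not defined elsewhere in this article):
CREATE TABLE monthly_stats (
    `year_month` CHAR(7) PRIMARY KEY,  -- e.g. '2024-03'; backticked because YEAR_MONTH is a reserved word
    total_orders INT NOT NULL,
    total_revenue DECIMAL(15,2) NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB;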
Batch Data Processing
Efficient Bulk Inserts
-- 1. Multi-row INSERT
-- ❌ Row-by-row inserts
INSERT INTO products (product_name, price, category_id) VALUES ('Product1', 100.00, 1);
INSERT INTO products (product_name, price, category_id) VALUES ('Product2', 200.00, 1);
-- ... repeated tens of millions of times
-- ✅ Batch insert (1,000-5,000 rows per statement)
INSERT INTO products (product_name, price, category_id) VALUES
('Product1', 100.00, 1),
('Product2', 200.00, 1),
('Product3', 300.00, 2),
-- ... roughly 1,000 rows per statement
;
-- 2. LOAD DATA INFILE for very large imports
LOAD DATA INFILE '/tmp/products.csv'
INTO TABLE products
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS
(product_name, price, category_id, @description)
SET description = NULLIF(@description, '');
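For very large imports, a few session and server settings are commonly relaxed for the duration of the load and restored afterwards. Treat the following as a sketch and weigh it against your durability and replication requirements first (the global change also requires administrative privileges):
-- Before the load
SET unique_checks = 0;
SET foreign_key_checks = 0;
SET GLOBAL innodb_flush_log_at_trx_commit = 2;  -- weaker crash durability while loading
-- ... run LOAD DATA INFILE / bulk INSERTs ...
-- After the load
SET unique_checks = 1;
SET foreign_key_checks = 1;
SET GLOBAL innodb_flush_log_at_trx_commit = 1;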
-- 3. Batch processing with a stored procedure
DELIMITER $$
CREATE PROCEDURE BulkInsertOrders(IN batch_size INT, IN total_batches INT)
BEGIN
DECLARE i INT DEFAULT 0;
DECLARE batch_start INT DEFAULT 1;
-- Disable autocommit to reduce per-statement overhead
SET autocommit = 0;
WHILE i < total_batches DO
START TRANSACTION;
-- Build the multi-row INSERT statement
SET @sql = 'INSERT INTO orders (user_id, total_amount, order_date) VALUES ';
SET @values = '';
SET @j = 0;
WHILE @j < batch_size DO
SET @values = CONCAT(@values,
IF(@j > 0, ',', ''),
'(', FLOOR(1 + RAND() * 10000), ',',
ROUND(RAND() * 1000, 2), ',NOW())');
SET @j = @j + 1;
END WHILE;
SET @sql = CONCAT(@sql, @values);
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
COMMIT;
SET i = i + 1;
SET batch_start = batch_start + batch_size;
-- Report progress every 100 batches
IF i % 100 = 0 THEN
SELECT CONCAT('Processed ', i, ' batches') as progress;
END IF;
END WHILE;
SET autocommit = 1;
END$$
DELIMITER ;
-- Call the stored procedure
CALL BulkInsertOrders(1000, 1000); -- inserts 1,000,000 rows
Batch Update Optimization
-- 1. Batch update through a temporary table
CREATE TEMPORARY TABLE temp_price_updates (
product_id BIGINT PRIMARY KEY,
new_price DECIMAL(10,2) NOT NULL,
update_reason VARCHAR(100)
) ENGINE=MEMORY;
-- Bulk-load the update data
LOAD DATA INFILE '/tmp/price_updates.csv'
INTO TABLE temp_price_updates
FIELDS TERMINATED BY ','
(product_id, new_price, update_reason);
-- Apply the updates to the main table in one pass
UPDATE products p
INNER JOIN temp_price_updates t ON p.product_id = t.product_id
SET p.price = t.new_price,
p.updated_at = NOW(),
p.update_reason = t.update_reason;
-- 2. Update a large table in batches
DELIMITER $$
CREATE PROCEDURE BatchUpdateProducts(IN batch_size INT)
BEGIN
DECLARE update_count INT DEFAULT 0;
DECLARE total_updated INT DEFAULT 0;
REPEAT
UPDATE products
SET price = price * 1.1,
updated_at = NOW()
WHERE updated_at < DATE_SUB(NOW(), INTERVAL 1 YEAR)
LIMIT batch_size;
SET update_count = ROW_COUNT();
SET total_updated = total_updated + update_count;
-- Commit the current batch
COMMIT;
-- Sleep briefly to limit the impact on production traffic
SELECT SLEEP(0.1);
-- Report progress
IF total_updated % (batch_size * 10) = 0 THEN
SELECT CONCAT('Updated ', total_updated, ' rows') as progress;
END IF;
UNTIL update_count = 0 END REPEAT;
SELECT CONCAT('Total updated: ', total_updated, ' rows') as final_result;
END$$
DELIMITER ;
-- Run the batch update (10,000 rows per batch)
CALL BatchUpdateProducts(10000);
Large-Table Maintenance
Online DDL Operations
-- 1. Add an index to a large table (Online DDL)
-- MySQL 5.6+ can add indexes online
ALTER TABLE orders
ADD INDEX idx_user_date (user_id, order_date),
ALGORITHM=INPLACE, LOCK=NONE;
-- 2. Add a column to a large table
-- Appending a column at the end of the table usually supports INPLACE (INSTANT in MySQL 8.0)
ALTER TABLE orders
ADD COLUMN order_source VARCHAR(50) DEFAULT 'WEB' NOT NULL,
ALGORITHM=INPLACE, LOCK=NONE;
-- 3. Change a column default (metadata-only change)
ALTER TABLE orders
ALTER COLUMN order_source SET DEFAULT 'MOBILE',
ALGORITHM=INSTANT;
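For long-running online DDL, progress can be estimated from Performance Schema stage events (MySQL 5.7+). A sketch, assuming the relevant instruments and consumers are not already enabled:
-- Enable ALTER TABLE stage instrumentation
UPDATE performance_schema.setup_instruments
SET ENABLED = 'YES', TIMED = 'YES'
WHERE NAME LIKE 'stage/innodb/alter%';
UPDATE performance_schema.setup_consumers
SET ENABLED = 'YES'
WHERE NAME LIKE 'events_stages_%';
-- While the ALTER TABLE is running, in another session:
SELECT EVENT_NAME, WORK_COMPLETED, WORK_ESTIMATED
FROM performance_schema.events_stages_current;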
-- 4. Change a column's data type in batches
-- Create the new table structure
CREATE TABLE orders_new LIKE orders;
ALTER TABLE orders_new MODIFY COLUMN description TEXT;
-- Migrate the data in batches
DELIMITER $$
CREATE PROCEDURE MigrateOrdersTable()
BEGIN
DECLARE last_id BIGINT DEFAULT 0;
DECLARE batch_size INT DEFAULT 10000;
DECLARE row_count INT DEFAULT 0;
REPEAT
INSERT INTO orders_new
SELECT * FROM orders
WHERE order_id > last_id
ORDER BY order_id
LIMIT batch_size;
SET row_count = ROW_COUNT();
IF row_count > 0 THEN
SELECT MAX(order_id) INTO last_id FROM orders_new;
COMMIT;
SELECT CONCAT('Migrated up to order_id: ', last_id) as progress;
SELECT SLEEP(1); -- pause 1 second between batches
END IF;
UNTIL row_count = 0 END REPEAT;
END$$
DELIMITER ;
CALL MigrateOrdersTable();
-- Swap the table names (atomic operation)
RENAME TABLE orders TO orders_old, orders_new TO orders;
Large-Volume Deletes
-- 1. Delete historical data in batches
DELIMITER $$
CREATE PROCEDURE DeleteOldOrders(IN cutoff_date DATE, IN batch_size INT)
BEGIN
DECLARE delete_count INT DEFAULT 0;
DECLARE total_deleted INT DEFAULT 0;
REPEAT
DELETE FROM orders
WHERE order_date < cutoff_date
LIMIT batch_size;
SET delete_count = ROW_COUNT();
SET total_deleted = total_deleted + delete_count;
COMMIT;
-- Avoid holding locks for long periods
SELECT SLEEP(0.5);
-- Report progress every 100,000 deleted rows
IF total_deleted % 100000 = 0 THEN
SELECT CONCAT('Deleted ', total_deleted, ' rows') as progress;
END IF;
UNTIL delete_count = 0 END REPEAT;
SELECT CONCAT('Total deleted: ', total_deleted, ' rows') as final_result;
END$$
DELIMITER ;
-- Delete order data older than one year
CALL DeleteOldOrders(DATE_SUB(CURDATE(), INTERVAL 1 YEAR), 5000);
-- 2. Drop partitions instead (recommended)
-- On a table partitioned by month, old data can be removed by dropping whole partitions
ALTER TABLE orders_partitioned DROP PARTITION p202301;
-- 3. Schedule a recurring cleanup task
DELIMITER $$
CREATE EVENT IF NOT EXISTS cleanup_old_data
ON SCHEDULE EVERY 1 DAY
STARTS '2024-01-01 02:00:00'
DO
BEGIN
    -- Delete records older than 30 days
    CALL DeleteOldOrders(DATE_SUB(CURDATE(), INTERVAL 30 DAY), 1000);
    -- Drop the session temporary table if it exists
    DROP TEMPORARY TABLE IF EXISTS temp_cleanup;
END$$
DELIMITER ;
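Events only run when the event scheduler is enabled, which is worth verifying before relying on the cleanup task above:
SET GLOBAL event_scheduler = ON;
SHOW VARIABLES LIKE 'event_scheduler';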
Data Import and Export Optimization
High-Performance Data Export
-- 1. Use SELECT ... INTO OUTFILE
SELECT
order_id,
user_id,
total_amount,
order_date
FROM orders
WHERE order_date >= '2024-01-01'
INTO OUTFILE '/tmp/orders_export.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n';
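SELECT ... INTO OUTFILE writes the file on the database server host, and the target directory is restricted by secure_file_priv; check it before choosing an export path:
SHOW VARIABLES LIKE 'secure_file_priv';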
-- 2. Export a large table in batches
DELIMITER $$
CREATE PROCEDURE ExportLargeTable(IN table_name VARCHAR(100), IN export_path VARCHAR(255))
BEGIN
    DECLARE last_id BIGINT DEFAULT 0;
    DECLARE batch_size INT DEFAULT 100000;
    DECLARE file_counter INT DEFAULT 1;
    DECLARE row_count INT DEFAULT 0;
    REPEAT
        -- Export the current batch into its own file
        SET @sql = CONCAT(
            'SELECT * FROM ', table_name,
            ' WHERE id > ', last_id,
            ' ORDER BY id LIMIT ', batch_size,
            ' INTO OUTFILE ''', export_path, '_part_', file_counter, '.csv''',
            ' FIELDS TERMINATED BY '','' ENCLOSED BY ''"''',
            ' LINES TERMINATED BY ''\n'''
        );
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        -- For SELECT ... INTO OUTFILE, ROW_COUNT() reports the number of rows written
        SET row_count = ROW_COUNT();
        DEALLOCATE PREPARE stmt;
        -- Advance the cursor to the last exported id
        SET @last_exported_id = last_id;
        SET @sql = CONCAT(
            'SELECT COALESCE(MAX(id), ', last_id, ') INTO @last_exported_id FROM (',
            'SELECT id FROM ', table_name,
            ' WHERE id > ', last_id, ' ORDER BY id LIMIT ', batch_size, ') t'
        );
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        SET last_id = @last_exported_id;
        SET file_counter = file_counter + 1;
    UNTIL row_count < batch_size END REPEAT;
END$$
DELIMITER ;
Python Script for Large-Scale Data Processing
import pandas as pd
from sqlalchemy import create_engine, text
import logging


class MySQLBigDataProcessor:
    def __init__(self, db_config):
        self.db_config = db_config
        self.engine = create_engine(
            f"mysql+pymysql://{db_config['user']}:{db_config['password']}@"
            f"{db_config['host']}:{db_config['port']}/{db_config['database']}"
        )
        self.setup_logging()

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def batch_insert_dataframe(self, df, table_name, batch_size=10000):
        """Insert a DataFrame in batches."""
        total_rows = len(df)
        self.logger.info(f"Starting batch insert of {total_rows} rows to {table_name}")
        for i in range(0, total_rows, batch_size):
            batch_df = df.iloc[i:i+batch_size]
            try:
                batch_df.to_sql(
                    table_name,
                    self.engine,
                    if_exists='append',
                    index=False,
                    method='multi'  # multi-row INSERT statements
                )
                self.logger.info(f"Inserted batch {i//batch_size + 1}: rows {i+1} to {min(i+batch_size, total_rows)}")
            except Exception as e:
                self.logger.error(f"Error inserting batch {i//batch_size + 1}: {e}")
                raise

    def chunked_query(self, query, chunk_size=50000):
        """Stream a large result set in chunks."""
        try:
            # pandas' chunksize turns read_sql into an iterator of DataFrames
            chunk_iter = pd.read_sql(
                query,
                self.engine,
                chunksize=chunk_size
            )
            for chunk_num, chunk_df in enumerate(chunk_iter):
                self.logger.info(f"Processing chunk {chunk_num + 1}, size: {len(chunk_df)}")
                yield chunk_df
        except Exception as e:
            self.logger.error(f"Error in chunked query: {e}")
            raise

    def parallel_data_processing(self, table_name, process_func, num_workers=4):
        """Process a large table in parallel, split by id range."""
        # Threads are used here because the SQLAlchemy engine (a connection pool)
        # can be shared between threads but cannot be pickled across processes.
        from concurrent.futures import ThreadPoolExecutor
        # Determine the table's id range
        query = f"SELECT MIN(id) as min_id, MAX(id) as max_id FROM {table_name}"
        result = pd.read_sql(query, self.engine)
        min_id, max_id = result.iloc[0]['min_id'], result.iloc[0]['max_id']
        # Split the range across workers
        chunk_size = (max_id - min_id) // num_workers
        ranges = []
        for i in range(num_workers):
            start_id = min_id + i * chunk_size
            end_id = min_id + (i + 1) * chunk_size if i < num_workers - 1 else max_id
            ranges.append((start_id, end_id))
        # Run the workers in parallel
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = []
            for start_id, end_id in ranges:
                future = executor.submit(
                    self._process_data_range,
                    table_name, start_id, end_id, process_func
                )
                futures.append(future)
            # Wait for all workers to finish
            for future in futures:
                try:
                    result = future.result()
                    self.logger.info(f"Completed processing range: {result}")
                except Exception as e:
                    self.logger.error(f"Error in parallel processing: {e}")

    def _process_data_range(self, table_name, start_id, end_id, process_func):
        """Process the rows whose id falls in the given range."""
        query = f"SELECT * FROM {table_name} WHERE id BETWEEN {start_id} AND {end_id}"
        for chunk_df in self.chunked_query(query):
            processed_df = process_func(chunk_df)
            # Results could be written to another table or to files here
        return f"Range {start_id}-{end_id} completed"

    def optimize_table_maintenance(self, table_name):
        """Routine maintenance for large tables."""
        with self.engine.connect() as conn:
            # Refresh optimizer statistics
            conn.execute(text(f"ANALYZE TABLE {table_name}"))
            self.logger.info(f"Analyzed table {table_name}")
            # OPTIMIZE TABLE rebuilds and locks the table -- use with care
            # conn.execute(text(f"OPTIMIZE TABLE {table_name}"))
            # Check the table for errors
            result = conn.execute(text(f"CHECK TABLE {table_name}"))
            for row in result:
                self.logger.info(f"Table check result: {row}")

    def export_large_table(self, table_name, output_dir, chunk_size=100000):
        """Export a large table in chunks."""
        import os
        os.makedirs(output_dir, exist_ok=True)
        # Total row count
        count_query = f"SELECT COUNT(*) as total FROM {table_name}"
        total_rows = pd.read_sql(count_query, self.engine).iloc[0]['total']
        self.logger.info(f"Exporting {total_rows} rows from {table_name}")
        # Export chunk by chunk
        offset = 0
        file_counter = 1
        while offset < total_rows:
            query = f"SELECT * FROM {table_name} LIMIT {chunk_size} OFFSET {offset}"
            chunk_df = pd.read_sql(query, self.engine)
            if len(chunk_df) == 0:
                break
            output_file = os.path.join(output_dir, f"{table_name}_part_{file_counter:04d}.csv")
            chunk_df.to_csv(output_file, index=False)
            self.logger.info(f"Exported {len(chunk_df)} rows to {output_file}")
            offset += chunk_size
            file_counter += 1


# Usage example
def process_orders_data(df):
    """Example processing function."""
    # Add derived columns
    df['order_month'] = pd.to_datetime(df['order_date']).dt.to_period('M')
    df['amount_category'] = pd.cut(df['total_amount'],
                                   bins=[0, 100, 500, 1000, float('inf')],
                                   labels=['Low', 'Medium', 'High', 'Premium'])
    return df


if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'username',
        'password': 'password',
        'database': 'ecommerce'
    }
    processor = MySQLBigDataProcessor(db_config)
    # Example: process a large table in parallel
    processor.parallel_data_processing('orders', process_orders_data, num_workers=4)
    # Example: export a large table
    processor.export_large_table('orders', '/tmp/orders_export')
Performance Monitoring and Tuning
Monitoring Large Data Operations
-- Monitor long-running queries
SELECT
ID,
USER,
HOST,
DB,
COMMAND,
TIME as duration_seconds,
STATE,
LEFT(INFO, 100) as query_snippet
FROM information_schema.PROCESSLIST
WHERE COMMAND != 'Sleep'
AND TIME > 300 -- queries running longer than 5 minutes
ORDER BY TIME DESC;
-- Monitor large transactions
SELECT
trx_id,
trx_state,
trx_started,
TIMESTAMPDIFF(SECOND, trx_started, NOW()) as duration_seconds,
trx_rows_locked,
trx_rows_modified,
LEFT(trx_query, 100) as query_snippet
FROM information_schema.INNODB_TRX
WHERE TIMESTAMPDIFF(SECOND, trx_started, NOW()) > 60 -- transactions older than 1 minute
ORDER BY duration_seconds DESC;
-- Monitor lock waits (these information_schema tables exist up to MySQL 5.7; see the sys schema query after this one)
SELECT
r.trx_id as waiting_trx_id,
r.trx_mysql_thread_id as waiting_thread,
TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) as wait_time_seconds,
b.trx_id as blocking_trx_id,
b.trx_mysql_thread_id as blocking_thread,
LEFT(r.trx_query, 100) as waiting_query,
LEFT(b.trx_query, 100) as blocking_query
FROM information_schema.INNODB_LOCK_WAITS w
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.requesting_trx_id;
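information_schema.INNODB_LOCK_WAITS and INNODB_LOCKS were removed in MySQL 8.0. On MySQL 5.7 and 8.0 the sys schema view is the more portable option; a sketch using a subset of its columns:
SELECT
    wait_age,
    waiting_pid,
    LEFT(waiting_query, 100) AS waiting_query,
    blocking_pid,
    LEFT(blocking_query, 100) AS blocking_query
FROM sys.innodb_lock_waits;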
Automated Tuning Script
#!/bin/bash
# MySQL monitoring script for large data operations
MYSQL_USER="admin"
MYSQL_PASS="password"
MYSQL_HOST="localhost"
LOG_FILE="/var/log/mysql_bigdata_monitor.log"
# Logging helper
log_message() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
# Check for long-running queries
check_long_queries() {
long_queries=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -se "
SELECT COUNT(*) FROM information_schema.PROCESSLIST
WHERE COMMAND != 'Sleep' AND TIME > 1800
")
if [ "$long_queries" -gt 0 ]; then
log_message "WARNING: Found $long_queries queries running over 30 minutes"
# Log the details
mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "
SELECT ID, USER, HOST, TIME, LEFT(INFO, 200) as QUERY
FROM information_schema.PROCESSLIST
WHERE COMMAND != 'Sleep' AND TIME > 1800
" >> $LOG_FILE
fi
}
# Check for large transactions
check_large_transactions() {
large_trx=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -se "
SELECT COUNT(*) FROM information_schema.INNODB_TRX
WHERE trx_rows_modified > 100000 OR trx_rows_locked > 100000
")
if [ "$large_trx" -gt 0 ]; then
log_message "WARNING: Found $large_trx large transactions"
fi
}
# Check tablespace usage
check_tablespace_usage() {
mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "
SELECT
table_schema,
ROUND(SUM(data_length + index_length) / 1024 / 1024 / 1024, 2) as size_gb
FROM information_schema.tables
WHERE table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
GROUP BY table_schema
HAVING size_gb > 50
ORDER BY size_gb DESC
" | while read schema size; do
if [ "$size" != "size_gb" ]; then
log_message "INFO: Database $schema size: ${size}GB"
fi
done
}
# Main monitoring cycle
main() {
log_message "Starting big data monitoring cycle"
check_long_queries
check_large_transactions
check_tablespace_usage
log_message "Big data monitoring cycle completed"
}
# Run the checks
main
Summary and Best Practices
Principles for Processing Large Data Volumes
Divide and conquer:
- Break large queries into smaller queries
- Break large transactions into smaller transactions
- Use partitioning and sharding
Batch operations:
- Avoid row-by-row processing
- Choose a sensible batch size
- Add progress reporting and error handling
Index optimization:
- Create indexes that match the heavy queries
- Keep index statistics up to date
- Avoid over-indexing
Resource control:
- Limit concurrency to avoid overloading the system
- Run heavy operations during off-peak hours
- Monitor system resource usage
Fault tolerance (see the sketch after this list):
- Implement checkpoint/resume so interrupted jobs can continue
- Add retry logic
- Provide thorough logging and monitoring
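A minimal sketch of these fault-tolerance principles, combining batching with a persisted checkpoint so an interrupted job can resume where it stopped; the checkpoint table and the orders_archive target are illustrative and not defined elsewhere in this article:
CREATE TABLE IF NOT EXISTS batch_job_checkpoint (
    job_name VARCHAR(64) PRIMARY KEY,
    last_processed_id BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB;
DELIMITER $$
CREATE PROCEDURE ResumableArchiveOrders(IN batch_size INT)
BEGIN
    DECLARE last_id BIGINT DEFAULT 0;
    DECLARE affected INT DEFAULT 0;
    -- Resume from the stored checkpoint (0 on the first run)
    SELECT COALESCE(MAX(last_processed_id), 0) INTO last_id
    FROM batch_job_checkpoint
    WHERE job_name = 'archive_orders';
    REPEAT
        -- Copy one batch into the (hypothetical) archive table
        INSERT INTO orders_archive
        SELECT * FROM orders
        WHERE order_id > last_id
        ORDER BY order_id
        LIMIT batch_size;
        SET affected = ROW_COUNT();
        IF affected > 0 THEN
            SELECT MAX(order_id) INTO last_id FROM orders_archive;
            -- Persist the checkpoint so a restart continues from here
            INSERT INTO batch_job_checkpoint (job_name, last_processed_id)
            VALUES ('archive_orders', last_id)
            ON DUPLICATE KEY UPDATE last_processed_id = VALUES(last_processed_id);
            COMMIT;
            SELECT SLEEP(0.1);  -- throttle to limit the impact on production traffic
        END IF;
    UNTIL affected = 0 END REPEAT;
END$$
DELIMITER ;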
Handling large data volumes requires balancing performance, stability, and business continuity. With sound design and the optimization strategies above, MySQL can serve large-scale data scenarios effectively.