引言
在数据库管理场景中,数据迁移是常见的需求。无论是系统升级、数据整合还是跨环境部署,Mysql到Mysql的数据迁移都需要兼顾效率与准确性。本文将通过Python脚本实现这一过程,重点讨论连接管理、数据读取与写入、性能优化及异常处理等关键环节。
环境准备与依赖安装
实现Mysql数据迁移前,需确保Python环境已安装必要的依赖库。推荐使用pymysql或mysql-connector-python作为数据库连接驱动,两者均支持标准的Mysql协议。
pip install pymysql mysql-connector-python
若需处理大数据量,可额外安装tqdm库以显示进度条,提升用户体验。
数据库连接配置
迁移脚本的核心是建立源数据库与目标数据库的连接。建议将连接参数(主机、端口、用户名、密码、数据库名)封装为配置类或字典,便于维护与复用。
import pymysqlsource_config = {'host': 'source_host','port': 3306,'user': 'source_user','password': 'source_password','database': 'source_db','charset': 'utf8mb4'}target_config = {'host': 'target_host','port': 3306,'user': 'target_user','password': 'target_password','database': 'target_db','charset': 'utf8mb4'}def get_connection(config):return pymysql.connect(host=config['host'],port=config['port'],user=config['user'],password=config['password'],database=config['database'],charset=config['charset'])
数据读取与写入策略
表结构迁移
首先迁移表结构(CREATE TABLE语句),确保目标库的表定义与源库一致。可通过SHOW CREATE TABLE命令获取源表结构,并在目标库执行。
def migrate_schema(source_conn, target_conn, table_name):with source_conn.cursor() as src_cursor:src_cursor.execute(f"SHOW CREATE TABLE {table_name}")create_table_sql = src_cursor.fetchone()[1]with target_conn.cursor() as tgt_cursor:tgt_cursor.execute(f"DROP TABLE IF EXISTS {table_name}")tgt_cursor.execute(create_table_sql)target_conn.commit()
数据迁移
数据迁移需考虑批量处理与内存优化。对于大表,建议分批次读取与写入,避免单次操作数据量过大导致内存溢出。
def migrate_data(source_conn, target_conn, table_name, batch_size=1000):with source_conn.cursor() as src_cursor:src_cursor.execute(f"SELECT COUNT(*) FROM {table_name}")total_rows = src_cursor.fetchone()[0]print(f"Total rows to migrate: {total_rows}")offset = 0while offset < total_rows:src_cursor.execute(f"SELECT * FROM {table_name} LIMIT {offset}, {batch_size}")rows = src_cursor.fetchall()if not rows:break# 动态生成INSERT语句(需处理字段名与值)columns = [desc[0] for desc in src_cursor.description]placeholders = ', '.join(['%s'] * len(columns))insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"with target_conn.cursor() as tgt_cursor:tgt_cursor.executemany(insert_sql, rows)target_conn.commit()offset += len(rows)print(f"Migrated {offset}/{total_rows} rows")
性能优化与异常处理
批量操作优化
使用executemany替代循环单条插入,可显著提升性能。对于超大数据表,可结合多线程或异步IO进一步优化。
事务管理
确保每个批次的数据操作在事务中完成,避免部分失败导致数据不一致。
try:with target_conn.cursor() as tgt_cursor:tgt_cursor.executemany(insert_sql, rows)target_conn.commit()except Exception as e:target_conn.rollback()print(f"Error occurred: {e}")
错误重试机制
针对网络波动或临时锁表问题,可实现指数退避重试逻辑,提升脚本健壮性。
完整脚本示例
import pymysqlfrom time import sleepdef migrate_table(source_config, target_config, table_name, batch_size=1000, max_retries=3):source_conn = get_connection(source_config)target_conn = get_connection(target_config)try:migrate_schema(source_conn, target_conn, table_name)migrate_data(source_conn, target_conn, table_name, batch_size)except Exception as e:print(f"Migration failed for {table_name}: {e}")raisefinally:source_conn.close()target_conn.close()# 使用示例if __name__ == "__main__":source = {...} # 填充源库配置target = {...} # 填充目标库配置migrate_table(source, target, "example_table")
最佳实践与注意事项
- 字段类型兼容性:检查源库与目标库的字段类型是否一致,尤其是日期、时间戳等特殊类型。
- 主键与自增列:若目标表需保留自增属性,确保INSERT语句不指定主键值。
- 字符集与排序规则:统一源库与目标库的字符集(如utf8mb4),避免乱码。
- 索引与约束:迁移后重建索引与外键约束,提升查询性能。
- 日志与监控:记录迁移过程中的关键指标(如耗时、错误率),便于问题排查。
总结
通过Python脚本实现Mysql到Mysql的数据迁移,可灵活控制迁移过程,适应多种场景需求。本文提供的方案涵盖了从环境准备到性能优化的全流程,开发者可根据实际需求调整批量大小、重试策略等参数,实现高效稳定的数据迁移。