Python脚本实现Mysql到Mysql数据迁移指南

引言

在数据库管理场景中,数据迁移是常见的需求。无论是系统升级、数据整合还是跨环境部署,Mysql到Mysql的数据迁移都需要兼顾效率与准确性。本文将通过Python脚本实现这一过程,重点讨论连接管理、数据读取与写入、性能优化及异常处理等关键环节。

环境准备与依赖安装

实现Mysql数据迁移前,需确保Python环境已安装必要的依赖库。推荐使用pymysqlmysql-connector-python作为数据库连接驱动,两者均支持标准的Mysql协议。

  1. pip install pymysql mysql-connector-python

若需处理大数据量,可额外安装tqdm库以显示进度条,提升用户体验。

数据库连接配置

迁移脚本的核心是建立源数据库与目标数据库的连接。建议将连接参数(主机、端口、用户名、密码、数据库名)封装为配置类或字典,便于维护与复用。

  1. import pymysql
  2. source_config = {
  3. 'host': 'source_host',
  4. 'port': 3306,
  5. 'user': 'source_user',
  6. 'password': 'source_password',
  7. 'database': 'source_db',
  8. 'charset': 'utf8mb4'
  9. }
  10. target_config = {
  11. 'host': 'target_host',
  12. 'port': 3306,
  13. 'user': 'target_user',
  14. 'password': 'target_password',
  15. 'database': 'target_db',
  16. 'charset': 'utf8mb4'
  17. }
  18. def get_connection(config):
  19. return pymysql.connect(
  20. host=config['host'],
  21. port=config['port'],
  22. user=config['user'],
  23. password=config['password'],
  24. database=config['database'],
  25. charset=config['charset']
  26. )

数据读取与写入策略

表结构迁移

首先迁移表结构(CREATE TABLE语句),确保目标库的表定义与源库一致。可通过SHOW CREATE TABLE命令获取源表结构,并在目标库执行。

  1. def migrate_schema(source_conn, target_conn, table_name):
  2. with source_conn.cursor() as src_cursor:
  3. src_cursor.execute(f"SHOW CREATE TABLE {table_name}")
  4. create_table_sql = src_cursor.fetchone()[1]
  5. with target_conn.cursor() as tgt_cursor:
  6. tgt_cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
  7. tgt_cursor.execute(create_table_sql)
  8. target_conn.commit()

数据迁移

数据迁移需考虑批量处理与内存优化。对于大表,建议分批次读取与写入,避免单次操作数据量过大导致内存溢出。

  1. def migrate_data(source_conn, target_conn, table_name, batch_size=1000):
  2. with source_conn.cursor() as src_cursor:
  3. src_cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
  4. total_rows = src_cursor.fetchone()[0]
  5. print(f"Total rows to migrate: {total_rows}")
  6. offset = 0
  7. while offset < total_rows:
  8. src_cursor.execute(
  9. f"SELECT * FROM {table_name} LIMIT {offset}, {batch_size}"
  10. )
  11. rows = src_cursor.fetchall()
  12. if not rows:
  13. break
  14. # 动态生成INSERT语句(需处理字段名与值)
  15. columns = [desc[0] for desc in src_cursor.description]
  16. placeholders = ', '.join(['%s'] * len(columns))
  17. insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
  18. with target_conn.cursor() as tgt_cursor:
  19. tgt_cursor.executemany(insert_sql, rows)
  20. target_conn.commit()
  21. offset += len(rows)
  22. print(f"Migrated {offset}/{total_rows} rows")

性能优化与异常处理

批量操作优化

使用executemany替代循环单条插入,可显著提升性能。对于超大数据表,可结合多线程或异步IO进一步优化。

事务管理

确保每个批次的数据操作在事务中完成,避免部分失败导致数据不一致。

  1. try:
  2. with target_conn.cursor() as tgt_cursor:
  3. tgt_cursor.executemany(insert_sql, rows)
  4. target_conn.commit()
  5. except Exception as e:
  6. target_conn.rollback()
  7. print(f"Error occurred: {e}")

错误重试机制

针对网络波动或临时锁表问题,可实现指数退避重试逻辑,提升脚本健壮性。

完整脚本示例

  1. import pymysql
  2. from time import sleep
  3. def migrate_table(source_config, target_config, table_name, batch_size=1000, max_retries=3):
  4. source_conn = get_connection(source_config)
  5. target_conn = get_connection(target_config)
  6. try:
  7. migrate_schema(source_conn, target_conn, table_name)
  8. migrate_data(source_conn, target_conn, table_name, batch_size)
  9. except Exception as e:
  10. print(f"Migration failed for {table_name}: {e}")
  11. raise
  12. finally:
  13. source_conn.close()
  14. target_conn.close()
  15. # 使用示例
  16. if __name__ == "__main__":
  17. source = {...} # 填充源库配置
  18. target = {...} # 填充目标库配置
  19. migrate_table(source, target, "example_table")

最佳实践与注意事项

  1. 字段类型兼容性:检查源库与目标库的字段类型是否一致,尤其是日期、时间戳等特殊类型。
  2. 主键与自增列:若目标表需保留自增属性,确保INSERT语句不指定主键值。
  3. 字符集与排序规则:统一源库与目标库的字符集(如utf8mb4),避免乱码。
  4. 索引与约束:迁移后重建索引与外键约束,提升查询性能。
  5. 日志与监控:记录迁移过程中的关键指标(如耗时、错误率),便于问题排查。

总结

通过Python脚本实现Mysql到Mysql的数据迁移,可灵活控制迁移过程,适应多种场景需求。本文提供的方案涵盖了从环境准备到性能优化的全流程,开发者可根据实际需求调整批量大小、重试策略等参数,实现高效稳定的数据迁移。