在项目从本地轻量部署转向服务端规模化部署时,很多开发者需要将原本使用的SQLite数据库迁移到MySQL,本文就通过实际脚本实例介绍完整的迁移流程。

迁移前准备
首先确保本地已经安装好SQLite和MySQL环境,并且提前创建好目标MySQL数据库,同时准备好可以操作两个数据库的权限。需要特别注意的是,SQLite和MySQL的字段类型存在差异,比如SQLite的INTEGER类型对应MySQL的INT,TEXT类型对应MySQL的VARCHAR或者TEXT,提前梳理好类型映射关系能减少后续报错。
数据结构迁移脚本
我们可以先通过SQLite的导出功能获取表结构,再转换为MySQL兼容的建表语句,以下是Python实现的自动转换脚本:
import sqlite3
import pymysql
# SQLite类型到MySQL类型的映射关系
type_map = {
'INTEGER': 'INT',
'TEXT': 'VARCHAR(255)',
'REAL': 'DOUBLE',
'BLOB': 'BLOB',
'DATETIME': 'DATETIME'
}
def get_sqlite_tables(sqlite_conn):
# 获取所有用户表名
cursor = sqlite_conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [row[0] for row in cursor.fetchall()]
return tables
def get_table_structure(sqlite_conn, table_name):
# 获取表结构
cursor = sqlite_conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
return columns
def convert_to_mysql_sql(table_name, columns):
# 生成MySQL建表语句
sql = f"CREATE TABLE IF NOT EXISTS {table_name} (\n"
col_defs = []
for col in columns:
cid, name, type_, notnull, dflt_value, pk = col
mysql_type = type_map.get(type_.upper(), 'VARCHAR(255)')
notnull_str = 'NOT NULL' if notnull else ''
pk_str = 'PRIMARY KEY AUTO_INCREMENT' if pk else ''
default_str = f"DEFAULT '{dflt_value}'" if dflt_value else ''
col_def = f" {name} {mysql_type} {pk_str} {notnull_str} {default_str}".strip()
col_defs.append(col_def)
sql += ',\n'.join(col_defs) + '\n);'
return sql
if __name__ == '__main__':
# 连接SQLite数据库
sqlite_conn = sqlite3.connect('source.db')
tables = get_sqlite_tables(sqlite_conn)
for table in tables:
columns = get_table_structure(sqlite_conn, table)
mysql_sql = convert_to_mysql_sql(table, columns)
print(f"表{table}的MySQL建表语句:")
print(mysql_sql)
sqlite_conn.close()数据迁移脚本
完成表结构创建后,需要把SQLite中的数据导出并导入到MySQL,以下是数据迁移的脚本实例:
import sqlite3
import pymysql
from datetime import datetime
def transfer_data(sqlite_db, mysql_config):
# 连接SQLite
sqlite_conn = sqlite3.connect(sqlite_db)
sqlite_cursor = sqlite_conn.cursor()
# 连接MySQL
mysql_conn = pymysql.connect(**mysql_config)
mysql_cursor = mysql_conn.cursor()
# 获取所有表
sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [row[0] for row in sqlite_cursor.fetchall()]
for table in tables:
# 获取表数据
sqlite_cursor.execute(f"SELECT * FROM {table}")
rows = sqlite_cursor.fetchall()
if not rows:
continue
# 获取列名
sqlite_cursor.execute(f"PRAGMA table_info({table})")
columns = [col[1] for col in sqlite_cursor.fetchall()]
# 生成插入语句
placeholders = ','.join(['%s'] * len(columns))
insert_sql = f"INSERT INTO {table} ({','.join(columns)}) VALUES ({placeholders})"
# 处理特殊类型数据,比如时间格式转换
processed_rows = []
for row in rows:
new_row = []
for val in row:
if isinstance(val, str):
# 尝试转换时间格式
try:
dt = datetime.strptime(val, '%Y-%m-%d %H:%M:%S')
new_row.append(dt)
except:
new_row.append(val)
else:
new_row.append(val)
processed_rows.append(new_row)
# 批量插入数据
mysql_cursor.executemany(insert_sql, processed_rows)
mysql_conn.commit()
print(f"表{table}迁移完成,共迁移{len(rows)}条数据")
sqlite_conn.close()
mysql_conn.close()
if __name__ == '__main__':
mysql_config = {
'host': '127.0.0.1',
'user': 'root',
'password': '123456',
'database': 'target_db',
'charset': 'utf8mb4'
}
transfer_data('source.db', mysql_config)迁移后验证
迁移完成后需要验证数据完整性,可以通过以下方式检查:
- 对比两个数据库中每个表的行数,确保数量一致
- 随机抽取部分数据,对比字段内容是否完全匹配
- 测试目标MySQL数据库的业务查询功能,确保没有语法报错
如果在迁移过程中遇到字段类型不匹配的问题,可以回到类型映射脚本中调整type_map字典的对应关系,比如如果某个TEXT字段存储的内容超过255字符,可以把对应的映射改为TEXT类型,重新执行迁移脚本即可。