在业务系统迭代过程中,经常需要将源数据库的数据同步到数仓、缓存或者其他业务库,全量同步每次都要处理全部数据,不仅耗时久还会占用大量IO和带宽资源。利用更新时间做标记实现SQL增量同步,是兼顾效率和实现成本的主流方案。

核心原理
该方案的核心逻辑是为需要同步的表添加update_time字段,记录每条数据的最后修改时间。每次同步时,记录本次同步的最大更新时间作为同步位点,下次同步时只查询update_time大于该位点的数据,即可实现增量同步。
通常还需要配合create_time字段处理新增数据,不过如果update_time在插入时默认赋值为当前时间,也可以仅用update_time覆盖新增和更新的场景,删除场景则需要额外处理。
实现步骤
1. 表结构准备
源表需要包含时间标记字段,示例表结构如下:
-- MySQL源表示例 CREATE TABLE `user_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(50) NOT NULL, `age` int(11) DEFAULT NULL, `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `idx_update_time` (`update_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
这里update_time设置了自动更新,当数据被插入或更新时,会自动刷新为当前时间,不需要业务层手动维护。同时给update_time添加索引,提升同步查询的效率。
2. 同步位点记录
需要维护一个同步位点表,记录每个表的最后一次同步时间,示例表结构如下:
-- 同步位点表 CREATE TABLE `sync_position` ( `id` int(11) NOT NULL AUTO_INCREMENT, `table_name` varchar(100) NOT NULL COMMENT '同步的表名', `last_sync_time` datetime NOT NULL COMMENT '上次同步的最大更新时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `uk_table_name` (`table_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3. 同步逻辑实现
同步流程分为三步:先获取上次同步位点,再查询增量数据,最后更新同步位点。以下是Python实现的示例逻辑:
import pymysql
import datetime
def get_increment_data():
# 连接源库和目标库
source_conn = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='source_db')
target_conn = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='target_db')
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
table_name = 'user_info'
# 1. 获取上次同步位点
source_cursor.execute(f"SELECT last_sync_time FROM sync_position WHERE table_name = '{table_name}'")
result = source_cursor.fetchone()
if result:
last_sync_time = result[0]
else:
# 首次同步,默认取当前时间前一天
last_sync_time = datetime.datetime.now() - datetime.timedelta(days=1)
source_cursor.execute(f"INSERT INTO sync_position (table_name, last_sync_time) VALUES ('{table_name}', '{last_sync_time}')")
source_conn.commit()
# 2. 查询增量数据
source_cursor.execute(f"SELECT id, username, age, create_time, update_time FROM {table_name} WHERE update_time > '{last_sync_time}' ORDER BY update_time ASC")
increment_data = source_cursor.fetchall()
# 3. 写入目标库(这里用replace into处理更新和新增,假设目标表结构和源表一致)
for row in increment_data:
target_cursor.execute(
"REPLACE INTO user_info (id, username, age, create_time, update_time) VALUES (%s, %s, %s, %s, %s)",
row
)
target_conn.commit()
# 4. 更新同步位点,取本次同步的最大更新时间
if increment_data:
max_update_time = max([row[4] for row in increment_data])
source_cursor.execute(f"UPDATE sync_position SET last_sync_time = '{max_update_time}' WHERE table_name = '{table_name}'")
source_conn.commit()
source_cursor.close()
target_cursor.close()
source_conn.close()
target_conn.close()
注意事项
- 时间字段的精度问题:如果数据更新非常频繁,秒级精度可能导致部分数据漏同步,建议使用毫秒级时间戳字段,比如用
bigint类型存储Unix毫秒时间戳。 - 删除数据处理:更新时间方案无法感知数据删除操作,如果需要同步删除,需要额外维护删除日志表,或者在业务层做软删除,通过
is_deleted字段和更新时间配合同步。 - 时区一致性:源库和目标库的时区需要保持一致,避免出现时间转换错误导致同步位点偏差。
- 大表同步优化:如果表数据量极大,建议按时间范围分批次同步,避免单次查询返回过多数据导致内存溢出。
不同数据库适配示例
如果是PostgreSQL数据库,自动更新时间的字段定义略有不同,示例代码如下:
-- PostgreSQL源表更新时间设置 CREATE TABLE user_info ( id serial PRIMARY KEY, username varchar(50) NOT NULL, age int, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); -- 创建更新时间的触发器函数 CREATE OR REPLACE FUNCTION update_update_time() RETURNS TRIGGER AS $$ BEGIN NEW.update_time = CURRENT_TIMESTAMP; RETURN NEW; END; $$ LANGUAGE plpgsql; -- 创建触发器 CREATE TRIGGER trigger_update_time BEFORE UPDATE ON user_info FOR EACH ROW EXECUTE FUNCTION update_update_time();
如果是SQL Server数据库,查询增量数据的语句可以调整为:
-- SQL Server查询增量数据 DECLARE @last_sync_time datetime SELECT @last_sync_time = last_sync_time FROM sync_position WHERE table_name = 'user_info' SELECT id, username, age, create_time, update_time FROM user_info WHERE update_time > @last_sync_time ORDER BY update_time ASC