MySQL作为常用的关系型数据库,在很多业务架构中承担着核心存储的角色,而业务往往需要将这些数据同步到Elasticsearch用于搜索、同步到Kafka用于消息流转、同步到数据仓库用于分析。传统的全量定时同步方式存在延迟高、资源占用大的问题,基于binlog的增量实时同步成为更优的选择,canal就是这类场景下的常用工具。

canal实现同步的核心原理
canal的核心能力是模拟MySQL slave的交互协议,伪装成MySQL的从节点,向MySQL master发送dump协议请求。MySQL master收到请求后会推送binlog日志给canal,canal解析这些binlog日志,提取出数据的增删改变更记录,再将这些变更记录投递到下游的目标数据源,从而实现实时同步。
整个流程主要分为三个环节:
- binlog采集:canal伪装成slave订阅MySQL的binlog,获取增量变更数据
- 数据解析:canal将二进制的binlog解析成结构化的变更记录,包含库名、表名、操作类型、变更前后的字段值等信息
- 数据投递:canal将解析后的变更记录发送到适配的下游组件,再由下游组件写入目标数据源
环境准备与canal部署
1. MySQL配置调整
首先需要确保MySQL开启了binlog,并且binlog格式为ROW,这是canal正常工作的前提。修改MySQL的配置文件my.cnf,添加以下配置:
# 开启binlog log-bin=mysql-bin # binlog格式设置为ROW binlog_format=ROW # 设置binlog的过期时间,避免日志过大 expire_logs_days=7 # 给当前MySQL实例设置唯一ID server-id=1
配置完成后重启MySQL,然后创建一个用于canal连接的用户,并授予相应的权限:
-- 创建用户 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password'; -- 授予复制权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- 刷新权限 FLUSH PRIVILEGES;
2. canal服务端部署
下载canal的发行包,解压后修改conf/example/instance.properties配置文件,配置MySQL的连接信息和binlog相关参数:
# MySQL地址 canal.instance.master.address=127.0.0.1:3306 # MySQL用户名 canal.instance.dbUsername=canal # MySQL密码 canal.instance.dbPassword=canal_password # 监听的数据库,不填则监听所有 canal.instance.filter.regex=.*\..* # binlog起始位置,首次启动可以不填,canal会自动获取最新位置 # canal.instance.master.journal.name= # canal.instance.master.position=
配置完成后启动canal服务端,启动成功后canal会开始监听MySQL的binlog变更。
实现MySQL到不同数据源的同步
同步到Kafka
canal本身支持将变更数据直接投递到Kafka,只需要修改canal的conf/canal.properties配置文件,开启Kafka投递模式:
# 投递模式设置为kafka canal.serverMode=kafka # Kafka地址 kafka.bootstrap.servers=127.0.0.1:9092 # 投递的topic,可按需求自定义 canal.mq.topic=canal_topic # 分区策略,按表名分区保证同一张表的变更有序 canal.mq.partitionHash=.*\..*
重启canal后,MySQL的增量变更就会自动发送到Kafka的指定topic中,下游消费者订阅该topic即可获取变更数据,再根据业务需求写入其他存储。
同步到Elasticsearch
canal没有内置直接写入Elasticsearch的组件,通常需要配合canal adapter实现。首先下载canal adapter,修改conf/application.yml配置文件:
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
canal.conf:
mode: tcp
canalServerHost: 127.0.0.1:11111
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true
username: canal
password: canal_password
canalAdapters:
- instance: example
groups:
- groupId: g1
adapters:
- name: es7
hosts: 127.0.0.1:9200
properties:
cluster.name: elasticsearch
然后在conf/es7目录下创建表映射配置文件,比如将test_db库的user表同步到ES的user索引:
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: user _type: _doc _id: id sql: "SELECT id, username, age, email FROM user" etlCondition: "where id >= 1" commitBatch: 3000
启动canal adapter后,MySQL中user表的增删改操作都会实时同步到Elasticsearch的user索引中。
同步到数据仓库(以Hive为例)
同步到Hive可以先将canal的数据投递到Kafka,再通过Flink等流计算组件消费Kafka数据写入Hive。Flink消费Kafka的示例代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class CanalToHive {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
props.setProperty("group.id", "canal_hive_group");
// 消费canal投递到Kafka的topic
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("canal_topic", new SimpleStringSchema(), props));
// 解析canal的json格式数据,提取变更信息
// 这里省略解析逻辑,解析后调用Hive的写入接口即可
stream.print();
env.execute("canal to hive job");
}
}
注意事项
- binlog格式必须为ROW,否则canal无法获取完整的变更前后数据,可能导致同步数据不准确
- canal的位点管理需要注意,异常重启后要避免重复消费或者漏消费binlog,建议定期备份canal的位点信息
- 同步到目标数据源时要做好幂等性处理,避免重复同步导致数据不一致
- 如果MySQL是集群部署,需要确保所有节点的server-id唯一,并且canal连接的master节点稳定
canal除了同步场景,还可以用于数据审计、缓存失效等场景,开发者可以根据实际需求扩展其使用方式。