导读:本期聚焦于小伙伴创作的《如何通过canal等工具实现MySQL到其他数据源的实时同步?》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何通过canal等工具实现MySQL到其他数据源的实时同步?》有用,将其分享出去将是对创作者最好的鼓励。

MySQL作为常用的关系型数据库,在很多业务架构中承担着核心存储的角色,而业务往往需要将这些数据同步到Elasticsearch用于搜索、同步到Kafka用于消息流转、同步到数据仓库用于分析。传统的全量定时同步方式存在延迟高、资源占用大的问题,基于binlog的增量实时同步成为更优的选择,canal就是这类场景下的常用工具。

如何通过canal等工具实现MySQL到其他数据源的实时同步?

canal实现同步的核心原理

canal的核心能力是模拟MySQL slave的交互协议,伪装成MySQL的从节点,向MySQL master发送dump协议请求。MySQL master收到请求后会推送binlog日志给canal,canal解析这些binlog日志,提取出数据的增删改变更记录,再将这些变更记录投递到下游的目标数据源,从而实现实时同步。

整个流程主要分为三个环节:

  • binlog采集:canal伪装成slave订阅MySQL的binlog,获取增量变更数据
  • 数据解析:canal将二进制的binlog解析成结构化的变更记录,包含库名、表名、操作类型、变更前后的字段值等信息
  • 数据投递:canal将解析后的变更记录发送到适配的下游组件,再由下游组件写入目标数据源

环境准备与canal部署

1. MySQL配置调整

首先需要确保MySQL开启了binlog,并且binlog格式为ROW,这是canal正常工作的前提。修改MySQL的配置文件my.cnf,添加以下配置:

# 开启binlog
log-bin=mysql-bin
# binlog格式设置为ROW
binlog_format=ROW
# 设置binlog的过期时间,避免日志过大
expire_logs_days=7
# 给当前MySQL实例设置唯一ID
server-id=1

配置完成后重启MySQL,然后创建一个用于canal连接的用户,并授予相应的权限:

-- 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
-- 授予复制权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;

2. canal服务端部署

下载canal的发行包,解压后修改conf/example/instance.properties配置文件,配置MySQL的连接信息和binlog相关参数:

# MySQL地址
canal.instance.master.address=127.0.0.1:3306
# MySQL用户名
canal.instance.dbUsername=canal
# MySQL密码
canal.instance.dbPassword=canal_password
# 监听的数据库,不填则监听所有
canal.instance.filter.regex=.*\..*
# binlog起始位置,首次启动可以不填,canal会自动获取最新位置
# canal.instance.master.journal.name=
# canal.instance.master.position=

配置完成后启动canal服务端,启动成功后canal会开始监听MySQL的binlog变更。

实现MySQL到不同数据源的同步

同步到Kafka

canal本身支持将变更数据直接投递到Kafka,只需要修改canal的conf/canal.properties配置文件,开启Kafka投递模式:

# 投递模式设置为kafka
canal.serverMode=kafka
# Kafka地址
kafka.bootstrap.servers=127.0.0.1:9092
# 投递的topic,可按需求自定义
canal.mq.topic=canal_topic
# 分区策略,按表名分区保证同一张表的变更有序
canal.mq.partitionHash=.*\..*

重启canal后,MySQL的增量变更就会自动发送到Kafka的指定topic中,下游消费者订阅该topic即可获取变更数据,再根据业务需求写入其他存储。

同步到Elasticsearch

canal没有内置直接写入Elasticsearch的组件,通常需要配合canal adapter实现。首先下载canal adapter,修改conf/application.yml配置文件:

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true
      username: canal
      password: canal_password
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      adapters:
      - name: es7
        hosts: 127.0.0.1:9200
        properties:
          cluster.name: elasticsearch

然后在conf/es7目录下创建表映射配置文件,比如将test_db库的user表同步到ES的user索引:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: user
  _type: _doc
  _id: id
  sql: "SELECT id, username, age, email FROM user"
  etlCondition: "where id >= 1"
  commitBatch: 3000

启动canal adapter后,MySQL中user表的增删改操作都会实时同步到Elasticsearch的user索引中。

同步到数据仓库(以Hive为例)

同步到Hive可以先将canal的数据投递到Kafka,再通过Flink等流计算组件消费Kafka数据写入Hive。Flink消费Kafka的示例代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class CanalToHive {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("group.id", "canal_hive_group");
        // 消费canal投递到Kafka的topic
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("canal_topic", new SimpleStringSchema(), props));
        // 解析canal的json格式数据,提取变更信息
        // 这里省略解析逻辑,解析后调用Hive的写入接口即可
        stream.print();
        env.execute("canal to hive job");
    }
}

注意事项

  • binlog格式必须为ROW,否则canal无法获取完整的变更前后数据,可能导致同步数据不准确
  • canal的位点管理需要注意,异常重启后要避免重复消费或者漏消费binlog,建议定期备份canal的位点信息
  • 同步到目标数据源时要做好幂等性处理,避免重复同步导致数据不一致
  • 如果MySQL是集群部署,需要确保所有节点的server-id唯一,并且canal连接的master节点稳定
canal除了同步场景,还可以用于数据审计、缓存失效等场景,开发者可以根据实际需求扩展其使用方式。

canalMySQL实时同步binlog解析数据源同步增量同步修改时间:2026-06-29 02:09:33

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。