Java开发如何配置数据捕获环境?Debezium与Kafka CDC搭建步骤详解

来源:编程网作者:上海网站建设头衔:草根站长
导读:本期聚焦于小伙伴创作的《Java开发如何配置数据捕获环境?Debezium与Kafka CDC搭建步骤详解》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《Java开发如何配置数据捕获环境?Debezium与Kafka CDC搭建步骤详解》有用,将其分享出去将是对创作者最好的鼓励。

Java开发中实现数据实时捕获,Debezium与Kafka CDC的组合是成熟且高效的方案,能够监听数据库变更并实时同步到消息队列,供下游业务消费处理。该方案支持MySQL、PostgreSQL等多种主流数据库,适配性较强。

Java开发如何配置数据捕获环境?Debezium与Kafka CDC搭建步骤详解

环境准备

搭建前需要准备以下基础组件,确保版本兼容:

  • JDK 8及以上版本,配置好JAVA_HOME环境变量
  • Zookeeper 3.6.x版本,用于Kafka集群协调
  • Kafka 2.8.x版本,作为CDC数据的消息中间件
  • Debezium 1.9.x版本,对应数据库类型的连接器
  • MySQL 5.7及以上版本,作为待捕获数据的源数据库

基础组件部署

Zookeeper部署

解压Zookeeper安装包后,修改conf目录下的zoo.cfg配置文件,核心配置如下:

# 数据持久化目录
dataDir=/opt/zookeeper/data
# 客户端连接端口
clientPort=2181
# 集群初始化时间
initLimit=10
# 同步超时时间
syncLimit=5

启动Zookeeper服务:

cd /opt/zookeeper/bin
./zkServer.sh start

Kafka部署

解压Kafka安装包,修改config目录下的server.properties配置文件,核心配置如下:

# broker唯一标识
broker.id=0
# 监听地址
listeners=PLAINTEXT://localhost:9092
# 日志存储目录
log.dirs=/opt/kafka/logs
# Zookeeper连接地址
zookeeper.connect=localhost:2181

启动Kafka服务:

cd /opt/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties

Debezium连接器配置

首先将Debezium的MySQL连接器jar包放到Kafka的libs目录下,然后创建连接器配置文件mysql-connector.json,内容如下:

{
  "name": "mysql-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root123",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "table.include.list": "test.user",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.mysql"
  }
}

通过Kafka Connect接口提交连接器配置:

curl -X POST -H "Content-Type: application/json" --data @mysql-connector.json http://localhost:8083/connectors

Java端消费CDC数据

Java项目需要引入Kafka客户端依赖,Maven配置如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

编写消费者代码,监听Debezium生成的CDC主题,主题命名规则为server_name.database_name.table_name,对应上述配置的主题为mysql-server.test.user

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CdcConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 消费者组ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-consumer-group");
        // 开启自动提交偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅CDC主题
        consumer.subscribe(Collections.singletonList("mysql-server.test.user"));

        while (true) {
            // 拉取消息,超时时间1秒
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> {
                System.out.println("捕获到数据变更:");
                System.out.println("主题:" + record.topic());
                System.out.println("内容:" + record.value());
            });
        }
    }
}

注意事项

  • 源数据库需要开启binlog,并且binlog格式设置为ROW,否则Debezium无法捕获变更数据
  • Debezium连接器的database.server.id需要保证全局唯一,避免和数据库其他从节点冲突
  • 生产环境建议配置Kafka Connect集群,避免单点故障导致CDC数据丢失
  • Java消费端需要处理CDC数据的格式,Debezium输出的JSON包含变更类型、前后数据等完整信息,可根据业务需求解析

DebeziumKafka_CDCJava开发数据捕获修改时间:2026-07-04 07:33:28

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。