Java开发中实现数据实时捕获,Debezium与Kafka CDC的组合是成熟且高效的方案,能够监听数据库变更并实时同步到消息队列,供下游业务消费处理。该方案支持MySQL、PostgreSQL等多种主流数据库,适配性较强。

环境准备
搭建前需要准备以下基础组件,确保版本兼容:
- JDK 8及以上版本,配置好JAVA_HOME环境变量
- Zookeeper 3.6.x版本,用于Kafka集群协调
- Kafka 2.8.x版本,作为CDC数据的消息中间件
- Debezium 1.9.x版本,对应数据库类型的连接器
- MySQL 5.7及以上版本,作为待捕获数据的源数据库
基础组件部署
Zookeeper部署
解压Zookeeper安装包后,修改conf目录下的zoo.cfg配置文件,核心配置如下:
# 数据持久化目录 dataDir=/opt/zookeeper/data # 客户端连接端口 clientPort=2181 # 集群初始化时间 initLimit=10 # 同步超时时间 syncLimit=5
启动Zookeeper服务:
cd /opt/zookeeper/bin ./zkServer.sh start
Kafka部署
解压Kafka安装包,修改config目录下的server.properties配置文件,核心配置如下:
# broker唯一标识 broker.id=0 # 监听地址 listeners=PLAINTEXT://localhost:9092 # 日志存储目录 log.dirs=/opt/kafka/logs # Zookeeper连接地址 zookeeper.connect=localhost:2181
启动Kafka服务:
cd /opt/kafka/bin ./kafka-server-start.sh -daemon ../config/server.properties
Debezium连接器配置
首先将Debezium的MySQL连接器jar包放到Kafka的libs目录下,然后创建连接器配置文件mysql-connector.json,内容如下:
{
"name": "mysql-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "root123",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"table.include.list": "test.user",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql"
}
}
通过Kafka Connect接口提交连接器配置:
curl -X POST -H "Content-Type: application/json" --data @mysql-connector.json http://localhost:8083/connectors
Java端消费CDC数据
Java项目需要引入Kafka客户端依赖,Maven配置如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
编写消费者代码,监听Debezium生成的CDC主题,主题命名规则为server_name.database_name.table_name,对应上述配置的主题为mysql-server.test.user:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class CdcConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 消费者组ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-consumer-group");
// 开启自动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅CDC主题
consumer.subscribe(Collections.singletonList("mysql-server.test.user"));
while (true) {
// 拉取消息,超时时间1秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
System.out.println("捕获到数据变更:");
System.out.println("主题:" + record.topic());
System.out.println("内容:" + record.value());
});
}
}
}
注意事项
- 源数据库需要开启binlog,并且binlog格式设置为ROW,否则Debezium无法捕获变更数据
- Debezium连接器的
database.server.id需要保证全局唯一,避免和数据库其他从节点冲突 - 生产环境建议配置Kafka Connect集群,避免单点故障导致CDC数据丢失
- Java消费端需要处理CDC数据的格式,Debezium输出的JSON包含变更类型、前后数据等完整信息,可根据业务需求解析