Apache Kafka Connect作为Kafka官方提供的数据集成框架,主要用于实现Kafka与其他系统之间的数据流转,默认支持JSON、Avro等常见数据格式,但面对XML这类结构化文本格式时,需要额外做适配处理才能正常完成消息的读写与转换。

XML消息处理的核心思路
Kafka Connect处理XML消息的核心是将XML格式的字节流转换为Kafka Connect内部的数据结构Struct,或者将Struct转换为XML字节流,这个过程需要通过转换器(Converter)来实现。默认的转换器不支持XML,因此需要自定义转换器或者引入第三方XML转换器实现。
处理流程拆解
- 消息接收阶段:Source Connector从外部系统读取XML格式的原始数据,或者Sink Connector从Kafka topic中消费XML格式的字节消息
- 格式转换阶段:通过自定义的XML Converter将XML字节流解析为
Struct对象,或者将Struct对象序列化为XML字节流 - 数据流转阶段:转换后的
Struct对象按照Kafka Connect的规范完成写入topic或者写入目标系统的操作
自定义XML Converter实现步骤
Kafka Connect的Converter接口定义了fromConnectData和toConnectData两个核心方法,分别用于序列化与反序列化,我们可以基于这两个方法实现XML格式的转换逻辑。
依赖引入
首先需要在项目中引入Kafka Connect相关的依赖以及XML解析的依赖,这里使用DOM4J作为XML解析工具:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.1.4</version>
</dependency>
</dependencies>
Converter核心代码实现
以下是一个简单的XML Converter实现示例,支持将XML字符串转换为Struct,以及将Struct转换为XML字符串:
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.storage.Converter;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class XmlConverter implements Converter {
private static final String DEFAULT_ROOT_ELEMENT = "record";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 可在此处读取配置,比如自定义根节点名称
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object data) {
if (data == null) {
return null;
}
// 将Struct转换为XML
if (data instanceof Struct) {
Struct struct = (Struct) data;
Document document = DocumentHelper.createDocument();
Element root = document.addElement(DEFAULT_ROOT_ELEMENT);
for (org.apache.kafka.connect.data.Field field : struct.schema().fields()) {
Object value = struct.get(field);
if (value != null) {
root.addElement(field.name()).addText(value.toString());
}
}
return document.asXML().getBytes(StandardCharsets.UTF_8);
}
return data.toString().getBytes(StandardCharsets.UTF_8);
}
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
if (value == null || value.length == 0) {
return new SchemaAndValue(null, null);
}
try {
String xmlStr = new String(value, StandardCharsets.UTF_8);
Document document = DocumentHelper.parseText(xmlStr);
Element root = document.getRootElement();
// 动态构建Schema
Schema schema = SchemaBuilder.struct().name("xml_record").build();
for (Element element : root.elements()) {
schema = SchemaBuilder.struct().field(element.getName(), Schema.OPTIONAL_STRING).build();
}
// 构建Struct
Struct struct = new Struct(schema);
for (Element element : root.elements()) {
struct.put(element.getName(), element.getText());
}
return new SchemaAndValue(schema, struct);
} catch (Exception e) {
throw new RuntimeException("解析XML失败", e);
}
}
}
打包与部署
将写好的Converter代码打包成jar包,放到Kafka Connect的插件目录下,比如libs/目录,然后重启Kafka Connect服务即可加载该转换器。
Connector配置示例
配置Connector时,需要指定使用我们自定义的XML Converter,以下是Sink Connector的示例配置:
{
"name": "xml-sink-connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "xml-topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.example.XmlConverter",
"file": "/tmp/xml-output.txt"
}
}
注意事项
- XML格式的结构复杂度较高,如果XML包含嵌套结构或者属性,需要额外扩展Converter的逻辑,适配嵌套字段的解析与序列化
- 生产环境中建议对XML的大小做限制,避免超大XML消息导致内存溢出
- 如果XML有对应的XSD schema,可以在Converter中引入校验逻辑,保证消息格式的正确性
- 自定义Converter的版本需要和Kafka Connect的版本保持兼容,避免版本冲突导致服务异常
Kafka_ConnectXML消息消息转换自定义转换器数据集成修改时间:2026-06-14 05:39:19