导读:本期聚焦于小伙伴创作的《Docker Compose搭建Zookeeper与Kafka集群指南:从环境配置到Java客户端测试》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《Docker Compose搭建Zookeeper与Kafka集群指南:从环境配置到Java客户端测试》有用,将其分享出去将是对创作者最好的鼓励。

docker搭建zookeeper集群和kafka集群并使用Java测试详解

在分布式系统和微服务架构中,ZookeeperKafka是两个非常重要的组件。Zookeeper用于分布式协调,而Kafka作为高吞吐量的消息队列,广泛应用于日志收集、消息系统和流处理等场景。本文将详细介绍如何使用Docker Compose搭建一个3节点的Zookeeper集群和3节点的Kafka集群,并使用Java编写生产者和消费者进行测试。

一、环境准备

在开始之前,请确保您的机器已经安装了以下环境:

  • Docker

  • Docker Compose

  • JDK 1.8+

  • Maven

二、编写Docker Compose配置

我们将使用Docker Compose来定义和运行多容器Docker应用程序。创建一个名为docker-compose.yml的文件,内容如下:

version: '3'
services:
  zoo1:
    image: zookeeper:3.4.14
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo2:
    image: zookeeper:3.4.14
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888

  zoo3:
    image: zookeeper:3.4.14
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888

  kafka1:
    image: wurstmeister/kafka:2.12-2.5.0
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    image: wurstmeister/kafka:2.12-2.5.0
    ports:
      - 9093:9092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: wurstmeister/kafka:2.12-2.5.0
    ports:
      - 9094:9092
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9094
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    depends_on:
      - zoo1
      - zoo2
      - zoo3

配置说明:

  • ZOO_MY_ID:Zookeeper节点的唯一标识,必须与ZOO_SERVERS中的配置一致。

  • KAFKA_BROKER_ID:Kafka节点的唯一标识。

  • KAFKA_ADVERTISED_LISTENERS:Kafka对外暴露的监听地址,这里我们映射到宿主机的localhost和对应端口,以便宿主机上的Java程序能够访问。

  • KAFKA_ZOOKEEPER_CONNECT:Kafka连接Zookeeper集群的地址。

三、启动集群

docker-compose.yml文件所在目录下,执行以下命令启动集群:

docker-compose up -d

稍等片刻,等待所有容器启动完毕。可以使用以下命令查看容器运行状态:

docker-compose ps

如果看到6个容器的状态都是Up,说明集群已经成功启动。

四、Java测试验证

接下来,我们编写Java程序来测试Kafka集群的消息生产和消费功能。

4.1 引入Maven依赖

在您的Maven项目的pom.xml文件中添加Kafka客户端依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
</dependencies>

4.2 编写生产者代码

创建一个Java类KafkaProducerTest,用于向Kafka发送消息:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // 配置Kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        // 配置Key和Value的序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "test-topic";
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("发送成功: 分区[" + metadata.partition() + "], 偏移量[" + metadata.offset() + "]");
                } else {
                    exception.printStackTrace();
                }
            });
        }

        producer.close();
    }
}

4.3 编写消费者代码

创建一个Java类KafkaConsumerTest,用于从Kafka消费消息:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // 配置Kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        // 配置消费者组ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // 配置Key和Value的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 自动提交Offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        System.out.println("开始消费消息...");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.println("收到消息: Key[" + record.key() + "], Value[" + record.value() + "], Partition[" + record.partition() + "], Offset[" + record.offset() + "]");
            });
        }
    }
}

五、测试结果

1. 先运行KafkaConsumerTest,让消费者处于等待状态。

2. 运行KafkaProducerTest,生产者将向集群发送10条消息。

3. 观察消费者的控制台输出,您将看到类似如下的日志,表明Kafka集群搭建成功,且消息的收发功能正常:

开始消费消息...
收到消息: Key[key-0], Value[value-0], Partition[0], Offset[0]
收到消息: Key[key-1], Value[value-1], Partition[1], Offset[0]
...

六、总结

本文详细介绍了如何通过Docker Compose快速搭建Zookeeper和Kafka集群,并通过Java代码验证了集群的消息收发能力。使用Docker不仅极大简化了集群的部署过程,还方便了开发环境的统一。在实际的生产环境中,还需要注意Kafka的参数调优、数据持久化以及高可用机制的配置,这些都可以在此基础配置上进行扩展。

Docker ComposeZookeeper集群Kafka集群Java客户端消息队列测试

免责声明:已尽一切努力确保本网站所含信息的准确性。网站部分内容来源于网络或由用户自行发表,内容观点不代表本站立场。本站是个人网站免费分享,内容仅供个人学习、研究或参考使用,如内容中引用了第三方作品,其版权归原作者所有。若内容触犯了您的权益,请联系我们进行处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。前端、网络、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握网站开发与运维所需的核心技术栈。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端逻辑,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。