如何使用Apache Flink与Kafka构建实时连续查询

来源:个人站长作者:小宵头衔:网络博主
导读:本期聚焦于小伙伴创作的《如何使用Apache Flink与Kafka构建实时连续查询》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何使用Apache Flink与Kafka构建实时连续查询》有用,将其分享出去将是对创作者最好的鼓励。

实时连续查询是指对持续产生的动态数据流进行不间断的查询分析,实时输出符合要求的统计结果,在电商实时大屏、金融风控、物联网设备监控等场景中有广泛应用。Apache Flink作为高性能流处理框架,具备低延迟、高吞吐、精确一次处理等特性,Kafka则是主流的分布式消息队列,二者结合可以高效构建稳定的实时连续查询系统。

如何使用Apache Flink与Kafka构建实时连续查询

核心组件与原理说明

在构建系统前,需要先了解两个核心组件的作用与协作逻辑:

  • Kafka:负责接收上游业务系统产生的实时数据,作为数据流转的中间存储,保证数据的可靠性和有序性,为Flink提供稳定的数据源。
  • Apache Flink:从Kafka消费数据后,按照预设的查询逻辑进行实时计算,维护计算状态,持续输出最新的查询结果。

实时连续查询的核心逻辑是Flink会持续消费Kafka中的新数据,更新内部的计算状态,不需要用户手动触发查询,只要数据流入就会自动更新结果。

环境准备

开始前需要准备好运行环境,以下是基础依赖:

  • JDK 1.8及以上版本
  • Apache Kafka 2.8及以上版本,启动Zookeeper和Kafka服务
  • Apache Flink 1.14及以上版本,启动本地Flink集群
  • Maven 3.6及以上版本,用于管理项目依赖

添加项目依赖

创建Maven项目后,在pom.xml中添加Flink和Kafka相关的依赖:

<dependencies>
    <!-- Flink核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.6</version>
    </dependency>
    <!-- Flink Kafka连接器依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.6</version>
    </dependency>
    <!-- Flink客户端依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.6</version>
    </dependency>
</dependencies>

Kafka数据源准备

首先需要创建Kafka主题,用于存储实时产生的业务数据,这里以用户订单数据为例:

创建Kafka主题

执行以下命令创建名为order_topic的Kafka主题,分区数设置为3,副本数为1:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic order_topic --partitions 3 --replication-factor 1

模拟数据生产

可以编写一个简单的生产者程序,向order_topic中持续发送订单数据,数据格式为JSON:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.alibaba.fastjson.JSONObject;
import java.util.Properties;
import java.util.Random;

public class OrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Random random = new Random();
        // 持续发送订单数据
        while (true) {
            JSONObject order = new JSONObject();
            order.put("order_id", System.currentTimeMillis());
            order.put("user_id", random.nextInt(1000));
            order.put("amount", random.nextDouble() * 1000);
            order.put("category", random.nextBoolean() ? "electronics" : "clothing");
            order.put("timestamp", System.currentTimeMillis());
            producer.send(new ProducerRecord<>("order_topic", order.toJSONString()));
            System.out.println("发送订单数据:" + order.toJSONString());
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Flink实时连续查询实现

接下来编写Flink程序,实现实时统计每个商品类目的总订单金额,结果持续更新输出:

数据消费与解析

首先配置Flink从Kafka消费数据,解析JSON格式的订单数据:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import java.time.Duration;

public class RealTimeContinuousQuery {
    public static void main(String[] args) throws Exception {
        // 初始化Flink执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        
        // 配置Kafka数据源
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("order_topic")
                .setGroupId("flink_order_query_group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        
        // 消费Kafka数据,分配时间戳和水位线
        DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> {
                    JSONObject json = JSONObject.parseObject(event);
                    return json.getLong("timestamp");
                }), "Kafka Source");
    }
}

连续查询逻辑实现

对解析后的订单数据按照类目分组,持续累加订单金额,实现连续查询:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSONObject;

// 接上面的RealTimeContinuousQuery类main方法后续代码
        // 解析订单数据,提取类目和金额
        DataStream<OrderInfo> orderStream = kafkaStream.map(event -> {
            JSONObject json = JSONObject.parseObject(event);
            return new OrderInfo(
                    json.getString("category"),
                    json.getDouble("amount")
            );
        });
        
        // 按照类目分组,实现连续累加总金额
        DataStream<CategoryTotal> resultStream = orderStream
                .keyBy(OrderInfo::getCategory)
                .process(new ContinuousQueryFunction());
        
        // 输出连续查询结果
        resultStream.print();
        
        env.execute("Flink Kafka Real Time Continuous Query");
    }
    
    // 订单信息实体类
    public static class OrderInfo {
        private String category;
        private Double amount;
        
        public OrderInfo(String category, Double amount) {
            this.category = category;
            this.amount = amount;
        }
        
        public String getCategory() {
            return category;
        }
        
        public Double getAmount() {
            return amount;
        }
    }
    
    // 类目总金额结果实体类
    public static class CategoryTotal {
        private String category;
        private Double totalAmount;
        private Long updateTime;
        
        public CategoryTotal(String category, Double totalAmount, Long updateTime) {
            this.category = category;
            this.totalAmount = totalAmount;
            this.updateTime = updateTime;
        }
        
        @Override
        public String toString() {
            return "CategoryTotal{" +
                    "category='" + category + ''' +
                    ", totalAmount=" + totalAmount +
                    ", updateTime=" + updateTime +
                    '}';
        }
    }
    
    // 连续查询处理函数,维护每个类目的总金额状态
    public static class ContinuousQueryFunction extends KeyedProcessFunction<String, OrderInfo, CategoryTotal> {
        private MapState<String, Double> totalState;
        
        @Override
        public void open(Configuration parameters) {
            // 初始化状态,存储每个类目的总订单金额
            MapStateDescriptor<String, Double> stateDescriptor = new MapStateDescriptor<>(
                    "category_total_state",
                    String.class,
                    Double.class
            );
            totalState = getRuntimeContext().getMapState(stateDescriptor);
        }
        
        @Override
        public void processElement(OrderInfo orderInfo, Context context, Collector<CategoryTotal> collector) throws Exception {
            String category = orderInfo.getCategory();
            Double currentAmount = orderInfo.getAmount();
            // 获取当前类目的累计总金额
            Double currentTotal = totalState.get(category);
            if (currentTotal == null) {
                currentTotal = 0.0;
            }
            // 更新累计金额
            Double newTotal = currentTotal + currentAmount;
            totalState.put(category, newTotal);
            // 输出最新的连续查询结果
            collector.collect(new CategoryTotal(category, newTotal, System.currentTimeMillis()));
        }
    }

结果验证

启动Kafka、Flink集群后,先运行OrderProducer发送模拟订单数据,再启动Flink程序,控制台会持续输出每个类目的总订单金额,每当有新的订单数据进入Kafka,对应的类目总金额就会自动更新,实现实时连续查询的效果。

如果需要将结果输出到外部存储,比如MySQL或者Elasticsearch,只需要在resultStream后面添加对应的sink即可,Flink会持续将最新的查询结果写入到外部系统中。

注意:生产环境中需要开启Flink的Checkpoint功能,保证状态数据不丢失,同时将Kafka的offset提交配置为开启,避免数据重复消费。

Apache_FlinkKafka实时连续查询流处理修改时间:2026-06-16 03:00:50

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。