实时连续查询是指对持续产生的动态数据流进行不间断的查询分析,实时输出符合要求的统计结果,在电商实时大屏、金融风控、物联网设备监控等场景中有广泛应用。Apache Flink作为高性能流处理框架,具备低延迟、高吞吐、精确一次处理等特性,Kafka则是主流的分布式消息队列,二者结合可以高效构建稳定的实时连续查询系统。

核心组件与原理说明
在构建系统前,需要先了解两个核心组件的作用与协作逻辑:
- Kafka:负责接收上游业务系统产生的实时数据,作为数据流转的中间存储,保证数据的可靠性和有序性,为Flink提供稳定的数据源。
- Apache Flink:从Kafka消费数据后,按照预设的查询逻辑进行实时计算,维护计算状态,持续输出最新的查询结果。
实时连续查询的核心逻辑是Flink会持续消费Kafka中的新数据,更新内部的计算状态,不需要用户手动触发查询,只要数据流入就会自动更新结果。
环境准备
开始前需要准备好运行环境,以下是基础依赖:
- JDK 1.8及以上版本
- Apache Kafka 2.8及以上版本,启动Zookeeper和Kafka服务
- Apache Flink 1.14及以上版本,启动本地Flink集群
- Maven 3.6及以上版本,用于管理项目依赖
添加项目依赖
创建Maven项目后,在pom.xml中添加Flink和Kafka相关的依赖:
<dependencies>
<!-- Flink核心依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<!-- Flink Kafka连接器依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<!-- Flink客户端依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.6</version>
</dependency>
</dependencies>
Kafka数据源准备
首先需要创建Kafka主题,用于存储实时产生的业务数据,这里以用户订单数据为例:
创建Kafka主题
执行以下命令创建名为order_topic的Kafka主题,分区数设置为3,副本数为1:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic order_topic --partitions 3 --replication-factor 1
模拟数据生产
可以编写一个简单的生产者程序,向order_topic中持续发送订单数据,数据格式为JSON:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.alibaba.fastjson.JSONObject;
import java.util.Properties;
import java.util.Random;
public class OrderProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Random random = new Random();
// 持续发送订单数据
while (true) {
JSONObject order = new JSONObject();
order.put("order_id", System.currentTimeMillis());
order.put("user_id", random.nextInt(1000));
order.put("amount", random.nextDouble() * 1000);
order.put("category", random.nextBoolean() ? "electronics" : "clothing");
order.put("timestamp", System.currentTimeMillis());
producer.send(new ProducerRecord<>("order_topic", order.toJSONString()));
System.out.println("发送订单数据:" + order.toJSONString());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Flink实时连续查询实现
接下来编写Flink程序,实现实时统计每个商品类目的总订单金额,结果持续更新输出:
数据消费与解析
首先配置Flink从Kafka消费数据,解析JSON格式的订单数据:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import java.time.Duration;
public class RealTimeContinuousQuery {
public static void main(String[] args) throws Exception {
// 初始化Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
// 配置Kafka数据源
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("order_topic")
.setGroupId("flink_order_query_group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 消费Kafka数据,分配时间戳和水位线
DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> {
JSONObject json = JSONObject.parseObject(event);
return json.getLong("timestamp");
}), "Kafka Source");
}
}
连续查询逻辑实现
对解析后的订单数据按照类目分组,持续累加订单金额,实现连续查询:
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSONObject;
// 接上面的RealTimeContinuousQuery类main方法后续代码
// 解析订单数据,提取类目和金额
DataStream<OrderInfo> orderStream = kafkaStream.map(event -> {
JSONObject json = JSONObject.parseObject(event);
return new OrderInfo(
json.getString("category"),
json.getDouble("amount")
);
});
// 按照类目分组,实现连续累加总金额
DataStream<CategoryTotal> resultStream = orderStream
.keyBy(OrderInfo::getCategory)
.process(new ContinuousQueryFunction());
// 输出连续查询结果
resultStream.print();
env.execute("Flink Kafka Real Time Continuous Query");
}
// 订单信息实体类
public static class OrderInfo {
private String category;
private Double amount;
public OrderInfo(String category, Double amount) {
this.category = category;
this.amount = amount;
}
public String getCategory() {
return category;
}
public Double getAmount() {
return amount;
}
}
// 类目总金额结果实体类
public static class CategoryTotal {
private String category;
private Double totalAmount;
private Long updateTime;
public CategoryTotal(String category, Double totalAmount, Long updateTime) {
this.category = category;
this.totalAmount = totalAmount;
this.updateTime = updateTime;
}
@Override
public String toString() {
return "CategoryTotal{" +
"category='" + category + ''' +
", totalAmount=" + totalAmount +
", updateTime=" + updateTime +
'}';
}
}
// 连续查询处理函数,维护每个类目的总金额状态
public static class ContinuousQueryFunction extends KeyedProcessFunction<String, OrderInfo, CategoryTotal> {
private MapState<String, Double> totalState;
@Override
public void open(Configuration parameters) {
// 初始化状态,存储每个类目的总订单金额
MapStateDescriptor<String, Double> stateDescriptor = new MapStateDescriptor<>(
"category_total_state",
String.class,
Double.class
);
totalState = getRuntimeContext().getMapState(stateDescriptor);
}
@Override
public void processElement(OrderInfo orderInfo, Context context, Collector<CategoryTotal> collector) throws Exception {
String category = orderInfo.getCategory();
Double currentAmount = orderInfo.getAmount();
// 获取当前类目的累计总金额
Double currentTotal = totalState.get(category);
if (currentTotal == null) {
currentTotal = 0.0;
}
// 更新累计金额
Double newTotal = currentTotal + currentAmount;
totalState.put(category, newTotal);
// 输出最新的连续查询结果
collector.collect(new CategoryTotal(category, newTotal, System.currentTimeMillis()));
}
}
结果验证
启动Kafka、Flink集群后,先运行OrderProducer发送模拟订单数据,再启动Flink程序,控制台会持续输出每个类目的总订单金额,每当有新的订单数据进入Kafka,对应的类目总金额就会自动更新,实现实时连续查询的效果。
如果需要将结果输出到外部存储,比如MySQL或者Elasticsearch,只需要在resultStream后面添加对应的sink即可,Flink会持续将最新的查询结果写入到外部系统中。
注意:生产环境中需要开启Flink的Checkpoint功能,保证状态数据不丢失,同时将Kafka的offset提交配置为开启,避免数据重复消费。
Apache_FlinkKafka实时连续查询流处理修改时间:2026-06-16 03:00:50