在大数据处理的实际场景中,我们经常会遇到这样的困扰:明明写了逻辑正确的SQL查询,集群资源也足够,但查询就是执行得很慢,有时候甚至直接失败。排查之后往往会发现,罪魁祸首就是数据倾斜,再加上集群负载分配不合理,整个处理效率会被拉低很多。下面我们就来详细聊聊SQL怎么处理数据倾斜,以及大数据环境下的负载均衡方案。

一、认识数据倾斜
数据倾斜指的是在分布式计算过程中,数据没有均匀分布在各个计算节点上,导致部分节点需要处理的数据量远大于其他节点,最终出现“木桶效应”——整个任务的完成时间由处理最慢的那个节点决定。在SQL查询的场景下,数据倾斜通常出现在分组聚合、两表关联、数据去重等操作中。
1.1 数据倾斜的典型表现
我们可以通过几个现象快速判断是不是遇到了数据倾斜:
- SQL任务执行时,大部分task已经执行完成,但有几个task一直处于运行状态,进度卡在99%很久都不结束
- 监控界面显示部分节点的CPU、内存使用率远高于其他节点,甚至出现内存溢出报错
- 相同逻辑的查询,在数据量没有明显增长的情况下,执行时间突然变长好几倍
1.2 数据倾斜的常见原因
数据倾斜的根源通常是数据本身的分布特性,结合SQL处理逻辑共同导致的:
- 业务数据本身存在热点,比如电商场景中某几个大商家的订单量占整体订单量的很大比例,按商家ID分组时就会出现倾斜
- 关联键存在大量空值或者默认值,比如很多日志的user_id字段为空,关联时这些空值都会分到同一个分区
- 分区字段选择不合理,比如用日期做分区但某几天的数据量远超其他日期
- SQL语句写法不当,比如使用了低效的关联方式,或者没有提前过滤无效数据
二、SQL层面处理数据倾斜的方法
针对不同的数据倾斜场景,我们可以从SQL逻辑优化、数据预处理、参数调整等多个角度入手解决。
2.1 业务逻辑层面的优化
很多时候数据倾斜是业务特性导致的,如果能从业务角度调整处理逻辑,往往能从根源上解决问题。比如上面提到的电商大商家订单倾斜的问题,如果业务上允许,可以把大商家的订单单独拆分出来处理,和普通商家的订单分开计算,最后再做结果合并。
举个简单的例子,原来的查询是按商家ID统计订单金额:
-- 原始查询,可能存在倾斜 SELECT merchant_id, SUM(order_amount) AS total_amount FROM order_table GROUP BY merchant_id;
如果已知merchant_id为10001、10002的是大商家,订单量占整体的60%,可以改写查询:
-- 拆分大商家和普通商家处理 SELECT merchant_id, SUM(order_amount) AS total_amount FROM order_table WHERE merchant_id NOT IN (10001, 10002) GROUP BY merchant_id UNION ALL SELECT merchant_id, SUM(order_amount) AS total_amount FROM order_table WHERE merchant_id IN (10001, 10002) GROUP BY merchant_id;
这样大商家的订单会单独走一个计算任务,不会占用普通商家的计算资源,能大幅降低倾斜影响。
2.2 数据预处理打散热点
如果业务上不能拆分热点数据,我们可以在数据写入的时候或者查询前对热点key进行打散处理。最常见的方法是给热点key加上随机前缀,把原来的一个key拆成多个key,分散到不同的计算节点上,计算完成后再把结果聚合回来。
还是以上面的商家订单统计为例,给大商家的merchant_id加上随机前缀:
-- 给热点商家ID加随机前缀打散
WITH tmp_order AS (
SELECT
CASE
WHEN merchant_id IN (10001, 10002)
-- 给热点商家加1-10的随机前缀,拆成10个key
THEN CONCAT(merchant_id, '_', CAST(FLOOR(RAND() * 10) AS STRING))
ELSE CAST(merchant_id AS STRING)
END AS new_merchant_id,
order_amount
FROM order_table
)
-- 第一次聚合,打散后计算
SELECT new_merchant_id, SUM(order_amount) AS sub_total
FROM tmp_order
GROUP BY new_merchant_id
-- 第二次聚合,把打散的结果还原回去
SELECT
CASE
WHEN new_merchant_id LIKE '%_%'
THEN CAST(SPLIT_PART(new_merchant_id, '_', 1) AS BIGINT)
ELSE CAST(new_merchant_id AS BIGINT)
END AS merchant_id,
SUM(sub_total) AS total_amount
FROM (
SELECT new_merchant_id, SUM(order_amount) AS sub_total
FROM tmp_order
GROUP BY new_merchant_id
) t
GROUP BY
CASE
WHEN new_merchant_id LIKE '%_%'
THEN CAST(SPLIT_PART(new_merchant_id, '_', 1) AS BIGINT)
ELSE CAST(new_merchant_id AS BIGINT)
END;这种方法的核心是先把热点key拆成多个子key,分散计算压力,最后再把子key的结果合并成最终的热点key结果,适合热点key数量不多但单个key数据量极大的场景。
2.3 关联场景的倾斜处理
两表关联是数据倾斜的高发场景,尤其是大表关联小表,或者关联键存在大量空值的时候。针对不同的关联场景有不同的处理方式:
2.3.1 空值处理
如果关联键存在大量空值,这些空值在关联时会分到同一个分区,导致倾斜。我们可以提前把空值过滤掉,或者给空值赋一个随机值,避免集中到同一个分区。
-- 处理关联键空值,给空值赋随机值打散
SELECT
a.user_id,
a.order_id,
b.user_name
FROM (
SELECT
order_id,
-- 给空user_id赋随机值,避免空值集中
CASE WHEN user_id IS NULL THEN CAST(FLOOR(RAND() * 10000) AS STRING) ELSE user_id END AS user_id
FROM order_table
) a
LEFT JOIN user_table b
ON a.user_id = b.user_id;2.3.2 大表关联小表的倾斜
如果是大表关联小表,且小表的数据量不大,可以使用map端join的方式,把小表的数据加载到内存中,在map阶段就完成关联,避免reduce阶段的shuffle操作,从根源上避免数据倾斜。不同SQL引擎的实现方式略有不同,比如Hive中可以设置参数开启mapjoin:
-- Hive中开启mapjoin,指定小表
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000; -- 小表大小阈值,25M以下自动转为mapjoin
SELECT
a.order_id,
b.user_name
FROM big_order_table a
LEFT JOIN small_user_table b
ON a.user_id = b.user_id;如果SQL引擎不支持自动mapjoin,也可以手动把小表的数据写成常量,在查询中直接匹配:
-- 手动实现类似mapjoin的逻辑,适合小表数据极少的场景
SELECT
order_id,
CASE user_id
WHEN 1001 THEN '张三'
WHEN 1002 THEN '李四'
WHEN 1003 THEN '王五'
ELSE '未知用户'
END AS user_name
FROM big_order_table;2.4 SQL参数调整
大部分大数据SQL引擎都提供了针对数据倾斜的优化参数,合理调整这些参数也能缓解倾斜问题。
- 增加reduce任务数量:数据倾斜很多时候是因为reduce任务太少,单个reduce处理的数据量太大,可以适当增加reduce的数量,让数据分布更均匀。比如Hive中可以设置
mapreduce.job.reduces参数。 - 开启倾斜优化开关:比如Hive的
hive.groupby.skewindata参数,开启后会在分组聚合时自动生成一个额外的MR任务,先把数据打散再聚合,自动处理热点key。 - 调整内存参数:如果是部分节点内存溢出导致的任务失败,可以适当增加单个task的内存配置,避免因为内存不足导致的倾斜假象。
三、大数据环境下的负载均衡方案
除了SQL层面的优化,大数据集群整体的负载均衡也能有效提升SQL查询的执行效率,避免单个节点过载,其他节点空闲的情况。
3.1 资源调度层面的负载均衡
大数据集群通常使用YARN、K8s等作为资源调度框架,合理的资源调度配置能让SQL任务均匀分配到各个节点上。
- 配置公平调度或者容量调度:避免部分大任务占满所有集群资源,给其他任务留出足够的资源空间,保证不同优先级的SQL任务都能得到合理的资源分配。
- 开启节点资源监控和动态调度:调度框架实时监控各个节点的CPU、内存、磁盘IO使用率,当发现某个节点负载过高时,自动把新提交的任务分配到负载较低的节点上。
- 设置资源池隔离:把实时SQL查询和离线SQL查询分到不同的资源池,避免离线大任务占满资源影响实时查询的响应速度。
3.2 数据分布层面的负载均衡
数据在集群中的分布均匀程度直接影响SQL查询的负载均衡效果,我们可以从数据存储的角度做优化:
- 合理设计分区策略:分区字段要选择分布均匀的字段,避免使用存在热点的字段做分区。如果按日期分区,要保证每天的数据量相差不大,对于数据量异常的日期可以单独拆分分区。
- 使用分桶表:对于经常需要做分组聚合或者关联的表,可以创建分桶表,按照常用分组字段或者关联字段做分桶,保证相同key的数据分布在同一桶或者相邻桶中,减少查询时的数据移动。
- 定期做数据均衡:集群运行一段时间后,可能会出现部分节点的数据量远大于其他节点的情况,这时候可以执行数据均衡命令,把数据从过载节点迁移到空闲节点,保证数据分布均匀。
3.3 任务执行层面的负载均衡
SQL任务执行过程中,也可以通过一些策略保证负载均衡:
- 动态分区调整:查询执行时,根据每个分区的数据量动态调整task的数量,数据量大的分区分配更多的task,数据量小的分区分配更少的task,避免所有分区都用相同数量的task导致资源浪费或者倾斜。
- 推测执行机制:当发现某个task执行速度远慢于其他task时,调度框架会自动启动一个相同的task并行执行,哪个先执行完就用哪个的结果,避免单个慢task拖慢整个任务。不过要注意,推测执行不适合有外部写操作的SQL任务,避免重复写入导致数据错误。
- 小文件合并:如果表中有大量小文件,查询时需要启动大量的task处理这些小文件,会导致资源浪费和负载不均。可以定期合并小文件,减少文件数量,每个task处理的数据量更均匀。
四、方案选择建议
实际场景中我们不需要同时使用所有方案,可以根据具体情况选择最适合的方法:
| 场景 | 推荐方案 |
|---|---|
| 热点key数量少,数据量极大 | 热点key拆分打散+SQL参数调整 |
| 关联键存在大量空值 | 空值赋随机值+map端join |
| 大表关联小表 | map端join+资源池隔离 |
| 集群整体负载不均 | 动态调度+数据均衡+小文件合并 |
| 实时查询响应慢 | 资源池隔离+推测执行+分桶表优化 |
总的来说,处理SQL数据倾斜和负载均衡问题没有万能的方法,需要结合具体的业务场景、数据特性、集群配置综合判断,很多时候需要多种方案配合使用才能达到最好的效果。