在Spark数据处理流程中,并行读取数据后执行分区写入时出现单核运行的问题,会大幅降低任务执行效率,该问题通常和分区配置、数据分布、写入逻辑相关,需要从多个维度排查优化。

问题原因分析
默认分区数不足
Spark读取数据后默认的分区数可能远小于集群可用核心数,若后续没有主动调整分区,写入时只会按照现有分区数分配任务,导致大量核心闲置。比如读取小文件时,Spark默认会按照文件数量生成分区,若文件数少于核心数,就会出现单核运行的情况。
数据倾斜导致分区不均
如果分区键对应的数据分布极度不均,部分分区的数据量远大于其他分区,Spark会优先处理小分区,最后剩下的大分区只能由单个核心处理,表现为写入后期单核运行。
写入算子配置不当
部分写入场景如果使用了coalesce(1)这类缩减分区的操作,会强制将所有数据合并到单个分区,写入时自然只会使用单核。
解决方案
调整分区数匹配核心数
读取数据后,通过repartition方法手动设置分区数,使其和集群可用核心数匹配,让每个核心都能分配到对应的写入任务。
import org.apache.spark.sql.SparkSession
object SparkWriteOptimize {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkWriteOptimize")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 模拟并行读取数据
val df = spark.read.csv("data/input.csv")
// 获取可用核心数,手动设置分区数
val availableCores = spark.sparkContext.defaultParallelism
// 重新分区,让分区数和核心数匹配
val repartitionedDf = df.repartition(availableCores)
// 写入分区数据
repartitionedDf.write.partitionBy("dt").parquet("data/output")
spark.stop()
}
}
优化分区键解决数据倾斜
如果分区键存在数据倾斜,可以给分区键添加随机前缀,打散数据后重新分区,避免单个分区数据量过大。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object DataSkewSolve {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("DataSkewSolve")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val df = spark.read.csv("data/input.csv")
// 给分区键dt添加随机前缀,打散数据
val saltedDf = df.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("dt_salt", concat(col("dt"), lit("_"), col("salt")))
// 按照加盐后的分区键重新分区
val repartitionedDf = saltedDf.repartition(20, col("dt_salt"))
// 写入时去掉随机前缀,按照原始分区键写入
repartitionedDf.drop("salt", "dt_salt")
.write.partitionBy("dt").parquet("data/output_skew")
spark.stop()
}
}
避免不必要的分区缩减操作
检查写入前的代码逻辑,移除coalesce(1)这类强制缩减分区的操作,除非明确需要输出单个文件。如果确实需要减少文件数量,可以使用coalesce设置合理的分区数,而不是强制缩减到1。
效果验证
优化后可以通过Spark UI查看写入阶段的任务分配情况,正常情况下每个核心都会分配到对应的写入任务,单核运行的问题会得到解决,整体任务耗时会有明显下降。如果仍然存在单核运行的情况,可以进一步检查数据分布和集群资源配置,排查是否有其他隐藏的分区限制因素。