高效运行 Spark on K8s:配置与调优秘籍
Apache Spark 作为大数据处理领域的事实标准,凭借其强大的内存计算能力和灵活的 API,在批处理、流处理、机器学习等场景中得到了广泛应用。而 Kubernetes (K8s) 作为容器编排领域的领导者,提供了强大的资源管理、弹性伸缩和自愈能力。将 Spark 运行在 K8s 之上,不仅能够充分利用 K8s 的优势,实现资源的统一管理和调度,还能简化 Spark 集群的部署和运维,提升整体效率。然而,要在 K8s 环境中高效运行 Spark,并非易事,需要深入理解两者结合的机制,并进行细致的配置与调优。本文将详细探讨 Spark on K8s 的配置方法、关键调优参数以及最佳实践,助您打造高性能的 Spark on K8s 平台。
一、 Spark on K8s 运行模式简介
Spark on K8s 支持两种主要的部署模式:
-
Cluster Mode (集群模式):
- 用户通过
spark-submit
提交应用到 K8s 集群。 - K8s Master 会创建一个 Spark Driver Pod。
- Driver Pod 启动后,会向 K8s Master 请求创建 Executor Pods。
- Driver 和 Executor Pods 之间通过 K8s Service 或直接的 Pod IP 进行通信。
- 这是生产环境中最常用的模式,Driver 运行在集群内部,具备高可用性。
- 用户通过
-
Client Mode (客户端模式):
- Spark Driver 运行在提交应用的客户端节点(例如本地机器或 K8s 集群外的某个节点)。
- Driver 向 K8s Master 请求创建 Executor Pods。
- Executor Pods 启动后,会反向连接到客户端的 Driver。
- 这种模式主要用于交互式查询和开发调试,因为 Driver 的生命周期与客户端进程绑定,且需要确保 K8s 集群内的 Executor Pods 能够访问到客户端 Driver 的网络。
本文主要聚焦于更常用且更适合生产环境的 Cluster Mode。
二、 核心配置:让 Spark 在 K8s 上跑起来
要在 K8s 上成功运行 Spark 应用,首先需要进行一系列基础配置。
-
构建自定义 Spark Docker 镜像:
- 为何需要? Spark 官方提供的镜像可能不包含您应用所需的所有依赖(如特定的 JDBC 驱动、Python 库、Hadoop connector 等),或者您可能需要特定版本的 JDK 或 Spark。
- 如何构建?
- 基于官方 Spark 镜像(
apache/spark
或bitnami/spark
等)或基础操作系统镜像(如openjdk
)。 -
在 Dockerfile 中添加自定义依赖:
“`dockerfile
FROM apache/spark:3.3.1-debian # 或者您选择的基础镜像USER root # 切换到 root 用户安装依赖
安装额外的 Python 库
RUN apt-get update && apt-get install -y python3-pip && \
pip3 install numpy pandas scikit-learn && \
apt-get clean && rm -rf /var/lib/apt/lists/*添加自定义 JAR 包 (例如 Hadoop/S3 connector)
COPY ./jars/hadoop-aws-3.3.1.jar /opt/spark/jars/
COPY ./jars/aws-java-sdk-bundle-1.11.901.jar /opt/spark/jars/
USER ${spark_uid} # 切换回 Spark 用户
“`
* 构建并推送到您的私有或公有 Docker Registry (如 Docker Hub, Harbor, ACR, ECR, GCR)。
* 关键点:
* 保持 Driver 和 Executor 使用相同版本的 Spark 镜像,避免兼容性问题。
* 镜像尽可能小,以减少 Pod 启动时间。
- 基于官方 Spark 镜像(
-
配置 K8s RBAC (Role-Based Access Control):
- Spark Driver Pod 需要权限来创建和管理 Executor Pods、ConfigMaps、Services 等 K8s 资源。
- 创建 ServiceAccount:
yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark
namespace: spark-jobs # 建议为 Spark 作业使用单独的 namespace -
创建 Role/ClusterRole 和 RoleBinding/ClusterRoleBinding:
“`yaml
# Role (namespace 级别)
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: spark-jobs
name: spark-role
rules:- apiGroups: [“”]
resources: [“pods”, “services”, “configmaps”]
verbs: [“create”, “get”, “list”, “watch”, “delete”, “patch”, “update”] - apiGroups: [“apps”] # 如果使用 Spark Operator 可能需要
resources: [“statefulsets”, “deployments”]
verbs: [“create”, “get”, “list”, “watch”, “delete”]
… 可能还需要其他权限,如 events
RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-role-binding
namespace: spark-jobs
subjects:
– kind: ServiceAccount
name: spark
namespace: spark-jobs
roleRef:
kind: Role
name: spark-role
apiGroup: rbac.authorization.k8s.io
``
ClusterRole
如果 Spark 作业需要在多个 Namespace 中创建资源,或者需要访问集群级别的资源,则应使用和
ClusterRoleBinding`。 - apiGroups: [“”]
-
spark-submit
命令参数:
提交 Spark 应用到 K8s 时,需要指定 K8s Master URL 和一系列 K8s 特定的配置。
bash
$SPARK_HOME/bin/spark-submit \
--master k8s://https://<k8s-api-server-host>:<k8s-api-server-port> \
--deploy-mode cluster \
--name my-spark-app \
--conf spark.kubernetes.namespace=spark-jobs \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=your-registry/your-spark-image:tag \
--conf spark.executor.instances=3 \
--conf spark.executor.memory=2g \
--conf spark.executor.cores=1 \
--conf spark.driver.memory=1g \
--conf spark.driver.cores=1 \
--conf spark.kubernetes.driver.podTemplateFile=path/to/driver-pod-template.yaml \ # 可选
--conf spark.kubernetes.executor.podTemplateFile=path/to/executor-pod-template.yaml \ # 可选
your-application-jar-or-python-file \
[application-arguments]
关键参数解释:--master k8s://...
: 指定 K8s API Server 的地址。--deploy-mode cluster
: 必须为集群模式。spark.kubernetes.namespace
: Spark 应用运行的 K8s Namespace。spark.kubernetes.authenticate.driver.serviceAccountName
: Driver Pod 使用的 ServiceAccount。spark.kubernetes.container.image
: 指定 Driver 和 Executor 使用的 Docker 镜像。spark.executor.instances
: 初始 Executor 数量。spark.executor.memory
,spark.executor.cores
: Executor 的内存和 CPU。spark.driver.memory
,spark.driver.cores
: Driver 的内存和 CPU。spark.kubernetes.driver.podTemplateFile
,spark.kubernetes.executor.podTemplateFile
: (可选) 允许您更细致地自定义 Driver 和 Executor Pod 的规格,例如挂载特定的 Volume、设置 Node Selector、Affinity/Anti-affinity、Tolerations 等。
三、 性能调优:榨干 Spark on K8s 的潜力
仅仅让 Spark 在 K8s 上跑起来是不够的,还需要进行细致的调优才能获得理想的性能。
-
资源配置与管理 (Resource Sizing):
- Driver 资源:
spark.driver.memory
: Driver 内存。需要足够大以容纳 RDD 元数据、任务调度信息等。如果 Driver OOM,整个应用会失败。通常设置为 1G-4G,具体取决于应用复杂度和数据规模。spark.driver.cores
: Driver CPU 核数。通常 1-2 核即可,Driver 主要负责调度,计算密集度不高。spark.kubernetes.driver.request.cores
/spark.kubernetes.driver.limit.cores
: K8s层面的CPU请求和限制。spark.kubernetes.driver.memoryOverheadFactor
或spark.driver.memoryOverhead
: JVM 堆外内存,用于字符串、NIO Buffer 等。默认为 Driver 内存的 10% 或 384MB 中较大者。如果遇到 “Container killed by OOM” 而 JVM 内部没有 OOM,通常需要调高这个值。
- Executor 资源:
spark.executor.instances
: Executor 数量。根据集群资源和并行度需求设定。spark.executor.memory
: 每个 Executor 的 JVM 堆内存。这是 Spark 计算的主要内存区域。spark.executor.cores
: 每个 Executor 分配的 CPU 核数。直接影响任务并行度。通常设置为 2-5 核,太少无法充分利用 CPU,太多可能导致线程切换开销和 HDFS I/O 瓶颈。spark.kubernetes.executor.request.cores
/spark.kubernetes.executor.limit.cores
: K8s 层面的 CPU 请求和限制。spark.kubernetes.executor.memoryOverheadFactor
或spark.executor.memoryOverhead
: Executor 的 JVM 堆外内存。非常重要,尤其在使用 PySpark 或大量堆外操作时。如果不足,Executor Pod 会被 K8s OOMKilled。建议设置为 Executor Memory 的 10%-20%,甚至更高。
- 黄金法则:
- 内存:
(Executor Pod Memory Limit) = (spark.executor.memory) + (spark.executor.memoryOverhead)
。确保 K8s 分配给 Pod 的总内存大于 Spark JVM 堆内存和堆外内存之和。 - CPU:
(Executor Pod CPU Limit) = (spark.executor.cores)
。 - 右 sizing: 通过 Spark UI 和 K8s 监控(如 Prometheus/Grafana)观察资源使用情况,逐步调整,避免资源浪费或不足。
- 内存:
- Driver 资源:
-
Shuffle 性能优化:
Shuffle 是 Spark 中最昂贵的操作之一。在 K8s 环境下,由于 Pod 的动态性和可能的网络隔离,Shuffle 优化尤为重要。- External Shuffle Service (ESS):
- 为何重要? 在 K8s 中,Executor Pod 可能会因为节点故障、动态缩容等原因被销毁。如果 Shuffle 数据存储在 Executor 本地磁盘,一旦 Executor 丢失,其上的 Shuffle 文件也会丢失,导致任务重算。
- ESS 将 Shuffle 数据写入独立于 Executor Pod 的服务中。即使 Executor 失败,Shuffle 数据依然存在,避免了大量重算。
- 配置: Spark 3.0+ 开始原生支持 K8s 的 External Shuffle Service。需要部署 K8s Shuffle Service (通常是 DaemonSet)。
bash
# spark-submit 时启用
--conf spark.shuffle.service.enabled=true
--conf spark.kubernetes.shuffle.namespace=<namespace-of-shuffle-service>
--conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,component=shuffle-service"
# 确保 Driver/Executor Pod 的 ServiceAccount 有权限 get/list/watch Shuffle Service 的 Endpoints。
spark.sql.shuffle.partitions
: Spark SQL 中 Shuffle 操作的默认分区数。默认为 200。如果数据量小,过多的分区会导致大量小任务和调度开销;如果数据量大,过少的分区可能导致单个任务处理数据过多,引发 OOM 或处理缓慢。建议根据数据大小和 Executor 核数调整,通常设置为 Executor 总核数的 2-3 倍。spark.default.parallelism
: RDD 操作的默认并行度。如果没有显式设置,它通常等于集群的总核心数(或由spark.sql.shuffle.partitions
影响)。
- External Shuffle Service (ESS):
-
序列化 (Serialization):
- Spark 支持 Java 序列化和 Kryo 序列化。Kryo 通常比 Java 序列化更快、更紧凑。
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired=false
(方便使用,但设为true
并注册所有自定义类能获得更好性能和稳定性)spark.kryoserializer.buffer.max=128m
(或更高,根据数据大小调整,避免序列化大对象时出错)
-
垃圾回收 (Garbage Collection) 调优:
- 长时间的 GC pause 会严重影响 Spark 性能。
- 推荐使用 G1GC (Garbage-First Garbage Collector) 作为 Executor 和 Driver 的 GC 策略。
bash
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" - 通过 Spark UI 的 Executors 标签页监控 GC 时间,如果 GC 时间占比过高,需要调整 JVM 参数或内存分配。
-
动态资源分配 (Dynamic Allocation):
- 允许 Spark 根据工作负载动态增减 Executor 数量。
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
(推荐开启,更智能地处理包含 Shuffle 数据的 Executor)spark.dynamicAllocation.minExecutors
: 最小 Executor 数。spark.dynamicAllocation.maxExecutors
: 最大 Executor 数。spark.dynamicAllocation.initialExecutors
: 初始 Executor 数 (通常等于 minExecutors)。spark.dynamicAllocation.executorIdleTimeout
: Executor 空闲多久后被移除。spark.dynamicAllocation.cachedExecutorIdleTimeout
: 缓存了数据的 Executor 空闲多久后被移除。- 注意: 动态分配需要 External Shuffle Service 才能有效工作,否则移除 Executor 会导致 Shuffle 数据丢失。
-
Pod 放置策略 (Pod Placement):
- 使用 K8s 的调度特性来优化 Spark Pod 的放置。
- Node Selector (
spark.kubernetes.node.selector.<labelKey>=<labelValue>
): 将 Spark Pod 调度到具有特定标签的节点上(例如,有 SSD 的节点,或特定硬件配置的节点)。 - Node Affinity/Anti-affinity (
spark.kubernetes.driver.affinity
和spark.kubernetes.executor.affinity
通过 Pod Template 实现):- Affinity: 将 Pod 调度到满足特定条件的节点。例如,将计算密集型 Executor 调度到 CPU 优化型节点。
- Anti-affinity: 避免将某些 Pod 调度到一起。例如,将 Driver 和重要的 Executor 分散到不同节点或可用区,以提高容错性。
- Tolerations (
spark.kubernetes.driver.tolerations
和spark.kubernetes.executor.tolerations
通过 Pod Template 实现): 允许 Spark Pod 调度到带有特定污点 (Taints) 的节点上。
-
数据本地性 (Data Locality):
- 在纯 K8s 环境中,如果数据存储在外部系统(如 S3, HDFS, GCS),传统的节点本地性(NODE_LOCAL)意义不大。
- 主要关注 PROCESS_LOCAL (数据在同一 Executor 内) 和 RACK_LOCAL (如果数据中心支持机架感知)。
- 优化策略更多地依赖于高效的数据连接器和合理的并行度。
- 如果使用 HDFS on K8s (例如通过 Rook+Ceph 或专门的 HDFS Operator 部署),可以尝试通过 Node Affinity 将 Spark Executor Pod 调度到 HDFS DataNode Pod 所在的 K8s Worker 节点,以期获得更好的数据本地性。
-
日志与监控:
- Spark UI: 依然是诊断性能问题的核心工具。确保可以通过 K8s Ingress 或
kubectl port-forward
访问 Driver Pod 的 Spark UI (默认4040端口)。 - K8s 日志:
kubectl logs <pod-name>
查看 Driver 和 Executor Pod 的标准输出和标准错误日志。 - Prometheus & Grafana: 部署 Prometheus 监控 K8s 集群资源(节点 CPU、内存、网络、磁盘)和 Spark 指标(通过 JMX Exporter 或 Spark 提供的 metrics endpoint)。Grafana 用于可视化这些指标。
spark.eventLog.enabled=true
和spark.eventLog.dir
: 将 Spark 事件日志持久化到 HDFS, S3 等共享存储,以便后续通过 Spark History Server 查看已完成应用的详细信息。
- Spark UI: 依然是诊断性能问题的核心工具。确保可以通过 K8s Ingress 或
-
存储配置 (Storage):
- 临时数据: Spark Shuffle 和溢写数据默认使用 Pod 的本地临时存储 (ephemeral storage)。如果节点磁盘空间不足或 I/O 性能差,会影响性能。可以考虑:
- 使用具有高性能本地 SSD 的节点。
- 通过 Pod Template 将
emptyDir
挂载到tmpfs
(内存文件系统) 以加速临时数据读写,但需注意内存消耗。 spark.local.dir
: 可以配置为 Pod 内的多个路径,分散 I/O。
- 持久化数据 (RDD.persist):
MEMORY_ONLY
,MEMORY_AND_DISK
: 如果磁盘是 Pod 本地磁盘,Pod 销毁数据会丢失。- 对于需要跨应用或更长久保留的持久化数据,应使用外部存储如 HDFS, S3, Ceph 等,并通过相应的 Connector 访问。
- 临时数据: Spark Shuffle 和溢写数据默认使用 Pod 的本地临时存储 (ephemeral storage)。如果节点磁盘空间不足或 I/O 性能差,会影响性能。可以考虑:
四、 最佳实践与进阶技巧
-
使用 Spark Operator:
- GoogleCloudPlatform/spark-on-k8s-operator 或类似项目,通过声明式 CRD (Custom Resource Definition) 管理 Spark 应用,简化了提交流程、生命周期管理和监控集成。
- 例如,定义一个
SparkApplication
YAML 文件,然后kubectl apply -f
即可。
-
Namespace 隔离:
- 为不同的团队或项目使用不同的 K8s Namespace 运行 Spark 应用。
- 结合 ResourceQuota 和 LimitRange 控制每个 Namespace 的资源使用,避免资源争抢。
-
安全:
- 确保 K8s API Server 安全,使用 TLS。
- 最小权限原则配置 RBAC。
- 如果需要访问安全的外部服务(如 Kerberized HDFS, S3),需要配置相应的认证机制,例如通过 Secrets 挂载 Keytab 文件或配置 IAM Role for Service Accounts (IRSA on EKS, Workload Identity on GKE)。
spark.kubernetes.kerberos.krb5.path
/spark.kubernetes.kerberos.krb5.configMapName
。spark.kubernetes.driver.secrets.[SecretName]=/mnt/secrets/[SecretName]
spark.kubernetes.executor.secrets.[SecretName]=/mnt/secrets/[SecretName]
-
依赖管理:
- 除了构建到 Docker 镜像中,还可以通过
spark.jars.packages
(Maven 坐标)、spark.jars
(逗号分隔的 JAR 路径)、spark.files
(普通文件) 提交依赖。 - 对于 Python,可以使用
--py-files
和虚拟环境 (如 venv, conda) 打包到镜像中或通过spark.archives
分发。
- 除了构建到 Docker 镜像中,还可以通过
-
版本一致性:
- 确保 Spark 版本、Scala 版本 (如果使用 Scala)、Java 版本、Python 版本以及关键依赖库 (如 Hadoop client) 在 Driver、Executors 和客户端提交环境(如果相关)之间保持一致,避免难以排查的运行时错误。
-
小文件问题:
- 如果处理大量小文件,会导致过多的 Spark Task,增加调度开销和元数据压力。
- 在读取前进行小文件合并,或使用
spark.sql.files.maxPartitionBytes
、spark.sql.adaptive.coalescePartitions.enabled=true
(AQE特性) 等参数进行优化。
五、 总结
将 Spark 运行在 Kubernetes 上是一项强大而灵活的技术组合,它能够带来资源利用率、运维效率和可伸缩性的显著提升。然而,要充分发挥其潜力,需要对 Spark 和 K8s 的配置选项有深入的理解,并根据实际应用场景和数据特性进行细致的调优。
核心要点包括:精心构建包含所有依赖的 Spark 镜像,正确配置 RBAC 权限,合理分配 Driver 和 Executor 的 CPU 与内存资源(特别是 JVM 堆外内存),启用并配置 External Shuffle Service,优化序列化和垃圾回收机制,以及利用 K8s 的调度能力进行 Pod 放置。同时,动态资源分配、完善的日志监控体系和对存储的合理规划也是不可或缺的环节。
调优是一个持续迭代的过程。从一个合理的基线配置开始,通过 Spark UI、K8s 监控工具和实际运行表现,不断分析瓶颈,调整参数,最终找到最适合您业务场景的“甜蜜点”。希望本文提供的这份“秘籍”能为您在 Spark on K8s 的征途上提供有力的指引。