Dec 9, 2023
the motivation for this post is to document our experience migrating our spark data processing to run on kubernetes.
it's useful to know what we use spark for, because this shaped our choices for running spark on kubernetes. our main use of spark is data processing, where the data is many times larger than what fits in memory on the spark executors, and we tend to write data that is the same order of magnitude as the data being read. because of this our focus is on efficient out-of-core processing and large shuffles. we are less interested in query performance (e.g. using spark as the back-end for a SQL engine), which has somewhat different requirements such as efficient predicate push-down, query plan optimizations, quick resource allocation, etc.
we started out by running spark on hadoop on physical servers, and when we initially migrated to cloud platforms we leveraged the hadoop-as-a-service offerings like EMR, dataproc and hdinsight, on which we would install our own spark version with our software. our workflows would pop up temporary hadoop clusters; for each cluster we would copy data from the bucket store (S3, GCS, ABFS) into HDFS on the cluster, run the jobs, and copy the results from HDFS back to the bucket store, after which the cluster was destroyed.
the main benefits of this setup were that it was similar to what we were used to, we could leverage HDFS (which is still a lot faster than bucket stores) for intermediate results, and we had the spark shuffle service available. the downsides were that dynamic resource scaling capabilities were limited, there was a lot of overhead for these pop-up clusters, and we always had to deal with potential library (jar) conflicts with whatever was installed on these clusters in the hadoop classpath (a problem that also existed on traditional hadoop clusters).
the first thing to get your head around on kubernetes is that there is no server you launch your jobs from. what i mean by this is that the setup where you launch your job using spark-submit from a semi-permanent virtual machine (on which you can get all comfortable and install all your favorite tools) is gone.
now you might read about --deploy-mode=cluster and think this is your ticket to launching jobs from a virtual machine that sits beside the kubernetes cluster, and you are right, although it is not really the same. for example, you cannot use local files anymore. despite the fact that this approach works i think it's an anti-pattern, because what --deploy-mode=cluster does is launch a pod on kubernetes and in that pod launch spark-submit again with --deploy-mode=client. so this big install of spark that you launch from has become a really complex way to launch a single pod, something that can be done more simply using kubectl. another reason to avoid this setup is that the launch sits outside the kubernetes cluster, making it harder to manage with the really powerful kubernetes tool set.
another tool worth mentioning is the spark operator, which is pretty neat. the operator leverages a custom resource: it creates a new resource type in the kubernetes API called SparkApplication for spark jobs. there is a controller (that nobody needs direct access to, it runs on the cluster as a pod somewhere) that watches for any creation of or changes to SparkApplication resources and takes action to bring them to the desired state. to me this is a much better way to go about it than spark-submit --deploy-mode=cluster. however, now you launch a SparkApplication, not a pod, and it's not like SparkApplication extends pod or can easily be used in its place. why does that matter? well, there are lots of places you can use pods, like in workflows, so we found SparkApplication to be somewhat limiting.
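for illustration, a minimal SparkApplication manifest might look something like this (the image name, main class and jar path are made-up placeholders, and the exact schema depends on your operator version, so check the operator docs):

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: example-job          # placeholder name
  namespace: spark-jobs      # placeholder namespace
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.0                         # placeholder image
  mainClass: com.example.Main                # placeholder class
  mainApplicationFile: local:///opt/app/app.jar  # placeholder jar
  sparkVersion: 3.5.0
  driver:
    cores: 1
    memory: 2g
    serviceAccount: spark
  executor:
    instances: 4
    cores: 4
    memory: 8g
```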
we decided the best way to go about it is to launch the spark jobs directly in pods ourselves. our main motivations for this choice were that we had full control and that we could use all the tools that leverage pods like workflow engines.
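a sketch of what launching the driver directly in a pod can look like (all names, paths and the image here are illustrative placeholders, not our actual setup):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: spark-driver         # placeholder name
  namespace: spark-jobs      # placeholder namespace
spec:
  # service account needs RBAC permission to launch executor pods
  serviceAccountName: spark
  restartPolicy: Never
  containers:
  - name: driver
    image: spark:3.5.0                       # placeholder image
    args:
    - /opt/spark/bin/spark-submit
    - --master
    - k8s://https://kubernetes.default.svc
    - --deploy-mode
    - client
    - --class
    - com.example.Main                       # placeholder class
    - local:///opt/app/app.jar               # placeholder jar
```

note that with --deploy-mode=client inside a pod the executors need to connect back to the driver, so in practice you also set spark.driver.host to something resolvable from the executor pods, typically via a headless service in front of the driver pod.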
spark batch jobs are not typical kubernetes applications: they require bulk scheduling of large amounts of compute resources and large and fast local scratch storage in pods.
note that tresata currently deploys kubernetes on AWS EKS and azure AKS, so we will focus on those platforms here. since i am more familiar with AWS i will go into more detail for AWS than for azure.
batch jobs require large amounts of compute resources for limited time periods. we ideally want to allocate resources on demand so we only pay for the resources we use.
on AWS one could use fargate for this, however when we were building this fargate had no good support for mounting additional storage or increasing the default container storage size, and 20GB of disk space is not enough for a spark executor. apparently today fargate supports up to 175GB of storage, see issue 1794. we haven't tested this, but 175GB would still not be enough for many of our use cases either. instead of using fargate we decided to use managed node pools and karpenter to add or remove kubernetes nodes (EC2 instances) as required.
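as a rough sketch (using the karpenter v1beta1 API that was current around the time of writing; field names and values are assumptions on our part and worth double-checking against the karpenter docs), a NodePool that lets karpenter provision on-demand nodes could look like:

```yaml
apiVersion: karpenter.sh/v1beta1
kind: NodePool
metadata:
  name: spark                # placeholder name
spec:
  template:
    spec:
      requirements:
      # only on-demand instances, no spot
      - key: karpenter.sh/capacity-type
        operator: In
        values: ["on-demand"]
      - key: kubernetes.io/arch
        operator: In
        values: ["amd64"]
      nodeClassRef:
        name: default        # placeholder EC2NodeClass
  limits:
    cpu: "1000"              # illustrative cap on total provisioned cpu
  disruption:
    # remove nodes once they are empty
    consolidationPolicy: WhenEmpty
    consolidateAfter: 30s
```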
on azure we use cluster autoscaler which is not as versatile or powerful as karpenter but it works for now.
large scratch storage
to get the large scratch space in the spark executor pods there are generally three options:
when spark is launched on kubernetes what is being launched is the driver, which in turn launches the executor pods. this requires the pod that runs the spark driver to be launched with a service account that has the kubernetes RBAC permissions to launch pods in the same namespace. in kubernetes RBAC the permission to launch pods means the permission to launch pods as any service account within that namespace. because of this it should be assumed that spark jobs can gain access to the permissions of any service account in the namespace, and therefore, to avoid unintended privilege escalations, spark jobs are usually run in their own dedicated namespace, or at least in a namespace where no service accounts exist with escalated privileges.
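concretely, the RBAC setup can be sketched like this (namespace and service account names are placeholders, and the exact resource list spark needs can vary by spark version):

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: spark-jobs
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-driver
  namespace: spark-jobs
rules:
# the driver creates and tears down executor pods, plus
# configmaps and services for its own bookkeeping
- apiGroups: [""]
  resources: ["pods", "services", "configmaps", "persistentvolumeclaims"]
  verbs: ["create", "get", "list", "watch", "delete", "deletecollection"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-driver
  namespace: spark-jobs
subjects:
- kind: ServiceAccount
  name: spark
  namespace: spark-jobs
roleRef:
  kind: Role
  name: spark-driver
  apiGroup: rbac.authorization.k8s.io
```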
the spark jobs run on kubernetes using kubernetes RBAC permissions, but you will also want access to data that is stored in your cloud provider's bucket store (and perhaps also databases etc.), for which spark will need to use the cloud provider's IAM permissions. this involves setting up an OIDC provider for the kubernetes cluster that makes the identities of the kubernetes service accounts available in IAM.
for AWS see the aws documentation. to assign an AWS IAM role to a kubernetes service account, the role must have a trust policy that allows the service account to assume that role, and the service account must have the annotation eks.amazonaws.com/role-arn that links back to the role.
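to make this concrete, a sketch of both sides (account id, region, OIDC provider id, and all names are placeholders):

```yaml
# kubernetes side: annotate the service account with the role arn
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: spark-jobs
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::111122223333:role/spark-jobs
```

and the trust policy on the IAM role that allows this specific service account to assume it:

```json
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {
      "Federated": "arn:aws:iam::111122223333:oidc-provider/oidc.eks.us-east-1.amazonaws.com/id/EXAMPLE"
    },
    "Action": "sts:AssumeRoleWithWebIdentity",
    "Condition": {
      "StringEquals": {
        "oidc.eks.us-east-1.amazonaws.com/id/EXAMPLE:sub": "system:serviceaccount:spark-jobs:spark"
      }
    }
  }]
}
```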
in azure the same setup is newer: it is called workload identity and uses managed identities. note that hadoop-azure as of version 3.3.6 does not support workload identity, see HADOOP-18610. to enable it you have to build hadoop-azure yourself after merging in this pull request and replace the hadoop-azure jar in your spark install with the patched version.
we build spark for scala 2.13. to build the spark docker image, first build a spark distribution suitable for kubernetes and then build the docker image:
$ git clone https://github.com/apache/spark.git
$ cd spark
$ git checkout v3.5.0
$ dev/change-scala-version.sh 2.13
$ dev/make-distribution.sh --name kubernetes --tgz -Pscala-2.13 -Pkubernetes -Phadoop-cloud
$ tar -xvzf spark-3.5.0-bin-kubernetes.tgz
$ cd spark-3.5.0-bin-kubernetes/
$ docker build -t spark:3.5.0 -f ../resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile .
after this we build another docker image that captures our preferred spark settings, using the just created spark docker image as the base (parent) image. this layering doesn't cost anything in docker and it provides a clear separation of concerns. at tresata our docker image with our preferred spark settings is called tresata/spark-submit (for historical reasons). it is basically the same as the spark image but with a custom /opt/spark/conf/spark-defaults.conf. the next sections will go over the changes to /opt/spark/conf/spark-defaults.conf you will want to make to provide reasonable defaults for kubernetes on your target cluster provider. tresata currently maintains images with these default settings for AWS EKS and azure AKS.
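as an illustration (not the actual contents of our image), a kubernetes-oriented spark-defaults.conf could contain entries like these; the image name and namespace are placeholders, and the credentials provider line assumes the IRSA setup described earlier:

```properties
spark.master                                             k8s://https://kubernetes.default.svc
spark.kubernetes.container.image                         spark:3.5.0
spark.kubernetes.namespace                               spark-jobs
spark.kubernetes.authenticate.driver.serviceAccountName  spark
# pick up AWS credentials from the web identity token mounted by EKS
spark.hadoop.fs.s3a.aws.credentials.provider             com.amazonaws.auth.WebIdentityTokenCredentialsProvider
```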
you might encounter this error in your spark driver pod:
ERROR Main: Failed to initialize Spark session.
org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
the fix is to add a setting to spark-defaults.conf.
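the underlying cause is usually that the container runs as a uid that has no passwd entry, so the JVM cannot resolve a user name during hadoop's login. a commonly suggested workaround, which we assume is along the lines of the fix meant here (this is our assumption, not necessarily the author's exact setting), is to provide a user name explicitly via environment variables:

```properties
# give the JVM/hadoop login an explicit user name (workaround sketch)
spark.kubernetes.driverEnv.SPARK_USER   spark
spark.executorEnv.SPARK_USER            spark
```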
a big appeal of deploying on kubernetes is that resources can be allocated on demand, as discussed in the scale horizontally section. however, for this to work spark jobs need to scale their resources up and down as needed, which is known within spark as dynamic allocation. settings we recommend in spark-defaults.conf:
shuffleTracking.enabled is there to avoid losing shuffle blocks, which are kept on the executor nodes. this makes scale-down less efficient but it's better than the alternative, which is to lose shuffle blocks and restart tasks.
executorAllocationRatio is lowered from its default to avoid overly aggressive scale-up, which risks wasting (and paying for) unnecessary virtual machines.
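put together, the dynamic allocation section of spark-defaults.conf can look like this (the values are illustrative, not our exact numbers; tune them for your workloads):

```properties
spark.dynamicAllocation.enabled                   true
# keep executors holding shuffle blocks alive
spark.dynamicAllocation.shuffleTracking.enabled   true
# lowered from the default of 1.0 to scale up less aggressively
spark.dynamicAllocation.executorAllocationRatio   0.5
spark.dynamicAllocation.minExecutors              0
spark.dynamicAllocation.maxExecutors              100
```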
on kubernetes you likely do not have a spark shuffle service available, unless you deployed something like uniffle, in which case you can skip this section. without a shuffle service you have to rely on shuffle blocks being stored in the executor pods, which means these shuffle blocks can get lost when executor pods get shut down, which is not unusual if you use dynamic allocation. to handle this you will have to rely on graceful decommissioning of executor pods.
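enabling graceful decommissioning in spark-defaults.conf can be sketched as follows (these settings exist as of spark 3.1+; whether you migrate rdd blocks as well is a judgment call):

```properties
# when an executor is about to be shut down, migrate its blocks
# to surviving executors instead of losing them
spark.decommission.enabled                        true
spark.storage.decommission.enabled                true
spark.storage.decommission.shuffleBlocks.enabled  true
spark.storage.decommission.rddBlocks.enabled      true
```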
we considered deploying HDFS on kubernetes but decided against it due to the fixed costs of having an HDFS cluster and because the data transfer performance of S3 and ABFS has gotten better in the last few years. however, spark still uses hadoop libraries for interacting with filesystems including S3 and ABFS, using hadoop OutputFormat, OutputCommitter and related classes. we chose the magic committer for S3 and the manifest committer for ABFS. for S3 we also use version 2 of the FileOutputCommitter algorithm, which is risky, but performance on S3 was otherwise simply not acceptable. ABFS performance on azure is overall much better.
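a sketch of the corresponding committer settings in spark-defaults.conf, pieced together from the hadoop and spark documentation rather than copied from our config (requires spark built with -Phadoop-cloud, and the manifest committer requires hadoop-azure 3.3.5+):

```properties
# S3: use the S3A magic committer
spark.hadoop.fs.s3a.committer.name                       magic
spark.hadoop.fs.s3a.committer.magic.enabled              true
spark.sql.sources.commitProtocolClass                    org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class                 org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
# risky (partial output on task failure) but much faster on S3
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version  2
# ABFS: use the manifest committer
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs    org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
```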
Mirror Link – https://technotes.tresata.com/spark-on-k8s/