The following post is to summarize a proof of concept I conducted with Erwan Gallen for deploying Apache Spark clusters on OpenShift Cloud Platform, using GPU support provided by the Spark Rapids Spark Plugin. The OpenShift objects and other artifacts referenced here can also be found at this github repo.

Background

The Spark Rapids plugin leverages the ability of Apache Spark to discover and load jar files that customize certain aspects of Spark’s behavior. In the case of Spark Rapids, the plugin enhances Spark’s Catalyst query planner so it can automatically detect Spark operations that can be accelerated by GPU hardware and route them to available GPUs.

Container Images for Spark Rapids

To run on a container orchestration platform such as OpenShift or its upstream Kubernetes, we must begin by creating a container image with Spark, CUDA, and Spark Rapids installed. CUDA can be obtained pre-packaged from one of the nvidia CUDA images. In this POC, we used a CUDA 10.x based on ubuntu. More recent versions of CUDA are now also available installed on top of Red Hat UBI.

You can refer to the Dockerfile we created on this POC for examples of how to install the Spark, Spark Rapids and GPU dependnecies.

Additionally, you must install GPU drivers appropriate for your hardware platform onto your image. As you can see in the Dockerfile, one can install a selection of GPU drivers directly from the ubuntu package manager: in our study we installed nvidia-driver-440. You can also download drivers directly to install them on your image.

Lastly, note that Spark typically uses a special entrypoint script that pre-configures an entry in the /etc/passwd file that Spark requires to start up.

The particular images we built for this study are available here.

Deploying on a Cluster

For our proof of concept, we ran spark-shell on a pod from inside the cluster.

In order for our spark-shell command to successfully operate inside the cluster, we need to deploy some supporting Kubernetes objects. Each item links to the corresponding YAML file on the project repository.

With these objects installed, we can now set up a pod for our spark-shell run. The run command looked like this example, which provides a remote shell on the cluster pod at our command line:

$ oc run -i -t --serviceaccount=spark spark --image=quay.io/erikerlandson/spark-rapids:latest --command -- /bin/bash

In the next section we will describe the configuration arguments for enabling a spark-shell with Spark Rapids GPU support.

Configuring Spark to use Rapids

We ran spark-shell from the command line of our pod, as created in the oc run command above. The following command details all the configuration arguments needed for Spark to connect to the in-cluster kubernetes API, set Spark’s resource requirements, instruct Spark to use GPUs for its tasks and enable Spark Rapids:

$ ${SPARK_HOME}/bin/spark-shell --master k8s://https://kubernetes.default:443 \
             --conf spark.kubernetes.authenticate.submission.oauthToken=$(cat /run/secrets/kubernetes.io/serviceaccount/token) \
             --conf spark.kubernetes.container.image=quay.io/erikerlandson/spark-rapids:latest \
             --conf spark.driver.host=$(hostname) \
             --conf spark.locality.wait=0s \
             --conf spark.driver.memory=2g  --conf spark.executor.memory=4g \
             --conf spark.executor.cores=1  --conf spark.task.cpus=1 \
             --conf spark.plugins=com.nvidia.spark.SQLPlugin \
             --conf spark.executor.resource.gpu.discoveryScript=/opt/getGpusResources.sh \
             --conf spark.executor.resource.gpu.vendor=nvidia.com  \
             --conf spark.rapids.sql.concurrentGpuTasks=1  \
             --conf spark.rapids.memory.pinnedPool.size=1g  \
             --conf spark.task.resource.gpu.amount=1  \
             --conf spark.executor.resource.gpu.amount=1 \
             --conf spark.worker.resource.gpu.amount=1  \
             --conf spark.sql.files.maxPartitionBytes=512m   \
             --conf spark.sql.shuffle.partitions=10

The following arguments tell Spark how to connect its driver (spark-shell) to the kubernetes API and use the service account we set up for our shell pod:

--master k8s://https://kubernetes.default:443
--conf spark.kubernetes.authenticate.submission.oauthToken=$(cat /run/secrets/kubernetes.io/serviceaccount/token)

The spark.driver.host configuration tells spark to advertise its driver location as the in-cluster address of the pod it’s running on:

--conf spark.driver.host=$(hostname)

These configurations tell spark how to discover GPU driver information on the physical machines its pods are running on, and to use GPU support for its tasks:

--conf spark.executor.resource.gpu.discoveryScript=/opt/getGpusResources.sh
--conf spark.executor.resource.gpu.vendor=nvidia.com 
--conf spark.task.resource.gpu.amount=1
--conf spark.executor.resource.gpu.amount=1
--conf spark.worker.resource.gpu.amount=1

Lastly, these configurations specifically instruct Spark to use the Spark Rapids plugins:

--conf spark.plugins=com.nvidia.spark.SQLPlugin
--conf spark.rapids.sql.concurrentGpuTasks=1
--conf spark.rapids.memory.pinnedPool.size=1g

Demonstrating Spark Rapids

Once we have started up our spark-shell as detailed above, we can run a test to demonstrate the Spark Rapids is accelerating our Spark operations. The following session runs a simple join operation and then examines the corresponding query plan using the Spark SQL explain operator to verify that Spark Rapids is operating on the query:

scala> val dfa = sc.makeRDD(1 to 10000000, 6).toDF("a")
dfa: org.apache.spark.sql.DataFrame = [a: int]
 
scala> val dfb = sc.makeRDD(1 to 10000000, 6).toDF("b")
dfb: org.apache.spark.sql.DataFrame = [b: int]
 
scala> dfa.registerTempTable("tba")
scala> dfb.registerTempTable("tbb")
 
scala> val join = spark.sql("select a from tba left join tbb on tba.a = tbb.b")
join: org.apache.spark.sql.DataFrame = [a: int]
 
scala> join.count
res44: Long = 10000000                                                          
 
scala> val plan = spark.sql("explain extended select a from tba left join tbb on tba.a = tbb.b")
plan: org.apache.spark.sql.DataFrame = [plan: string]
 
scala> println(s"${plan.first.getString(0)}")
== Parsed Logical Plan ==
'Project ['a]
+- 'Join LeftOuter, ('tba.a = 'tbb.b)
   :- 'UnresolvedRelation [tba]
   +- 'UnresolvedRelation [tbb]
 
== Analyzed Logical Plan ==
a: int
Project [a#78]
+- Join LeftOuter, (a#78 = b#85)
   :- SubqueryAlias tba
   :  +- Project [value#75 AS a#78]
   :     +- SerializeFromObject [input[0, int, false] AS value#75]
   :        +- ExternalRDD [obj#74]
   +- SubqueryAlias tbb
      +- Project [value#82 AS b#85]
         +- SerializeFromObject [input[0, int, false] AS value#82]
            +- ExternalRDD [obj#81]

== Optimized Logical Plan ==
Project [a#78]
+- Join LeftOuter, (a#78 = b#85)
   :- Project [value#75 AS a#78]
   :  +- SerializeFromObject [input[0, int, false] AS value#75]
   :     +- ExternalRDD [obj#74]
   +- Project [value#82 AS b#85]
      +- SerializeFromObject [input[0, int, false] AS value#82]
         +- ExternalRDD [obj#81]
 
== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [a#78]
   +- GpuShuffledHashJoin [a#78], [b#85], LeftOuter, GpuBuildRight, false
      :- GpuShuffleCoalesce 2147483647
      :  +- GpuColumnarExchange gpuhashpartitioning(a#78, 10), true, [id=#653]
      :     +- GpuProject [value#75 AS a#78]
      :        +- GpuRowToColumnar TargetSize(2147483647)
      :           +- *(1) SerializeFromObject [input[0, int, false] AS value#75]
      :              +- Scan[obj#74]
      +- GpuCoalesceBatches RequireSingleBatch
         +- GpuShuffleCoalesce 2147483647
            +- GpuColumnarExchange gpuhashpartitioning(b#85, 10), true, [id=#660]
               +- GpuProject [value#82 AS b#85]
                  +- GpuRowToColumnar TargetSize(2147483647)
                     +- *(2) SerializeFromObject [input[0, int, false] AS value#82]
                        +- Scan[obj#81]

As we can from the Physical Plan output above, Spark Rapids has altered the physical query plan to use GPU accelerated operations. This query plan output is also a good example of the Spark Rapids plugin design: Rapids allows Spark to perform logical plan optimization as usual, and applies the lower-level GPU optimizations to the resulting physical plan.

We can also make use of Spark’s Web UI to view the operation of Spark Rapids. The following screen shot shows the modifications that Spark Rapids has made to the query:

Spark Web UI

Benchmarking

The above test query ran in approximately 11 seconds using Spark Rapids. We ran a corresponding spark-shell configured without GPU support and the test join ran in approximately 40 seconds, and so Spark Rapids accelerated our test by ~4x in our hardware environment.