[Rapids] Support Spark SQL with Spark Rapids

Spark SQL 是很多公司提供給商業智慧 BI (Business Intelligence) 的大數據介面,🍋爸在介紹完 Spark Thrift Server 之後,使用者可以透過 Spark SQL 對自己的資料做操作,Spark SQL 的好處是可以很容易去滿足使用者的需求,但是往往容易遇到因為使用者執行複雜 Query 導致的效能低落,本篇想要透過導入 Spark Rapids 配合 Spark Thrift Server 引入 GPU 的資源看看是否能夠加速 Spark SQL 的運算效率?

Rapids Accelerator For Apache Spark

在 Rapids 的介紹網站中,就有強調:

Apache Spark 3.0+ lets users provide a plugin that can replace the backend for SQL and DataFrame operations. This requires no API changes from the user. The plugin will replace SQL operations it supports with GPU accelerated versions. If an operation is not supported it will fall back to using the Spark CPU version. Note that the plugin cannot accelerate operations that manipulate RDDs directly.

Spark Configurations

介紹幾個很重要的 Spark Rapid Configuration

  • spark.executor.resource.gpu.amount:一個 executor 可以執行幾個 gpu
  • spark.task.resource.gpu.amount:
  • spark.plugins

問題一:無法取得 Spark Executor

一開始設定的 Spark Configurations 如下,結果就發生了無法初始化 Executor 的情況。

spark.executor.cores 2
spark.executor.memory 4g
spark.executor.resource.gpu.discoveryScript /tmp/getGpusResources.sh
spark.task.cpus 1
spark.executor.resource.gpu.amount 1
spark.task.resource.gpu.amount 0.5
spark.rapids.memory.pinnedPool.size 2g
spark.rapids.sql.concurrentGpuTasks 2
spark.rapids.sql.csv.read.double.enabled true
spark.rapids.sql.hasNans false
spark.rapids.sql.explain ALL
spark.plugins com.nvidia.spark.SQLPlugin

查看問題之後發現 executor memory 設定太高導致不夠資源開出可以跑的 executor,下調之後解決,如果 executor 都開不出來的話,GPU 也就沒有辦法使用。

問題二:SparkSQL 回傳空表格

執行簡單的 SQL command 例如:SELECT * FROM demo 結果拿到以下空字串,已回報給 Spark Rapids with Ticket https://github.com/NVIDIA/spark-rapids/issues/9255,原因是因為不支援使用 columnMapping = name 的 DeltaTable 表格。

+----+----+
| _c0| _c1|
+----+----+
|null|null|
|null|null|
|null|null|
|null|null|
|null|null|
|null|null|
|null|null|
|null|null|
+----+----+
only showing top 20 rows

後續需求已經提交,預計在 23.10 版本就可以利用 Spark Rapids 去跑有啟動 columnMapping 的 DeltaTable,原本的 Spark Rapids 會把不同的 DeltaTable 一律當成 ParquetFileFormat 來讀取,不會特別去處理 DeltaTableFileFormat,但是就沒辦法處理 columnMapping = “name” 的 DeltaTable。修改完之後的程式碼:

  1. 在 GpuFileSourceScanExec 裡面利用 ExternalSource 去處理 DeltaTableFileFormat 的輸入資訊。
  2. 利用在 ExternalSource 裡面的 DeltaProvider 去執行createMultiFileReaderFactory。

原本的 createMultiFileReaderFactory 會回傳以下 GpuParquetMultiFilePartitionReaderFactory

GpuParquetMultiFilePartitionReaderFactory(
  sqlConf,
  broadcastedHadoopConf,
  relation.dataSchema,
  requiredSchema,
  readPartitionSchema,
  pushedDownFilters.toArray,
  rapidsConf,
  allMetrics,
  queryUsesInputFile,
  alluxioPathReplacementMap)

後來改成

GpuParquetMultiFilePartitionReaderFactory(
  fileScan.conf,
  broadcastedConf,
  prepareSchema(fileScan.relation.dataSchema),
  prepareSchema(fileScan.requiredSchema),
  prepareSchema(fileScan.readPartitionSchema),
  pushedFilters,
  fileScan.rapidsConf,
  fileScan.allMetrics,
  fileScan.queryUsesInputFile,
  fileScan.alluxioPathsMap)

在確定 DeltaTable 可以直接透過 Spark Rapids 讀取之後,我們遇到了效能的問題。

問題三:如何優化 Spark Configuration 參數可以最有效率的使用 GPU Cluster 的資源?

為了解決這個問題,我們參考了這篇文章,裡面針對以下的設定做了一個清楚的描述,最重要的概念就是要讓 GPU parallelism 可以等於 CPU parallelism。

Profiling Tool

後續我們將繼續測試不同 SparkSQL 使用 GPU 的情況並且利用 Spark Rapids 的 Profiling Tool 來協助觀察,關於 Spark Rapids Profiling Tool 詳細可以參考。