[BigData] 實作 Spark 將 MSSQL 檔案平行匯出

Spark 2.1.0 之後,JDBC (Java Database Connectivity) 被引入作為一個 Spark 應用的其中一種輸入,在數位轉型的過程中我們很常需要將企業中的資料倉儲匯入資料湖庫,為了省去資料轉換的功夫,最好的方法就是直接將 RDBMS 的資料直接轉進 Datalake 裡面,本篇希望紀錄在 Spark 3.3.0 環境中將 MSSQL 的巨量資料轉移到地端的資料湖庫中的過程與 TroubleShooting。

Loading

關於 Spark 架構中 JDBC 的使用方法,詳細可以參考官網,透過 Spark 的框架首先可以利用以下的 Scala 指令將給定的 jdbc 資料來源載入成一個 JdbcRDD

val df = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", url)
    .option("dbtable", dbTable)
    .option("username", username)
    .option("password", password)
    .load()
df.show()

此時可以看到 Spark 只呼叫了一個 executor 起來做資料讀取的工作。

Dependency

要使用 com.microsoft.sqlsever.jdbc.spark 來連結 MSSQL Server 需要引入 spark-mssql-connector 這個 Jar 包,由於這個 Repo 已經 Public Archive 了,感覺只支援到 Spark 3.4 版,Spark 3.5 版之後感覺就沒有支援了,另外由於在公開的 Maven Repo 裡面只看得到 1.3.0_BETA 版本,所以在整合的時候可能要特別注意一下,筆者嘗試在 Spark 3.5 的環境中去使用的時候就會遇到以下的錯誤訊息:

Py4JJavaError: An error occurred while calling o101.load.
: java.sql.SQLException: No suitable driver
  at java.sql.DriverManager.getDriver(DriverManager.java:315)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:109)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:109)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
  at java.lang.Thread.run(Thread.java:750)

Parallel Loading

如果想要利用多個 Connection 同時進行資料的載入,可以利用 numPartitions 的設定或者是 partitionColumn, lowerBound, upperBound 這幾個設定去達到 parallel loading,以下:

val df = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", jdbcUrl)
    .option("numPartitions", 20)
    .load(()
val df = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", jdbcUrl)
    .option("dbtable", dbTable)
    .option("partitionColumn", "c1")
    .option("lowerBound", "1")
    .option("upperBound", "100")
    .option("numPartitions", "3")
    .load()

可以直接設定 numPartitions 或是利用 partitionColumn 去定義 Partition 的方法,不過 partitionColumn 的欄位必須要是以下幾種:numeric, date, or timestamp,執行完之後,記得要檢查 numPartitions 是不是真的如同設定的一樣。

df.rdd.getNumPartitions()

近一步可以參考:Partitioning RDBMS data in Spark SQL

其他

在實作的時候,偶爾也會遇到以下的警告,由於使用的資料來源為 MSSQL 猜測可能的原因是因為 Connection 數量不夠 Spark 平行的載入,所以丟出這個警告。

04:29:05.762 INFO  HiveMetaStore - 0: get_database: default
04:29:05.762 INFO  audit - ugi=root  ip=unknown-ip-addr  cmd=get_database: default  
04:29:05.841 INFO  HiveMetaStore - 0: get_table : db=default tbl=test
04:29:05.841 INFO  audit - ugi=root  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test  
04:29:05.913 INFO  HiveMetaStore - 0: get_database: default
04:29:05.913 INFO  audit - ugi=root  ip=unknown-ip-addr  cmd=get_database: default  
04:29:05.987 INFO  HiveMetaStore - 0: get_table : db=default tbl=test
04:29:05.987 INFO  audit - ugi=root  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test  
04:29:06.048 INFO  HiveMetaStore - 0: get_table : db=default tbl=test
04:29:06.048 INFO  audit - ugi=root  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test  
04:29:06.107 INFO  HiveMetaStore - 0: get_database: default
04:29:06.107 INFO  audit - ugi=root  ip=unknown-ip-addr  cmd=get_database: default  
04:29:06.184 INFO  HiveMetaStore - 0: get_database: default
04:29:06.184 INFO  audit - ugi=root  ip=unknown-ip-addr  cmd=get_database: default  
04:36:09.210 WARN  TaskSetManager - Lost task 17.0 in stage 25.0 (TID 589) (192.168.14.238 executor 229): com.microsoft.sqlserver.jdbc.SQLServerException: 查詢處理器無法為平行查詢的執行啟動必要的執行緒資源。
  at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
  at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:6388)
  at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1647)
  at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:988)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:344)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:369)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

[Stage 25:======================================================> (38 + 1) / 39]