[BigData] Exporting MSSQL Data in Parallel with Spark
After Spark 2.1.0, JDBC (Java Database Connectivity) became available as one of the input sources for a Spark application. During digital transformation we often need to load an enterprise data warehouse into a data lakehouse, and the easiest way to avoid extra conversion work is to move the RDBMS data straight into the data lake. This post records the process, and the troubleshooting along the way, of migrating a large volume of MSSQL data into an on-premises data lakehouse on Spark 3.3.0.
Loading
For the details of how JDBC is used within the Spark architecture, see the official documentation. With the Spark framework, the following Scala snippet loads the given JDBC data source into a DataFrame.
val df = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("dbtable", dbTable)
  .option("user", username)      // the JDBC option key is "user", not "username"
  .option("password", password)
  .load()
df.show()
At this point you can see that Spark only brings up a single executor, with one task, to do the read, so the whole table comes through one connection.
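A quick way to confirm this, as a minimal sketch using the df from the snippet above: without any partitioning options the JDBC source produces exactly one partition, which means one connection and one task do all the work.
// With no partitioning options the JDBC read yields a single partition,
// i.e. the whole table is pulled through one connection by one task.
println(df.rdd.getNumPartitions)  // expected: 1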
Dependency
To connect to MSSQL Server through com.microsoft.sqlserver.jdbc.spark you need the spark-mssql-connector JAR. The repository has already been publicly archived and appears to support only up to Spark 3.4; Spark 3.5 and later do not seem to be covered. Also, only the 1.3.0_BETA version shows up in the public Maven repository, so take extra care when integrating it (a sketch of pulling the package in follows the stack trace below). When I tried it in a Spark 3.5 environment I got the following error:
Py4JJavaError: An error occurred while calling o101.load.
: java.sql.SQLException: No suitable driver
at java.sql.DriverManager.getDriver(DriverManager.java:315)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:109)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:109)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
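One way to make the connector available, shown here only as a sketch rather than a verified setup, is to let Spark resolve it from Maven at session start through spark.jars.packages. The coordinates below are an assumption (the beta artifact published under com.microsoft.azure); double-check the exact group, artifact, and version for your Spark and Scala build.
import org.apache.spark.sql.SparkSession

// Sketch: resolve spark-mssql-connector from Maven when the session is created.
// The coordinates are assumed -- verify the artifact that matches your
// Spark/Scala version on Maven Central (only a 1.3.0 beta build is published).
val spark = SparkSession.builder()
  .appName("mssql-to-datalake")
  .config("spark.jars.packages", "com.microsoft.azure:spark-mssql-connector_2.12:1.3.0-BETA")
  .getOrCreate()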
Parallel Loading
If you want to load data over several connections at the same time, you can set numPartitions, or use partitionColumn, lowerBound, and upperBound, to achieve parallel loading, as follows:
// Option 1: cap the parallelism with numPartitions
val df = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", jdbcUrl)
  .option("dbtable", dbTable)
  .option("numPartitions", 20)
  .load()

// Option 2: partition explicitly on a column with lower/upper bounds
val df = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", jdbcUrl)
  .option("dbtable", dbTable)
  .option("partitionColumn", "c1")
  .option("lowerBound", "1")
  .option("upperBound", "100")
  .option("numPartitions", "3")
  .load()
You can set numPartitions directly, or use partitionColumn to define how the table is split; note that partitionColumn must be a numeric, date, or timestamp column. After the job runs, remember to check whether the number of partitions really matches what you configured.
df.rdd.getNumPartitions
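In practice the bounds are rarely known up front. A sketch of one approach, not the author's pipeline, is to query the min/max of the partition column first and feed the result back in; the column c1 and the variables jdbcUrl, dbTable, username, and password follow the examples above, and it assumes the connector accepts a subquery as dbtable the way the generic JDBC source does.
// Sketch: derive lowerBound/upperBound from the table itself so the generated
// WHERE clauses cover the full key range instead of a hard-coded 1..100.
val bounds = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", jdbcUrl)
  .option("dbtable", s"(SELECT MIN(c1) AS lo, MAX(c1) AS hi FROM $dbTable) AS b")
  .option("user", username)
  .option("password", password)
  .load()
  .first()

val partitioned = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", jdbcUrl)
  .option("dbtable", dbTable)
  .option("user", username)
  .option("password", password)
  .option("partitionColumn", "c1")
  .option("lowerBound", bounds.get(0).toString)
  .option("upperBound", bounds.get(1).toString)
  .option("numPartitions", "20")
  .load()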
For further reading: Partitioning RDBMS data in Spark SQL
Other Notes
During implementation I also occasionally hit the warning below. Since the data source is MSSQL, my guess is that SQL Server cannot start enough thread resources to serve Spark's parallel loads, so it throws this error (a sketch of one possible mitigation follows the log).
04:29:05.762 INFO HiveMetaStore - 0: get_database: default
04:29:05.762 INFO audit - ugi=root ip=unknown-ip-addr cmd=get_database: default
04:29:05.841 INFO HiveMetaStore - 0: get_table : db=default tbl=test
04:29:05.841 INFO audit - ugi=root ip=unknown-ip-addr cmd=get_table : db=default tbl=test
04:29:05.913 INFO HiveMetaStore - 0: get_database: default
04:29:05.913 INFO audit - ugi=root ip=unknown-ip-addr cmd=get_database: default
04:29:05.987 INFO HiveMetaStore - 0: get_table : db=default tbl=test
04:29:05.987 INFO audit - ugi=root ip=unknown-ip-addr cmd=get_table : db=default tbl=test
04:29:06.048 INFO HiveMetaStore - 0: get_table : db=default tbl=test
04:29:06.048 INFO audit - ugi=root ip=unknown-ip-addr cmd=get_table : db=default tbl=test
04:29:06.107 INFO HiveMetaStore - 0: get_database: default
04:29:06.107 INFO audit - ugi=root ip=unknown-ip-addr cmd=get_database: default
04:29:06.184 INFO HiveMetaStore - 0: get_database: default
04:29:06.184 INFO audit - ugi=root ip=unknown-ip-addr cmd=get_database: default
04:36:09.210 WARN TaskSetManager - Lost task 17.0 in stage 25.0 (TID 589) (192.168.14.238 executor 229): com.microsoft.sqlserver.jdbc.SQLServerException: The query processor was unable to start the necessary thread resources for parallel query execution.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:6388)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1647)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:988)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:344)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:369)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[Stage 25:======================================================> (38 + 1) / 39]
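Along the lines of that guess, one possible mitigation, offered only as a sketch and not a verified fix, is to lower numPartitions: it also caps how many JDBC connections Spark opens at once, so fewer parallel queries hit SQL Server at the same time. Tuning on the SQL Server side (for example its degree-of-parallelism settings) is another direction, but that is outside the scope of this post.
// Sketch: throttle the read so fewer parallel queries reach SQL Server at once.
val throttled = spark.read.format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", jdbcUrl)
  .option("dbtable", dbTable)
  .option("user", username)
  .option("password", password)
  .option("partitionColumn", "c1")
  .option("lowerBound", "1")
  .option("upperBound", "100")
  .option("numPartitions", "5")  // fewer concurrent connections than before
  .load()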