[BigData] 大數據中的 Join
Join 是一個在關聯性資料庫裡面很常使用的一個運算元,在大數據資料庫慢慢普及的今天,Join 還是一個幫助我們了解資料關係不可或缺的角色,今天想要討論的是在 Spark 裡面 Join 背後執行的運算原理,筆者在執行 Spark 工作的時候,有時候需要優化資料的運算過程以降低運算所需要的時間,本篇的資料來源可以參考連結,另外筆者也很建議大家觀看以下這一個 Youtube 影片。
我們利用 Python(關於使用 Python 可以參考這一篇)舉一個簡單的例子展示如何可以知道背景的 Join 使用的是哪一個,假設我們分別有城市與國家的檔案,他們的內容如下面展示:
city = sqlContext.read.option("multiline", "true").json('city.json')
country = sqlContext.read.option("multiline", "true").json('country.json')
city.show()
country.show()
city.join(country, city.country == country.countryId).show()
輸出得到:
+---------+------+-------+-------------+
| city|cityId|country|population(M)|
+---------+------+-------+-------------+
|Kaohsiung| 1| 1| 280.0|
| Taipei| 2| 1| 269.0|
| France| 3| 2| 214.0|
| Lyon| 4| 2| 48.4|
+---------+------+-------+-------------+
+---------+-------+---------+
|continent|country|countryId|
+---------+-------+---------+
| 2| Taiwan| 1|
| 1| France| 2|
+---------+-------+---------+
+---------+------+-------+-------------+---------+-------+---------+
| city|cityId|country|population(M)|continent|country|countryId|
+---------+------+-------+-------------+---------+-------+---------+
|Kaohsiung| 1| 1| 280.0| 2| Taiwan| 1|
| Taipei| 2| 1| 269.0| 2| Taiwan| 1|
| France| 3| 2| 214.0| 1| France| 2|
| Lyon| 4| 2| 48.4| 1| France| 2|
+---------+------+-------+-------------+---------+-------+---------+
使用 explain 探查 Join 型態:
想要知道究竟 Join 使用的是哪一種 Join 的背後機制,可以使用下面的函式 .explain()。
city.join(country, city.country == country.countryId).explain()
得到以下輸出資訊,我們知道這是使用的是 BroadcastHashJoin,主要的運作邏輯是將 country 的資料 broadcast 去跟 city 做 join。
== Physical Plan ==
*(2) BroadcastHashJoin [country$2L], [countryId$27L], Inner, BuildRight
:- *(2) Project [city$0, cityId$1L, country$2L, population(M)$3]
: +- *(2) Filter isnotnull(country$2L)
: +- *(2) FileScan json [city$0,cityId$1L,country$2L,population(M)$3] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../city.json], PartitionFilters: [], PushedFilters: [IsNotNull(country)], ReadSchema: struct<city:string,cityId:bigint,country:bigint,population(M):double>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[2, bigint, true]))
+- *(1) Project [continent$25L, country$26, countryId$27L]
+- *(1) Filter isnotnull(countryId$27L)
+- *(1) FileScan json [continent$25L,country$26,countryId$27L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../country.json], PartitionFilters: [], PushedFilters: [IsNotNull(countryId)], ReadSchema: struct<continent:bigint,country:string,countryId:bigint>
Youtube 影片介紹的 Join 方法:
Basic Joins: | Special Cases: |
|
|
最常用的兩種 Join 方式是 Shuffle Hash Join 跟 Broadcast Hash Join,以下的內容主要擷取自 Youtube 的內容:
Shuffle Hash Join:
工作原理是傳統的 Map Reduce 方法,首先先根據 join on 的 key 去把兩個表格的內容丟到不同的 Worker Node 上面如下圖(擷取自 Youtube 影片)所示,如此一來 join 的工作就可以根據 join key 的數量進行平行運算!
Shuffle Hash Join 的效能主要受制於以下兩個要素,
- 如果檔案沒有平均分配到不同的 key 上面 -> Uneven Sharding
- 是否擁有適當的 key 的數量來進行平行化 -> Limited Parallelism
Broadcast Hash Join
如同上面 explain 的例子,當 Broadcast Hash Join 在執行的時候,他比較適合執行在其中一個 DataFrame 沒有那麼大的時候,他的運作邏輯是將 Small DataFrame 廣播道不同的 Partition 裡面,所以他的平行化數量是 Large DataFrame 的 Partition 數量。
Cartesian Join (笛卡兒直積)
假設處理的兩個表格分別是 10000 行與 1000 行,使用 Cartesian Join 會產生 10000 x 1000 行的 row ,主要應該是要用來處理一些 UDF 的計算,例如 Linear Regression 或是 avg, max 等等的計算!
其他的 Join
最後其實還有很多 Join 是會被呼叫的,例如我們改編一下之前的的程式碼如:
city.join(country, city.country > country.countryId).explain()
結果會得到 BroadcastNestedLoopJoin
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, (country$2L > countryId$27L)
:- *(1) Project [city$0, cityId$1L, country$2L, population(M)$3]
: +- *(1) Filter isnotnull(country$2L)
: +- *(1) FileScan json [city$0,cityId$1L,country$2L,population(M)$3] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../city.json], PartitionFilters: [], PushedFilters: [IsNotNull(country)], ReadSchema: struct<city:string,cityId:bigint,country:bigint,population(M):double>
+- BroadcastExchange IdentityBroadcastMode
+- *(2) Project [continent$25L, country$26, countryId$27L]
+- *(2) Filter isnotnull(countryId$27L)
+- *(2) FileScan json [continent$25L,country$26,countryId$27L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../country.json], PartitionFilters: [], PushedFilters: [IsNotNull(countryId)], ReadSchema: struct<continent:bigint,country:string,countryId:bigint>
One thought on “[BigData] 大數據中的 Join”