[BigData] 大數據中的 Join

Join 是一個在關聯性資料庫裡面很常使用的一個運算元,在大數據資料庫慢慢普及的今天,Join 還是一個幫助我們了解資料關係不可或缺的角色,今天想要討論的是在 Spark 裡面 Join 背後執行的運算原理,筆者在執行 Spark 工作的時候,有時候需要優化資料的運算過程以降低運算所需要的時間,本篇的資料來源可以參考連結,另外筆者也很建議大家觀看以下這一個 Youtube 影片。

我們利用 Python(關於使用 Python 可以參考這一篇)舉一個簡單的例子展示如何可以知道背景的 Join 使用的是哪一個,假設我們分別有城市與國家的檔案,他們的內容如下面展示:

city = sqlContext.read.option("multiline", "true").json('city.json')
country = sqlContext.read.option("multiline", "true").json('country.json')
city.show()
country.show()
city.join(country, city.country == country.countryId).show()

輸出得到:

+---------+------+-------+-------------+
|     city|cityId|country|population(M)|
+---------+------+-------+-------------+
|Kaohsiung|     1|      1|        280.0|
|   Taipei|     2|      1|        269.0|
|   France|     3|      2|        214.0|
|     Lyon|     4|      2|         48.4|
+---------+------+-------+-------------+

+---------+-------+---------+
|continent|country|countryId|
+---------+-------+---------+
|        2| Taiwan|        1|
|        1| France|        2|
+---------+-------+---------+

+---------+------+-------+-------------+---------+-------+---------+
|     city|cityId|country|population(M)|continent|country|countryId|
+---------+------+-------+-------------+---------+-------+---------+
|Kaohsiung|     1|      1|        280.0|        2| Taiwan|        1|
|   Taipei|     2|      1|        269.0|        2| Taiwan|        1|
|   France|     3|      2|        214.0|        1| France|        2|
|     Lyon|     4|      2|         48.4|        1| France|        2|
+---------+------+-------+-------------+---------+-------+---------+
使用 explain 探查 Join 型態:

想要知道究竟 Join 使用的是哪一種 Join 的背後機制,可以使用下面的函式 .explain()。

city.join(country, city.country == country.countryId).explain()

得到以下輸出資訊,我們知道這是使用的是 BroadcastHashJoin,主要的運作邏輯是將 country 的資料 broadcast 去跟 city 做 join。

== Physical Plan ==
*(2) BroadcastHashJoin [country$2L], [countryId$27L], Inner, BuildRight
:- *(2) Project [city$0, cityId$1L, country$2L, population(M)$3]
:  +- *(2) Filter isnotnull(country$2L)
:     +- *(2) FileScan json [city$0,cityId$1L,country$2L,population(M)$3] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../city.json], PartitionFilters: [], PushedFilters: [IsNotNull(country)], ReadSchema: struct<city:string,cityId:bigint,country:bigint,population(M):double>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[2, bigint, true]))
   +- *(1) Project [continent$25L, country$26, countryId$27L]
      +- *(1) Filter isnotnull(countryId$27L)
         +- *(1) FileScan json [continent$25L,country$26,countryId$27L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../country.json], PartitionFilters: [], PushedFilters: [IsNotNull(countryId)], ReadSchema: struct<continent:bigint,country:string,countryId:bigint>

 

Youtube 影片介紹的 Join 方法:
Basic Joins: Special Cases:
  • Shuffle Hash Join
  • Broadcast Hash Join
  • Cartesian Join
  • Theta Join
  • One-to-Many Join

最常用的兩種 Join 方式是 Shuffle Hash Join 跟 Broadcast Hash Join,以下的內容主要擷取自 Youtube 的內容:

Shuffle Hash Join:

工作原理是傳統的 Map Reduce 方法,首先先根據 join on 的 key 去把兩個表格的內容丟到不同的 Worker Node 上面如下圖(擷取自 Youtube 影片)所示,如此一來 join 的工作就可以根據 join key 的數量進行平行運算!

Shuffle Hash Join 的效能主要受制於以下兩個要素, 

  1. 如果檔案沒有平均分配到不同的 key 上面 -> Uneven Sharding
  2. 是否擁有適當的 key 的數量來進行平行化 -> Limited Parallelism
Broadcast Hash Join

如同上面 explain 的例子,當 Broadcast Hash Join 在執行的時候,他比較適合執行在其中一個 DataFrame 沒有那麼大的時候,他的運作邏輯是將 Small DataFrame 廣播道不同的 Partition 裡面,所以他的平行化數量是 Large DataFrame 的 Partition 數量。

Cartesian Join (笛卡兒直積)

假設處理的兩個表格分別是 10000 行與 1000 行,使用 Cartesian Join 會產生 10000 x 1000 行的 row ,主要應該是要用來處理一些 UDF 的計算,例如 Linear Regression 或是 avg, max 等等的計算!

其他的 Join 

最後其實還有很多 Join 是會被呼叫的,例如我們改編一下之前的的程式碼如:

city.join(country, city.country > country.countryId).explain()

結果會得到 BroadcastNestedLoopJoin

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, (country$2L > countryId$27L)
:- *(1) Project [city$0, cityId$1L, country$2L, population(M)$3]
:  +- *(1) Filter isnotnull(country$2L)
:     +- *(1) FileScan json [city$0,cityId$1L,country$2L,population(M)$3] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../city.json], PartitionFilters: [], PushedFilters: [IsNotNull(country)], ReadSchema: struct<city:string,cityId:bigint,country:bigint,population(M):double>
+- BroadcastExchange IdentityBroadcastMode
   +- *(2) Project [continent$25L, country$26, countryId$27L]
      +- *(2) Filter isnotnull(countryId$27L)
         +- *(2) FileScan json [continent$25L,country$26,countryId$27L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../country.json], PartitionFilters: [], PushedFilters: [IsNotNull(countryId)], ReadSchema: struct<continent:bigint,country:string,countryId:bigint>