[Spark] Row: getList() 的隱藏錯誤

在開發 Java Spark 的時候,常常會需要轉換 Dataset 或是 DataFrame,對於比較大的表格格式變換 (Schema Change),通常會使用到 JavaRDD 與 Row,開發時用到比較複雜的資料結構像是 List 或是 Map 等等的時候,有時候發生錯誤並不知道要如何除錯?本篇想要展示類別 Row: getList() 的隱藏錯誤在使用的時候發生無法理解的 NullPointerException 現象並且其解決的方法!

以下我們利用 Java + Spark 展示一個例子,假設我們要處理的資料用 Json 的格式描述如下:

[
  {
    "category": "airport",
    "type": "title"
  },
  {
    "category": "airport",
    "type": "vocabulary",
    "uids": ["1","2","3","4","5","6","7","8","9","10","11","12","16","17","18"]
  },
  {
    "category": "airport",
    "type": "alphabet"
  },
  {
    "category": "airport",
    "type": "sentence",
    "uids": ["13","14","15"]
  }
]

利用 SparkSession (spark-core_2.11) 讀檔案並且 show 出來的資料呈現如下:

SparkSession sparkSession = SparkSession.builder().master("local")
                                        .appName("Spark Session Example").getOrCreate();

Dataset<Row> input = sparkSession.read().json("file.json");
input.printSchema(); input.show();
+--------+----------+--------------------+
|category|      type|                uids|
+--------+----------+--------------------+
| airport|     title|                null|
| airport|vocabulary|[1, 2, 3, 4, 5, 6...|
| airport|  alphabet|                null|
| airport|  sentence|        [13, 14, 15]|
+--------+----------+--------------------+

假設我們想要對 uids 這一個 Array 做修改,此時我們會需要用到 MapFunction 的函式,假設我們只想要取 Array 裡面的第一與第二個值,此時 MapFunction 會類似以下所示的程式碼:

StructType outputType = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("category", DataTypes.StringType, true),
    DataTypes.createStructField("type", DataTypes.StringType, true),
    DataTypes.createStructField("uids", DataTypes.createArrayType(DataTypes.StringType), true)
));
Dataset<Row> out = in.map((MapFunction<Row, Row>) (Row rowInput) -> {
    List<Object> output = new ArrayList<>();
    output.add(rowInput.getAs("category"));
    output.add(rowInput.getAs("type"));
    List<String> uids = rowInput.getList(rowInput.fieldIndex("uids"));
    if (uids == null){
        output.add(null);
    } else {
        output.add(uids.subList(0,1).toArray());
    }
    return RowFactory.create(output.toArray());
}, RowEncoder.apply(outputType));

但是執行的時候發生以下 NullPointerException 的錯誤訊息!

Caused by: java.lang.NullPointerException
	at scala.collection.convert.Wrappers$IterableWrapperTrait$class.size(Wrappers.scala:24)
	at scala.collection.convert.Wrappers$SeqWrapper.size(Wrappers.scala:65)
	at java.util.SubList.<init>(AbstractList.java:621)
	at java.util.AbstractList.subList(AbstractList.java:484)

如果詳細去查看 getList() 的 Spark Row 的函式說明,會發現 getList 函式會回傳 List<T> 以上的使用方法並沒有邏輯上的錯誤,即便我們有檢查 uids 是否為 null 但是還是發生 NullPointerException。此時使用 scala.collection.mutable.WrappedArray 並將程式稍微修改如以下即可以使用!

WrappedArray<String> uids = rowInput.getAs("uids");
if (uids == null){
    output.add(null);
} else {
    output.add(Arrays.copyOfRange((String[]) uids.array(), 0, 2));
}

備註:也可以使用 JavaConversions.SeqAsJavaList 等等其他的函式幫忙將 uids 轉換成 List<String>:

List<String> uids = JavaConversions.seqAsJavaList(uidsArray.toSeq());

備註:其他 Spark 常見的問題也可以參考連結