[Spark] Row: getList() 的隱藏錯誤
在開發 Java Spark 的時候,常常會需要轉換 Dataset 或是 DataFrame,對於比較大的表格格式變換 (Schema Change),通常會使用到 JavaRDD 與 Row,開發時用到比較複雜的資料結構像是 List 或是 Map 等等的時候,有時候發生錯誤並不知道要如何除錯?本篇想要展示類別 Row: getList() 的隱藏錯誤在使用的時候發生無法理解的 NullPointerException 現象並且其解決的方法!
以下我們利用 Java + Spark 展示一個例子,假設我們要處理的資料用 Json 的格式描述如下:
[
{
"category": "airport",
"type": "title"
},
{
"category": "airport",
"type": "vocabulary",
"uids": ["1","2","3","4","5","6","7","8","9","10","11","12","16","17","18"]
},
{
"category": "airport",
"type": "alphabet"
},
{
"category": "airport",
"type": "sentence",
"uids": ["13","14","15"]
}
]
利用 SparkSession (spark-core_2.11) 讀檔案並且 show 出來的資料呈現如下:
SparkSession sparkSession = SparkSession.builder().master("local")
.appName("Spark Session Example").getOrCreate();
Dataset<Row> input = sparkSession.read().json("file.json");
input.printSchema();
input.show();
+--------+----------+--------------------+
|category| type| uids|
+--------+----------+--------------------+
| airport| title| null|
| airport|vocabulary|[1, 2, 3, 4, 5, 6...|
| airport| alphabet| null|
| airport| sentence| [13, 14, 15]|
+--------+----------+--------------------+
假設我們想要對 uids 這一個 Array 做修改,此時我們會需要用到 MapFunction 的函式,假設我們只想要取 Array 裡面的第一與第二個值,此時 MapFunction 會類似以下所示的程式碼:
StructType outputType = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("category", DataTypes.StringType, true),
DataTypes.createStructField("type", DataTypes.StringType, true),
DataTypes.createStructField("uids", DataTypes.createArrayType(DataTypes.StringType), true)
));
Dataset<Row> out = in.map((MapFunction<Row, Row>) (Row rowInput) -> {
List<Object> output = new ArrayList<>();
output.add(rowInput.getAs("category"));
output.add(rowInput.getAs("type"));
List<String> uids = rowInput.getList(rowInput.fieldIndex("uids"));
if (uids == null){
output.add(null);
} else {
output.add(uids.subList(0,1).toArray());
}
return RowFactory.create(output.toArray());
}, RowEncoder.apply(outputType));
但是執行的時候發生以下 NullPointerException 的錯誤訊息!
Caused by: java.lang.NullPointerException
at scala.collection.convert.Wrappers$IterableWrapperTrait$class.size(Wrappers.scala:24)
at scala.collection.convert.Wrappers$SeqWrapper.size(Wrappers.scala:65)
at java.util.SubList.<init>(AbstractList.java:621)
at java.util.AbstractList.subList(AbstractList.java:484)
如果詳細去查看 getList() 的 Spark Row 的函式說明,會發現 getList 函式會回傳 List<T> 以上的使用方法並沒有邏輯上的錯誤,即便我們有檢查 uids 是否為 null 但是還是發生 NullPointerException。此時使用 scala.collection.mutable.WrappedArray 並將程式稍微修改如以下即可以使用!
WrappedArray<String> uids = rowInput.getAs("uids");
if (uids == null){
output.add(null);
} else {
output.add(Arrays.copyOfRange((String[]) uids.array(), 0, 2));
}
備註:也可以使用 JavaConversions.SeqAsJavaList 等等其他的函式幫忙將 uids 轉換成 List<String>:
List<String> uids = JavaConversions.seqAsJavaList(uidsArray.toSeq());
備註:其他 Spark 常見的問題也可以參考連結!