[Hadoop] Hdfs Data Integrity with Checksum

使用 Spark/Hadoop 生態系這麼久之後,最近才開始來研究 Hadoop 的 checksum 機制是怎麼運作的?

使用 hdfs dfs 指令手動生成 Checksum 參考指令

hdfs dfs -checksum /tmp/hadoop-common-2.7.3.2.6.3.0-SNAPSHOT.jar
/tmp/hadoop-common-2.7.3.2.6.3.0-SNAPSHOT.jar MD5-of-0MD5-of-512CRC32C  000002000000000000000000c16859d1d071c6b1ffc9c8557d4909f1

在使用這個指令的時候出現的是 None,可能可以參考的原因,也可能是 abfss 不支援,但是使用 file:/// 還是出現 None,這邊還不是很了解原因。

直接使用 ChecksumFileSystem

在實作 Checksum 可以直接使用 ChecksumFileSystem 這個類別,由於這是一個 abstract class 所以沒有辦法直接新增一個 ChecksumFileSystem,一定要使用繼承,例如下面檸檬爸提供一個用 Scala 寫的範例。

class NewFileSystem(fs: FileSystem) extends ChecksumFileSystem(fs) {
  val name = "NewFileSystem"
}

object SparkUtil {
  def getHadoopFileSystem(path: String, conf: Configuration): NewFileSystem = {
    val fs = FileSystem.get(new URI(path), conf)
    new NewFileSystem(fs)
  }
}

此時可以直接使用 ChecksumFileSystem 裡面的 copyFromLocal 函式在上傳檔案的時候就可以一併產生循環檢查碼 crc (Cyclic Redundancy Check) 與上傳,檸檬爸實測的結果是 ChecksumFileSystem 可以自動生成一個 CRC32C 的。值得注意的是 ChecksumFileSystem 只能夠產生 .crc 的檢查碼,並無法更換產生檢查碼的演算法。

備註:關於 CRC32C 的演算法

使用 FileSystem 原生的函式 getFileChecksum

在使用 getFileChecksum 的時候可以利用類似以下的程式碼直接產生一個 MD5MD5CRC32CastagnoliFileChecksum 的檢查碼,比較前面的 CRC32C 多了 MD5 的編碼。

val dfs = "hdfs://localhost:9000/test/test.txt"
val fs = getHadoopFileSystem(dst, hadoopConf)
val checksum = fs.getFileChecksum(new Path(dst))

但是這邊還是不能夠更換成其他的 Hash 演算法例如 SHA256。備註:這邊的 FileSystem 要是 DistributedFileSystem 才能夠成功產生出我們要的 Checksum。參考網址

嘗試更新組態 dfs.checksum.type 到 SHA256

參考網址的作法,嘗試更新 conf (dfs.checksum.type) 成 SHA256,預設是 CRC32C,但是還是無法產生 SHA256 的檢查碼。

GCP 怎麼做?

hadoop fs -Ddfs.checksum.combine.mode=COMPOSITE_CRC -checksum hdfs:///user/bob/data.bin

GCP 可以用以上的指令直接取得 checksum 但是主要還是 CRC32 跟 MD5,備註:abfss 目前還不支援 -checksum 這個 argument。

Limitations of the ABFS connector

  • File last access time is not tracked.
  • Extended attributes are not supported.
  • File Checksums are not supported.
  • The Syncable interfaces hsync() and hflush() operations are supported if fs.azure.enable.flush is set to true (default=true). With the Wasb connector, this limited the number of times either call could be made to 50,000 HADOOP-15478. If abfs has the a similar limit, then excessive use of sync/flush may cause problems.