[Hadoop] Hdfs Data Integrity with Checksum

使用 Spark/Hadoop 生態系這麼久之後,最近才開始來研究 Hadoop 的 checksum 機制是怎麼運作的?

使用 hdfs dfs 指令手動生成 Checksum 參考指令

hdfs dfs -checksum /tmp/hadoop-common-2.7.3.2.6.3.0-SNAPSHOT.jar
/tmp/hadoop-common-2.7.3.2.6.3.0-SNAPSHOT.jar MD5-of-0MD5-of-512CRC32C  000002000000000000000000c16859d1d071c6b1ffc9c8557d4909f1

直接使用 ChecksumFileSystem

在實作 Checksum 可以直接使用 ChecksumFileSystem 這個類別,由於這是一個 abstract class 所以沒有辦法直接新增一個 ChecksumFileSystem,一定要使用繼承,例如下面檸檬爸提供一個用 Scala 寫的範例。

class NewFileSystem(fs: FileSystem) extends ChecksumFileSystem(fs) {
  val name = "NewFileSystem"
}

object SparkUtil {
  def getHadoopFileSystem(path: String, conf: Configuration): NewFileSystem = {
    val fs = FileSystem.get(new URI(path), conf)
    new NewFileSystem(fs)
  }
}

此時可以直接使用 ChecksumFileSystem 裡面的 copyFromLocal 函式在上傳檔案的時候就可以一併產生循環檢查碼 crc (Cyclic Redundancy Check) 與上傳,檸檬爸實測的結果是 ChecksumFileSystem 可以自動生成一個 CRC32C 的。值得注意的是 ChecksumFileSystem 只能夠產生 .crc 的檢查碼,並無法更換產生檢查碼的演算法。

備註:關於 CRC32C 的演算法

使用 FileSystem 原生的函式 getFileChecksum

在使用 getFileChecksum 的時候可以利用類似以下的程式碼直接產生一個 MD5MD5CRC32CastagnoliFileChecksum 的檢查碼,比較前面的 CRC32C 多了 MD5 的編碼。

val dfs = "hdfs://localhost:9000/test/test.txt"
val fs = getHadoopFileSystem(dst, hadoopConf)
val checksum = fs.getFileChecksum(new Path(dst))

但是這邊還是不能夠更換成其他的 Hash 演算法例如 SHA256。備註:這邊的 FileSystem 要是 DistributedFileSystem 才能夠成功產生出我們要的 Checksum。參考網址

嘗試更新組態 dfs.checksum.type 到 SHA256

參考網址的作法,嘗試更新 conf (dfs.checksum.type) 成 SHA256,預設是 CRC32C,但是還是無法產生 SHA256 的檢查碼。