[Hdfs] 利用 FileSystem 傳資料到 Hdfs?

在開始做大數據的專案的時候通常都會遇到這個問題,要先把資料送到 Hdfs 上之後,資料才有可能被 Spark 等程式使用,一般來說傳送檔案到 Hdfs 可以利用以下方法,EMS/RabbitMQ, Knox Server, FTR,  或是 Kafka 等等,常見的做法是透過 Knox 伺服器,由於 Hdfs 的群集是由多個 NameNode 與多個 DataNode 組成,最直接的做法是產生一個 FileSystem 直接指向 Hdfs 而不是透過 Knox,本篇要呈現如何不透過 Knox 向 Hdfs 傳送資料!

本篇記錄的是利用 core-site.xml 與 hdfs-site.xml 生成遠端的 FileSystem,並且利用 FileSystem 類別已有的函式進行傳送,以下是程式碼:

public FileSystem getDatalakeFileSystem(String confPath, String login, String keyTab){

	String krbConf = confPath + "krb5.conf";
	System.setProperty("java.security.krb5.conf", krbConf);
	System.setProperty("sun.security.krb5.Config", krbConf);
	System.setProperty("user.name", login);
	
	Configuration conf = new Configuration();
	conf.addResource(new Path(confPath + "/core-site.xml");
	conf.addResource(new Path(confPath + "/hdfs-site.xml");
	conf.set("hadoop.security.authentication", "kerberos");
	conf.set("hadoop.security.authorization", "true");
	
	UserGroupInformation.setConfiguration(conf);
	UserGroupInformation.loginUserFromKeytab(login, keyTab);
	
	try{    
	    return FileSystem.get(new URI("swebhdfs://localhost:9000"), conf);
	} catch (URISyntaxException uriException) {
	    throw new IOException(uriException);
	}
}

在生成 FileSystem 介面之後,下列相對應的應用就應運而生了。

  • boolean delete(Path path, boolean b)
  • FileStatus[] listStatus(Path path)
  • FileStatus[] listStatus(Path[] files)
  • void copyFromLocalFile(Path src, Path dst)
  • boolean mkdirs(Path f)
  • boolean exists(Path f)
  • FSDataInputStream open(Path f)

在使用這個功能的時候,最重要的是要提供以下的檔案,core-site.xml 與 hdfs-site.xml 定義了 hdfs 的組成方式,例如要去哪裡找到 NameNodes 與 DataNodes,一般來說 Kerberos 是用來管理 hdfs 的權限,利用 krb5.conf 與 user.keytab 則可以取得 Kerberos 的認證。

  • core-site.xml
  • hdfs-site.xml
  • krb5.conf
  • user.keytab

core-site.xml 與 hdfs-site.xml 的設定可以參考連結

<configuration>
    <property>
    	<name>fs.defaultFS</name>
        <value>swebhdfs://localhost:9000</value>
    </property>
</configuration>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
    	<value>/opt/hadoop-3.1.2/data/namenode</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/hadoop-3.1.2/data/datanode</value>
    </property>
</configuration>

在開發這個功能的時候,檸檬爸也遇到以下問題:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: swebhdfs
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1375)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1390)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:196)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:95)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:180)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:175)
    at org.apache.mahout.classifier.naivebayes.NaiveBayesModel.materialize(NaiveBayesModel.java:100)
java.lang.IllegalArgumentException: Can't get Kerberos realm
    at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:263)
    at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:299)
    at utils.ConnectionHandler.connectHiveDB(ConnectionHandler.java:58)

只要照上面提供的程式碼執行就可以解決這兩個問題:)