在运用 HDFS API 开展开发操作以前,一定要对 HDFS 予以初始化,HDFS 的初始化通常存在两种形式:
在此,我们仅介绍第一种方式:
Configuration conf = new Configuration();//初始化conf变量 private void init() throws IOException { conf = new Configuration(); //假设 以下两个配置文件均以拷贝到conf下 conf.addResource("conf/hdfs-site.xml"); conf.addResource("conf/core-site.xml"); fSystem = FileSystem.get(conf); }
初始化完成 HDFS 文件系统之后,我们便能够运用 HDFS 所提供的 API 来开展相应的开发工作。
主要会用到 hadoop-common、hadoop-hdfs、hadoop-client 这三个依赖包。
上传文件示例。
private static void uploadTest() throws Exception{ //其中local_src.txt是本地文件,hdfs_dst.txt是上传到hdfs后的文件 fSystem.copyFromLocalFile(new Path("local_src.txt"), new Path("hdfs_dst.txt")); fSystem.close(); }
下载文件示例。
private static void downloadTest() throws Exception{ //其中hdfs_src.txt是hdfs上的文件,local_dst.txt是从hdfs上下载的文件 fSystem.copyToLocalFile(new Path("hdfs_src.txt"), new Path("local_dst.txt")); fSystem.close(); }
上传和下载的 API 的底层封装实际上就是:FileUtil.copy(....)
。
文件流上传下载。
另外,我们还能够通过文件流 Stream
方式来上传和下载文件:
private static void uploadTest() throws Exception{ //local.gz是本地文件系统上的文件,hdfs.gz是hdfs上的文件 InputStream in = new FileInputStream(new File("local.gz")); FSDataOutputStream out = fSystem.create(new Path("/hdfs.gz")); IOUtils.copyBytes(in, out, 4096, true); fSystem.close(); }
private static void downloadTest() throws Exception{ //其中hdfs.gz是hdfs上的文件,local.gz是下载到本地文件系统中的文件 FSDataInputStream in = fs.open(new Path("/hdfs.gz")); OutputStream out = new FileOutputStream(new File("local.gz")); IOUtils.copyBytes(in, out, 4096, true); fSystem.close(); }
创建并且写入文件。
private void createFileAndWrite() throws IOException { //将content中的内容写入到hdfs文件Write.txt中 final String content = "So well!"; FSDataOutputStream out = null; try { out = fSystem.create(new Path("/user/Write.txt")); out.write(content.getBytes()); out.hsync(); } finally { // make sure the stream is closed finally. out.close(); } }
向文件中追加内容。
private void appendFileContents() throws IOException { // 将 content 中的内容追加写入到 hdfs 文件 Write.txt 中 final String content = "hi boy"; FSDataOutputStream out = null; try { out = fSystem.append(new Path("/user/Write.txt")); out.write(content.getBytes()); out.hsync(); } finally { // make sure the stream is closed finally. out.close(); } }
需要注意,要确保待追加的文件 Write.txt
已经存在,并且在此期间没有其他客户端正在向其中写入内容,否则此次追加内容将会失败并抛出异常。
通过调用 FileSystem 实例的 open
方法以获取某个用于读取文件的输入流。接着,利用该输入流读取 HDFS 上的指定文件的内容。最后,调用 close()
方法来关闭输入流。
private void read() throws IOException { //读取文件的内容 String strPath = "/user/Reader.txt"; Path path = new Path(strPath); FSDataInputStream in = null; BufferedReader reader = null; StringBuffer strBuffer = new StringBuffer(); try { in = fSystem.open(path); reader = new BufferedReader(new InputStreamReader(in)); String sTempOneLine; // write file while ((sTempOneLine = reader.readLine()) != null) { strBuffer.append(sTempOneLine); } LOG.info("the content is : " + strBuffer.toString()); } finally { IOUtils.closeStream(reader); IOUtils.closeStream(in); } }
private void deleteFile() throws IOException { //删除以下文件,且不可恢复 Path beDeleted = new Path("/user/fileDelete.txt"); boolean res=fSystem.delete(beDeleted, true); System.out.println(res); }
嵌套创建目录。
private static void testMkDir() throws Exception{ //嵌套创建目录 boolean mkdirs = fSystem.mkdirs(new Path("/user/test1/test2")); System.out.println("mkdirs success!"); }
递归删除目录。
private static void testDel() throws Exception{ //递归的删除目录 if (!fSystem.exists("/user/test1")) {//目录不存在 return false; } boolean delBl = fSystem.delete(new Path("/user/test1"), true); System.out.println(delBl); }
private static void listFilesAndBlocks()throws Exception{ //列出指定的目录下的所有文件,此处以hdfs下的根目录为例 RemoteIterator<LocatedFileStatus> listFiles = fSystem.listFiles(new Path("/"), true); while(listFiles.hasNext()){ LocatedFileStatus file = listFiles.next(); System.out.println(file.getPath()+"\t"); System.out.println(file.getPath().getName()+"\t"); System.out.println(file.getLen()+"\t"); System.out.println(file.getReplication()+"\t"); //获取blcoks相关的信息 BlockLocation[] blockLocations = file.getBlockLocations(); System.out.println(blockLocations.length+"\t"); for(BlockLocation bl : blockLocations){ String[] hosts = bl.getHosts(); System.out.print(hosts[0] + "-" + hosts[1]+"\t"); } } }
如果不想递归查看,fSystem.listFiles
中的参数写 false
。
关于 HDFSAPI
更多的详细介绍,请参见 Apache Hadoop 社区文档。