You need to enable JavaScript to run this app.
导航

开发指南

最近更新时间2023.04.03 10:34:22

首次发布时间2022.08.18 16:19:48

1 HDFS 的初始化

在使用 HDFS API 进行开发操作之前,必须要对 HDFS 进行初始化,HDFS 的初始化一般有两种形式:

  1. 直接加载 HDFS 集群中的配置文件,比如:core-site.xml 和 hdfs-site.xml 两个文件;

  2. 使用 conf 对象提供的一些方法去手动加载hdfs集群信息,比如集群名、副本数等。

在此处我们只介绍第一种方式:

Configuration conf = new Configuration();//初始化conf变量
private void init() throws IOException {
        conf = new Configuration();
        //假设 以下两个配置文件均以拷贝到conf下
        conf.addResource("conf/hdfs-site.xml");
        conf.addResource("conf/core-site.xml");
        fSystem = FileSystem.get(conf);
}

初始化完 HDFS 文件系统后,我们就可以使用 HDFS 提供的 API 进行相应的开发。
主要用到 hadoop-common、hadoop-hdfs、hadoop-client 三个依赖包。

2 上传和下载文件

private static void uploadTest() throws Exception{
    //其中local_src.txt是本地文件,hdfs_dst.txt是上传到hdfs后的文件
    fSystem.copyFromLocalFile(new Path("local_src.txt"), new Path("hdfs_dst.txt"));
    fSystem.close();
}

private static void downloadTest() throws Exception{
    //其中hdfs_src.txt是hdfs上的文件,local_dst.txt是从hdfs上下载的文件
    fSystem.copyToLocalFile(new Path("hdfs_src.txt"), new Path("local_dst.txt"));
    fSystem.close();
}

上传和下载的 API 的底层封装其实就是 : FileUtil.copy(....)
当然,我们也可以使用Stream方式上传、下载文件:

private static void uploadTest() throws Exception{
    //local.gz是本地文件系统上的文件,hdfs.gz是hdfs上的文件
    InputStream in = new FileInputStream(new File("local.gz"));
    FSDataOutputStream out = fSystem.create(new Path("/hdfs.gz"));
    IOUtils.copyBytes(in, out, 4096, true);
    fSystem.close();
}

private static void downloadTest() throws Exception{
    //其中hdfs.gz是hdfs上的文件,local.gz是下载到本地文件系统中的文件
    FSDataInputStream in = fs.open(new Path("/hdfs.gz"));
    OutputStream out = new FileOutputStream(new File("local.gz"));
    IOUtils.copyBytes(in, out, 4096, true);
    fSystem.close();
}

3 写文件和追加写文件

private void createFileAndWrite() throws IOException {
    //将content中的内容写入到hdfs文件Write.txt中
    final String content = "So well!";
    FSDataOutputStream out = null;
    try {
        out = fSystem.create(new Path("/user/Write.txt"));
        out.write(content.getBytes());
        out.hsync();
    } finally {
            // make sure the stream is closed finally.
            out.close();
 }
}

private void appendFileContents() throws IOException {
    //将content中的内容追加写入到hdfs文件Write.txt中
    final String content = "hi boy";
    FSDataOutputStream out = null;
    try {
        out = fSystem.append(new Path("/user/Write.txt"));
        out.write(content.getBytes());
        out.hsync();
} finally {
        // make sure the stream is closed finally.
        out.close();
 }
}

如果待追加的文件 Write.txt 已经存在,并且同时没有别的客户端正在写入内容,否则此次追加内容会失败抛出异常。

4 读文件

通过调用 FileSystem 实例的 open 方法获取某个读取文件的输入流。然后使用该输入流读取 HDFS 上的指定文件的内容。最后,调用 close() 关闭输入流。

private void read() throws IOException {
//读取文件的内容
    String strPath = "/user/Reader.txt";
    Path path = new Path(strPath);
    FSDataInputStream in = null;
    BufferedReader reader = null;
    StringBuffer strBuffer = new StringBuffer();
    try {
        in = fSystem.open(path);
        reader = new BufferedReader(new InputStreamReader(in));
        String sTempOneLine;
        // write file
        while ((sTempOneLine = reader.readLine()) != null) {
             strBuffer.append(sTempOneLine);
        }
    LOG.info("the content is : " + strBuffer.toString());

    } finally {
    IOUtils.closeStream(reader);
    IOUtils.closeStream(in);
     }
}

5 删除文件

private void deleteFile() throws IOException {

    //删除以下文件,且不可恢复
    Path beDeleted = new Path("/user/fileDelete.txt");
    boolean res=fSystem.delete(beDeleted, true);
    System.out.println(res);
}

6 嵌套创建目录与递归删除目录

private static void testMkDir() throws Exception{

    //嵌套创建目录
    boolean mkdirs = fSystem.mkdirs(new Path("/user/test1/test2"));
    System.out.println("mkdirs success!");
}

private static void testDel() throws Exception{

    //递归的删除目录
    if (!fSystem.exists("/user/test1")) {//目录不存在
            return false;
    }
    boolean delBl = fSystem.delete(new Path("/user/test1"), true);
    System.out.println(delBl);
}

7 递归查看目录下文件以及块的信息

private static void listFilesAndBlocks()throws Exception{

    //列出指定的目录下的所有文件,此处以hdfs下的根目录为例
    RemoteIterator<LocatedFileStatus> listFiles = fSystem.listFiles(new Path("/"), true);
    while(listFiles.hasNext()){
        LocatedFileStatus file = listFiles.next();
        System.out.println(file.getPath()+"\t");
        System.out.println(file.getPath().getName()+"\t");
        System.out.println(file.getLen()+"\t");
        System.out.println(file.getReplication()+"\t");

        //获取blcoks相关的信息
        BlockLocation[] blockLocations = file.getBlockLocations();
        System.out.println(blockLocations.length+"\t");
        for(BlockLocation bl : blockLocations){
            String[] hosts = bl.getHosts();
            System.out.print(hosts[0] + "-" + hosts[1]+"\t");
         }
   }
}

如果不想递归查看,fSystem.listFiles 中的参数写 false。
关于 HDFS API 更多的详细介绍,请参见Apache Hadoop社区文档。