You need to enable JavaScript to run this app.
导航
开发指南
最近更新时间:2025.04.01 20:13:40首次发布时间:2024.11.12 16:54:08
我的收藏
有用
有用
无用
无用

HDFS 的初始化

在运用 HDFS API 开展开发操作以前,一定要对 HDFS 予以初始化,HDFS 的初始化通常存在两种形式:

  1. 直接加载 HDFS 集群里的配置文件,例如:core-site.xml 以及 hdfs-site.xml 这两个文件;
  2. 运用 conf 对象提供的某些方法去手动加载 HDFS 集群信息,比如集群名、副本数等等。

在此,我们仅介绍第一种方式:

Configuration conf = new Configuration();//初始化conf变量
private void init() throws IOException {
        conf = new Configuration();
        //假设 以下两个配置文件均以拷贝到conf下
        conf.addResource("conf/hdfs-site.xml");
        conf.addResource("conf/core-site.xml");
        fSystem = FileSystem.get(conf);
}

初始化完成 HDFS 文件系统之后,我们便能够运用 HDFS 所提供的 API 来开展相应的开发工作。
主要会用到 hadoop-common、hadoop-hdfs、hadoop-client 这三个依赖包。

上传和下载文件

上传文件示例。

private static void uploadTest() throws Exception{
    //其中local_src.txt是本地文件,hdfs_dst.txt是上传到hdfs后的文件
    fSystem.copyFromLocalFile(new Path("local_src.txt"), new Path("hdfs_dst.txt"));
    fSystem.close();
}

下载文件示例。

private static void downloadTest() throws Exception{
    //其中hdfs_src.txt是hdfs上的文件,local_dst.txt是从hdfs上下载的文件
    fSystem.copyToLocalFile(new Path("hdfs_src.txt"), new Path("local_dst.txt"));
    fSystem.close();
}

上传和下载的 API 的底层封装实际上就是:FileUtil.copy(....)
文件流上传下载。
另外,我们还能够通过文件流 Stream 方式来上传和下载文件:

private static void uploadTest() throws Exception{
    //local.gz是本地文件系统上的文件,hdfs.gz是hdfs上的文件
    InputStream in = new FileInputStream(new File("local.gz"));
    FSDataOutputStream out = fSystem.create(new Path("/hdfs.gz"));
    IOUtils.copyBytes(in, out, 4096, true);
    fSystem.close();
}
private static void downloadTest() throws Exception{
    //其中hdfs.gz是hdfs上的文件,local.gz是下载到本地文件系统中的文件
    FSDataInputStream in = fs.open(new Path("/hdfs.gz"));
    OutputStream out = new FileOutputStream(new File("local.gz"));
    IOUtils.copyBytes(in, out, 4096, true);
    fSystem.close();
}

写文件和追加写文件

创建并且写入文件。

private void createFileAndWrite() throws IOException {
    //将content中的内容写入到hdfs文件Write.txt中
    final String content = "So well!";
    FSDataOutputStream out = null;
    try {
        out = fSystem.create(new Path("/user/Write.txt"));
        out.write(content.getBytes());
        out.hsync();
    } finally {
            // make sure the stream is closed finally.
            out.close();
 }
}

向文件中追加内容。

private void appendFileContents() throws IOException {
    // 将 content 中的内容追加写入到 hdfs 文件 Write.txt 中
    final String content = "hi boy";
    FSDataOutputStream out = null;
    try {
        out = fSystem.append(new Path("/user/Write.txt"));
        out.write(content.getBytes());
        out.hsync();
} finally {
        // make sure the stream is closed finally.
        out.close();
 }
}

需要注意,要确保待追加的文件 Write.txt 已经存在,并且在此期间没有其他客户端正在向其中写入内容,否则此次追加内容将会失败并抛出异常。

读文件

通过调用 FileSystem 实例的 open 方法以获取某个用于读取文件的输入流。接着,利用该输入流读取 HDFS 上的指定文件的内容。最后,调用 close() 方法来关闭输入流。

private void read() throws IOException {
//读取文件的内容
    String strPath = "/user/Reader.txt";
    Path path = new Path(strPath);
    FSDataInputStream in = null;
    BufferedReader reader = null;
    StringBuffer strBuffer = new StringBuffer();
    try {
        in = fSystem.open(path);
        reader = new BufferedReader(new InputStreamReader(in));
        String sTempOneLine;
        // write file
        while ((sTempOneLine = reader.readLine()) != null) {
             strBuffer.append(sTempOneLine);
        }
    LOG.info("the content is : " + strBuffer.toString());

    } finally {
    IOUtils.closeStream(reader);
    IOUtils.closeStream(in);
     }
}

删除文件

private void deleteFile() throws IOException {

    //删除以下文件,且不可恢复
    Path beDeleted = new Path("/user/fileDelete.txt");
    boolean res=fSystem.delete(beDeleted, true);
    System.out.println(res);
}

嵌套创建目录与递归删除目录

嵌套创建目录。

private static void testMkDir() throws Exception{

    //嵌套创建目录
    boolean mkdirs = fSystem.mkdirs(new Path("/user/test1/test2"));
    System.out.println("mkdirs success!");
}

递归删除目录。

private static void testDel() throws Exception{

    //递归的删除目录
    if (!fSystem.exists("/user/test1")) {//目录不存在
            return false;
    }
    boolean delBl = fSystem.delete(new Path("/user/test1"), true);
    System.out.println(delBl);
}

递归查看目录下文件以及块的信息

private static void listFilesAndBlocks()throws Exception{

    //列出指定的目录下的所有文件,此处以hdfs下的根目录为例
    RemoteIterator<LocatedFileStatus> listFiles = fSystem.listFiles(new Path("/"), true);
    while(listFiles.hasNext()){
        LocatedFileStatus file = listFiles.next();
        System.out.println(file.getPath()+"\t");
        System.out.println(file.getPath().getName()+"\t");
        System.out.println(file.getLen()+"\t");
        System.out.println(file.getReplication()+"\t");

        //获取blcoks相关的信息
        BlockLocation[] blockLocations = file.getBlockLocations();
        System.out.println(blockLocations.length+"\t");
        for(BlockLocation bl : blockLocations){
            String[] hosts = bl.getHosts();
            System.out.print(hosts[0] + "-" + hosts[1]+"\t");
         }
   }
}

如果不想递归查看,fSystem.listFiles 中的参数写 false
关于 HDFSAPI 更多的详细介绍,请参见 Apache Hadoop 社区文档