
LAS FS SDK


1. LasFS Directory Layout

1.1 Primary Account

1.1.1 Directory Structure



1.1.2 Directory Permissions

Path                             | Permission | Notes
lasfs:/*                         | Read       |
lasfs:/user/*                    | Read       |
lasfs:/public/*                  | Read/Write |
lasfs:/shared-public/*           | Read       |
lasfs:/user/${userId}/*          | Read       |
lasfs:/user/${userId}/shared/*   | Read       |
lasfs:/user/${userId}/private/*  | Read/Write |

1.2 Sub-Account

1.2.1 Directory Structure



1.2.2 Directory Permissions

Path                   | Permission | Notes
lasfs:/*               | Read       |
lasfs:/private/*       | Read/Write |
lasfs:/shared/*        | Read       |
lasfs:/public/*        | Read       | References the primary account's lasfs:/public/ directory
lasfs:/shared-public/* | Read       | References the primary account's lasfs:/shared-public/ directory

1.3 LasFS Path Access

1.3.1 Accessing LasFS as the Primary Account

  • To access the primary account's public directory, use the following path:

    lasfs:/public

  • To access one of its sub-accounts' directories, for example userId1's /private/dataxxx directory, use the following path:

    lasfs:/user/userId1/private/dataxxx

1.3.2 Accessing LasFS as a Sub-Account

  • For a sub-account accessing its own private directory /private/dataxxx, the path is:

    lasfs:/private/dataxxx
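
These path forms map directly onto Hadoop Path objects once the SDK is configured (see section 3). A minimal sketch; userId1 and dataxxx are the illustrative placeholders from the examples above:

// As the primary account: its own public directory
Path publicDir = new Path("lasfs:/public");
// As the primary account: a sub-account's private directory
Path subPrivate = new Path("lasfs:/user/userId1/private/dataxxx");
// As a sub-account: its own private directory
Path ownPrivate = new Path("lasfs:/private/dataxxx");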

2. Getting the SDK

2.1 Maven Repository

<repositories>
    <repository>
        <id>bytedance-public</id>
        <name>bytedance maven</name>
        <url>https://artifact.bytedance.com/repository/releases</url>
    </repository>
</repositories>

2.2 LasFS SDK Dependency

<dependency>
    <groupId>com.volcengine.las</groupId>
    <artifactId>las-fs-tunnel-sdk-shaded</artifactId>
    <version>0.0.3.3-RELEASE</version>
</dependency>

2.3 Hadoop Dependency

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.7</version>
</dependency>

3. Configuring the SDK

3.1 Environment Information

Environment           | fs.lasfs.endpoint                       | fs.lasfs.service.region
Public network access | las-fs-tunnel-cn-beijing.volces.com:80  | cn-beijing
SparkJar job access   | 100.96.4.84:80                          | cn-beijing

3.2 Basic Environment Configuration (choose one)

3.2.1 Hadoop Configuration File (core-site.xml)

<property>
   <!-- Default scheme, optional -->
   <name>fs.defaultFS</name>
   <value>lasfs:/</value>
</property>
<property>
   <name>fs.lasfs.impl</name>
   <value>com.volcengine.las.fs.LasFileSystem</value>
</property>
<property>
   <name>fs.lasfs.endpoint</name>
   <!-- LasFS endpoint -->
   <value>las-fs-tunnel-cn-beijing.volces.com:80</value>
</property>
<property>
   <name>fs.lasfs.service.region</name>
   <!-- Default: cn-beijing -->
   <value>cn-beijing</value>
</property>

3.2.2 Configuration in Code

Configuration conf = new Configuration();
// Optional: default scheme
conf.set("fs.defaultFS", "lasfs:/");

conf.set("fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem");
conf.set("fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80");

3.3 Authentication Configuration (choose one)

3.3.1 Hadoop Configuration File (core-site.xml)

3.3.1.1 AK/SK Authentication Configuration

<!-- AK/SK authentication requires the following two settings -->
<property>
   <name>fs.lasfs.access.key</name>
   <value>{ACCESS_KEY}</value>
</property>
<property>
   <name>fs.lasfs.secret.key</name>
   <value>{SECRET_KEY}</value>
</property>

3.3.1.2 AssumeRole Authentication Configuration

<!-- AssumeRole authentication requires all of the following settings -->
<property>
   <name>fs.lasfs.access.key</name>
   <value>{ACCESS_KEY}</value>
</property>
<property>
   <name>fs.lasfs.secret.key</name>
   <value>{SECRET_KEY}</value>
</property>
<property>
   <name>fs.lasfs.session.token</name>
   <value>{SECURITY_TOKEN}</value>
</property>
<property>
   <name>fs.lasfs.identity.id</name>
   <value>{IDENTITY_ID}</value>
</property>
<property>
   <name>fs.lasfs.identity.type</name>
   <!-- Account or User -->
   <value>{IDENTITY_TYPE}</value>
</property>

3.3.2 Configuration in Code

3.3.2.1 AK/SK Configuration in Code

Configuration conf = new Configuration();
// AK/SK authentication requires only the following two settings
conf.set("fs.lasfs.access.key", "{ACCESS_KEY}");
conf.set("fs.lasfs.secret.key", "{SECRET_KEY}");

3.3.2.2 AssumeRole Configuration in Code

Configuration conf = new Configuration();
// AssumeRole authentication requires all of the following settings
// Temporary AK obtained from assumeRole
conf.set("fs.lasfs.access.key", "{ACCESS_KEY}");
// Temporary SK obtained from assumeRole
conf.set("fs.lasfs.secret.key", "{SECRET_KEY}");
// Temporary session token obtained from assumeRole
conf.set("fs.lasfs.session.token", "{SECURITY_TOKEN}");
// ID of the accessing account (primary- or sub-account ID; identifies the principal)
conf.set("fs.lasfs.identity.id", "{IDENTITY_ID}");
// Type of the accessing account (primary account: Account, sub-account: User)
conf.set("fs.lasfs.identity.type", "{IDENTITY_TYPE}");

4. Using the SDK

4.1 Creating a Directory

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class MkdirsExample {
  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    // Environment configuration
    conf.set("fs.defaultFS", "lasfs:/");
    conf.set("fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem");
    // Public network access
    conf.set("fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80");

    // AK/SK authentication
    conf.set("fs.lasfs.access.key", "accessKey");
    conf.set("fs.lasfs.secret.key", "secretKey");

    FileSystem lasFs = FileSystem.get(conf);

    // Primary-account path
    Path mkDirPath = new Path("/public/test");
    // Sub-account path
    // Path mkDirPath = new Path("/private/test");

    boolean mkdirResult = lasFs.mkdirs(mkDirPath);
    System.out.println("mkdirs result: " + mkdirResult);
    lasFs.close();
  }
}

4.2 Listing a Directory

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class ListStatusExample {

  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    // Environment configuration
    conf.set("fs.defaultFS", "lasfs:/");
    conf.set("fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem");
    // Public network access
    conf.set("fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80");

    // AK/SK authentication
    conf.set("fs.lasfs.access.key", "accessKey");
    conf.set("fs.lasfs.secret.key", "secretKey");

    FileSystem lasFs = FileSystem.get(conf);
    // Primary-account path
    Path listPath = new Path("/public/test");
    // Sub-account path
    // Path listPath = new Path("/private/test");

    FileStatus[] listStatus = lasFs.listStatus(listPath);
    for (FileStatus status : listStatus) {
      System.out.println("path: " + status.getPath().toString());
      System.out.println("owner: " + status.getOwner());
      System.out.println("group: " + status.getGroup());
      System.out.println("modifyTime: " + status.getModificationTime());
      System.out.println("length: " + status.getLen());
      System.out.println("blockSize: " + status.getBlockSize());
      System.out.println("replication: " + status.getReplication());
      System.out.println("permission: " + status.getPermission());
      System.out.println("isDir: " + status.isDirectory());
    }

    lasFs.close();
  }
}

4.3 Deleting a File

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class DeleteExample {
  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    // Environment configuration
    conf.set("fs.defaultFS", "lasfs:/");
    conf.set("fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem");
    // Public network access
    conf.set("fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80");

    // AK/SK authentication
    conf.set("fs.lasfs.access.key", "accessKey");
    conf.set("fs.lasfs.secret.key", "secretKey");

    FileSystem lasFs = FileSystem.get(conf);
    // Primary-account path
    Path deletePath = new Path("/public/test");
    // Sub-account path
    // Path deletePath = new Path("/private/test");

    if (lasFs.exists(deletePath)) {
      // The second argument enables recursive deletion
      boolean deleteSuccess = lasFs.delete(deletePath, true);
      System.out.println("delete result: " + deleteSuccess);
    } else {
      System.out.println("path does not exist: " + deletePath);
    }
    lasFs.close();
  }
}

4.4 Reading and Writing Files

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.*;
import java.nio.charset.StandardCharsets;

public class ReadWriteExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Environment configuration
    conf.set("fs.defaultFS", "lasfs:/");
    conf.set("fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem");
    // Public network access
    conf.set("fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80");

    // AK/SK authentication
    conf.set("fs.lasfs.access.key", "accessKey");
    conf.set("fs.lasfs.secret.key", "secretKey");

    FileSystem lasFs = FileSystem.get(conf);
    String writeContent = "LasFs read write test";
    // Wrap the test content in an input stream
    InputStream in =
        new BufferedInputStream(new ByteArrayInputStream(writeContent.getBytes(StandardCharsets.UTF_8)));

    // Primary-account path
    Path testPath = new Path("/public/test/download_data.txt");
    // Sub-account path
    // Path testPath = new Path("/private/test/download_data.txt");

    // Create the file on LasFS
    FSDataOutputStream out = lasFs.create(testPath, true);
    // Copy the stream into the file with IOUtils.copyBytes
    IOUtils.copyBytes(in, out, 1024, true);
    System.out.println("write data into LasFs path: " + testPath);

    // Read back the file just written
    if (lasFs.exists(testPath)) {
      FSDataInputStream inputStream = lasFs.open(testPath);
      OutputStream outStream = System.out;
      IOUtils.copyBytes(inputStream, outStream, 1024, true);
    } else {
      System.out.println("path does not exist: " + testPath);
    }
    lasFs.close();
  }
}

4.5 Uploading a File

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class UploadExample {
  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    // Environment configuration
    conf.set("fs.defaultFS", "lasfs:/");
    conf.set("fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem");
    // Public network access
    conf.set("fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80");

    // AK/SK authentication
    conf.set("fs.lasfs.access.key", "accessKey");
    conf.set("fs.lasfs.secret.key", "secretKey");

    FileSystem lasFs = FileSystem.get(conf);
    // Primary-account path
    Path uploadPath = new Path("/public/test/download_data.txt");
    // Sub-account path
    // Path uploadPath = new Path("/private/test/download_data.txt");

    Path localPath = new Path("/tmp/download_data.txt");
    if (!lasFs.exists(uploadPath)) {
      lasFs.copyFromLocalFile(localPath, uploadPath);
      System.out.println("upload result: " + lasFs.exists(uploadPath));
    } else {
      System.out.println("upload failed, LasFS upload path already exists!");
    }

    lasFs.close();
  }
}

4.6 Downloading a File

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.IOException;

public class DownloadExample {
  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    // Environment configuration
    conf.set("fs.defaultFS", "lasfs:/");
    conf.set("fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem");
    // Public network access
    conf.set("fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80");

    // AK/SK authentication
    conf.set("fs.lasfs.access.key", "accessKey");
    conf.set("fs.lasfs.secret.key", "secretKey");

    FileSystem lasFs = FileSystem.get(conf);
    // Primary-account path
    Path downloadPath = new Path("/public/test/download_data.txt");
    // Sub-account path
    // Path downloadPath = new Path("/private/test/download_data.txt");

    Path localPath = new Path("/tmp/download_data.txt");

    if (!new File(localPath.toString()).exists()) {
      lasFs.copyToLocalFile(downloadPath, localPath);
      System.out.println("download result: " + new File(localPath.toString()).exists());
    } else {
      System.out.println("download failed, local path already exists!");
    }

    lasFs.close();
  }
}

5. Integrating LasFS with Spark

5.1 Adding Spark Dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>

5.2 Reading and Writing LasFS from a SparkJar Job

import org.apache.spark.sql.*;

import java.util.Arrays;
import java.util.List;

public class SparkDataFrameReadWriteExample {

  public static void main(String[] args) {

    SparkSession sparkSession = SparkSession.builder()
        .master("local")
        .config("spark.hadoop.fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem")
        // Public network access to LasFS; not needed when the SparkJar job runs on LAS
        .config("spark.hadoop.fs.lasfs.endpoint", "las-fs-tunnel-cn-beijing.volces.com:80")
        // AK/SK authentication
        .config("spark.hadoop.fs.lasfs.access.key", "accessKey")
        .config("spark.hadoop.fs.lasfs.secret.key", "secretKey")
        .getOrCreate();

    // Encoders for most common types are provided in class Encoders
    Encoder<String> stringEncoder = Encoders.STRING();
    List<String> writeData = Arrays.asList("spark", "hadoop", "hdfs", "yarn", "kafka", "hbase");
    Dataset<String> writeDataset = sparkSession.createDataset(writeData, stringEncoder);

    writeDataset.show(10);
    // Primary-account path
    String writeFsPathStr = "lasfs:/public/tmp/test/spark_data.txt";
    // Sub-account path
    // String writeFsPathStr = "lasfs:/private/tmp/test/spark_data.txt";
    writeDataset
        .write()
        .mode(SaveMode.Overwrite)
        .text(writeFsPathStr);

    Dataset<Row> readDataSet = sparkSession
        .read()
        .text(writeFsPathStr);

    readDataSet.show(10);

    sparkSession.stop();
  }
}