You need to enable JavaScript to run this app.
导航

配置 Flink 访问 CloudFS

最近更新时间2023.08.02 14:36:25

首次发布时间2022.05.26 14:18:32

Flink 是一个面向有限流和无限流有状态计算的分布式计算框架,它能够支持流处理和批处理两种应用类型。
本文介绍如何配置 EMR 中的 Flink 服务使用和访问 CloudFS。

前提条件

  • 开通大数据文件存储服务 CloudFS 并创建文件存储,获取挂载信息。详细操作请参考创建文件存储系统
  • 完成 E-MapReduce 中的集群创建。具体操作,请参见 E-MapReduce 集群创建
  • 准备一个测试文件。

步骤一:配置 CloudFS 服务

说明

集群所有节点都要修改如下配置。

  1. 连接 E-MapReduce 集群,连接方式如下:

    • 使用本地终端 ssh 连接集群节点管理 master 的公网 ip。
    • 使用同区域下的云服务器实例连接集群节点管理 master 的内网 ip。
  2. 下载 CloudFS 的 SDK 包至 E-MapReduce 集群指定存储位置。下载地址:inf.hdfs.cfs_sdk_deploy_1.4.1.tar.gz

  3. 解压后将 SDK 目录下的cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar文件复制到 Hadoop 的/hadoop/hdfs目录下。

    cp {Directory}/cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar /{Directory}/hadoop/hdfs/
    
  4. 配置core-site.xml文件。

    1. 执行以下命令打开 Hadoop 安装目录下的core-site.xml文件:
      vim {hadoop_安装目录}/hadoop/conf/core-site.xml
    2. core-site.xml中添加配置:
<property>
  <name>fs.defaultFS</name>
  <value>cfs://xxxx.cfs-cn-beijing.ivolces.com</value>
<!-- 填写获取的挂载点地址 -->
</property>

<property>
  <name>fs.cfs.impl</name>
  <value>com.volcengine.cloudfs.fs.CfsFileSystem</value>
</property>

<property>
  <name>fs.AbstractFileSystem.cfs.impl</name>
  <value>com.volcengine.cloudfs.fs.CFS</value>
</property>  

<property>
  <name>cfs.access.key</name>
  <value>AKxxxxxxxxxxx</value>
<!-- 填写访问密钥ID -->
</property>
<property>
  <name>cfs.secret.key</name>
  <value>SKxxxxxxxxxxx</value>
<!-- 填写私有访问密钥-->
</property>

<!-- 可选:如果使用的是 STS Token,需要填写 -->
<property>
  <name>cfs.security.token</name>
  <value>STSTokenxxxxxxxx</value>
</property>

<!-- 可选:如果开启缓存加速,需要配置缓存加速接入的 VPC 的网段 -->
<property>
  <name>cfs.client.network.segment</name>
  <value><VPC 网段,例如 192.168.0.0/16></value>
</property>

  1. 将解压后的 SDK 目录下的cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar文件复制到Flink lib文件夹下。

    cp {Directory}/cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar /{Directory}/flink/lib/
    
  2. 重启 Flink 服务。

    1. 登录火山引擎 E-MapReduce 控制台。
    2. 在集群管理页面,找到目标 E-MapReduce 集群。
    3. 单击服务列表找到 Flink 服务。
    4. 单击列表页面右侧重启按钮,选择重启全部组件。重启后需重新连接集群。

说明

需自行上传测试数据。

  1. 准备测试数据。

    hdfs dfs -ls cfs://`{`Directory`}/`{document}
    <!-- 查看上传目录下的文件 -->
    
  2. 执行 Flink 计算命令。

    export HADOOP_CONF_DIR=/`{`Directory`}`/hadoop/conf
    <!-- 该命令为指定环境使用于你的hadoop下的conf文件夹 -->
    flink run -t local /`{`Directory`}`/flink/examples/batch/WordCount.jar \
    ---input cfs://`{`Directory`}`/{document} \
    <!-- 此处文件夹下的数据作为输入 -->
    --output cfs://`{`Directory`}`/{document}
    <!-- 此处为经过WordCount计算后的输出文件 -->
    
  3. 查看计算结果。

    hdfs dfs -cat cfs://`{`Directory`}`/{document}
    <!-- 此处为计算后的文件路径 -->
    

    返回如下图类似信息,则表示配置 Flink 成功。
    图片