You need to enable JavaScript to run this app.
多云CDN

多云CDN

复制全文
最佳实践
对象存储文件变更自动触发 CDN 执行刷新/预热
复制全文
对象存储文件变更自动触发 CDN 执行刷新/预热

本文介绍了一种监控源站的文件变更,用来控制 CDN 自动执行缓存刷新和文件预热任务的解决方案。该解决方案的核心是利用云服务商(火山引擎、阿里云、腾讯云、AWS)的函数计算服务,捕获其对象存储服务中的文件变更事件,并调用多云CDN的接口,向接入多云CDN的云服务商(不限于以上四家)下发刷新与预热任务。

方案简介

工作原理

当您更新或上传文件到对象存储(火山引擎 TOS、阿里云 OSS、腾讯云 COS、AWS S3)时,本文介绍的方案能够自动触发多云CDN的刷新或预热任务。这确保了 CDN 边缘节点上的内容能及时更新,用户始终访问到最新版本的文件。

该方案的核心是利用云服务商的函数计算服务(火山引擎函数服务、阿里云函数计算、腾讯云云函数、AWS Lambda),通过以下步骤实现自动化:

  1. 创建自定义函数:在函数服务中编写并部署自定义函数,将对象存储文件变更(如上传、修改、删除)事件配置为函数计算的触发器。
  2. 函数触发与执行:当对象存储文件发生变更时,自动触发预先部署的函数。
  3. 调用多云CDN API:函数代码从事件中提取变更文件的 URL,并调用多云CDN的 API 接口,向所有已接入的云服务商下发刷新或预热任务。

适用场景

本方案适用于以下任一云服务商的对象存储与函数计算服务组合:

  • 火山引擎:对象存储(TOS)与函数服务
  • 阿里云:对象存储(OSS)与函数计算(FC)
  • 腾讯云:对象存储(COS)与云函数(SCF)
  • AWS:Simple Storage Service (S3) 与 Lambda

本文将以火山引擎 TOS 和函数服务为例,详细介绍配置步骤。关于其他云服务商的配置差异,请参见其他云服务商配置说明

使用限制

  • 多云CDN平台:函数最终调用的 API 为多云CDN的刷新/预热接口。因此,刷新和预热任务能够下发给的云服务商必须是多云CDN所支持的,具体包括多云CDN内置加速服务以及通过多云CDN接入的第三方加速服务。更多信息,请参见使用限制
  • API 调用限制:
    • 多云CDN:刷新和预热操作的调用频率(QPS)受多云CDN的接口限制。详情请参见创建缓存刷新任务创建文件预热任务
    • 第三方加速服务:每日可通过多云CDN向第三方加速服务提交的任务配额受相应云服务商的限制。

前提条件

在开始操作前,请确保您已完成以下准备工作:

  • 对象存储配置:您的业务数据(如图片、视频、脚本等)已存储在支持的云服务商对象存储服务中(如火山引擎 TOS、AWS S3、阿里云 OSS 或腾讯云 COS)。
  • CDN 服务配置:
    • 您已为网站或应用启用 CDN 加速,并将源站设置为您的对象存储 Bucket 域名。
    • 您的加速域名已添加或同步到多云CDN平台。更多信息,请参见手动添加域名加速域名同步
  • 函数计算服务:您已开通所选云服务商的函数计算服务。本文以火山引擎函数服务为例,详情请参见函数服务快速入门
  • API 访问密钥:您已获取火山引擎账号的 API 访问密钥(Access Key)。具体步骤,请参见Access Key(密钥)管理

    注意

    函数将使用此密钥调用多云CDN的 API。若使用 IAM 用户,请确保该用户拥有调用多云CDN刷新/预热接口的权限。

操作步骤

本节以火山引擎函数服务为例,介绍如何创建并配置函数。

步骤一:创建函数

  1. 登录火山引擎函数服务控制台
  2. 在左侧导航栏,单击 函数列表
  3. 函数列表 页面,单击 创建函数
  4. 完成 创建函数 配置向导。
    1. 选择函数模板
      完成以下参数配置,然后单击 下一步:函数配置

      • 自定义创建:固定为 基于 Hello World 创建
      • 基于模板创建:先将 语言 设置为 Python 3.8,然后选择 vefaas-python38-default
    2. 函数配置
      函数代码 区域,将 index.py 的内容替换为以下代码,然后单击 确定
      在使用以下代码前,您必须替换部分参数的值,具体包括:

      • AK(第 15 行) 和 SK(第 16 行):将 <ak><sk> 替换成您的火山引擎账号的 API 访问密钥。
      • updateURL[N](第 127+[N-1] 行):将 <domain[N]>:替换成您的加速域名。如果您有多个加速域名,需要设置多个 updateURL。

        示例:
        假设您有两个加速域名,您需要分别设置 updateURL1、updateURL2,将其中的 <domain1><domain2> 替换成加速域名。

      • Urls(第 131 和 154 行):根据您设置的 updateURL 修改 Urls 的内容。
        Urls 是由 updateURL 拼接得到的。如果您设置了多个 updateURL,需要根据实际情况修改 Urls 的拼接方式。

        示例:
        假设您设置了 updateURL1、updateURL2、updateURL3,则需要将 Urls 设置成 updateURL1+"\n"+updateURL2+"\n"+updateURL3

      alt

      #!/usr/bin/env python
      # -*- coding: utf-8 -*- 
      
      import datetime
      import hashlib
      import hmac
      import json
      from urllib.parse import quote
      import requests
      
      Service = "mcdn"
      Version = "2022-03-01"
      Region = "cn-north-1"
      Host = "open.volcengineapi.com"
      AK = "<ak>"
      SK = "<sk>"
      
      
      
      def norm_query(params):
          query = ""
          for key in sorted(params.keys()):
              if type(params[key]) == list:
                  for k in params[key]:
                      query = (
                              query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&"
                      )
              else:
                  query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&")
          query = query[:-1]
          return query.replace("+", "%20")
      
      
      # 第一步:准备辅助函数。
      # sha256 非对称加密
      def hmac_sha256(key: bytes, content: str):
          return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
      
      
      # sha256 hash算法
      def hash_sha256(content: str):
          return hashlib.sha256(content.encode("utf-8")).hexdigest()
      
      
      # 第二步:创建一个多云CDN的 API 请求函数。签名计算的过程包含在该函数中。
      def request(method, query, header, ak, sk, action, body):
          # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表
          # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。
          # 初始化身份证明结构体
          credential = {
              "access_key_id": AK,
              "secret_access_key": SK,
              "service": Service,
              "region": Region,
          }
          # 初始化签名结构体
          request_param = {
              "body": json.dumps(body),
              "host": Host,
              "path": "/",
              "method": method,
              "content_type": "application/json",
              "date": datetime.datetime.utcnow(),
              "query": {"Action": action, "Version": Version, **query},
          }
          # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。
          # 初始化签名结果的结构体
          x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
          short_x_date = x_date[:8]
          x_content_sha256 = hash_sha256(request_param["body"])
          sign_result = {
              "Host": request_param["host"],
              "X-Content-Sha256": x_content_sha256,
              "X-Date": x_date,
              "Content-Type": request_param["content_type"],
          }
          # 第五步:计算 Signature 签名。
          signed_headers_str = ";".join(
              ["content-type", "host", "x-content-sha256", "x-date"]
          )
          canonical_request_str = "\n".join(
              [request_param["method"],
               request_param["path"],
               norm_query(request_param["query"]),
               "\n".join(
                   [
                       "content-type:" + request_param["content_type"],
                       "host:" + request_param["host"],
                       "x-content-sha256:" + x_content_sha256,
                       "x-date:" + x_date,
                   ]
               ),
               "",
               signed_headers_str,
               x_content_sha256,
               ]
          )
          hashed_canonical_request = hash_sha256(canonical_request_str)
          credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
          string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
          k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
          k_region = hmac_sha256(k_date, credential["region"])
          k_service = hmac_sha256(k_region, credential["service"])
          k_signing = hmac_sha256(k_service, "request")
          signature = hmac_sha256(k_signing, string_to_sign).hex()
          sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
              credential["access_key_id"] + "/" + credential_scope,
              signed_headers_str,
              signature,
          )
          header = {**header, **sign_result}
          # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。
          r = requests.post("https://{}{}".format(request_param["host"], request_param["path"]),
                            headers=header,
                            params=request_param["query"],
                            data=request_param["body"],
                            )
          return r.json()
      
      
      
      def handler(event, context):
          print(f"received new request, event content: {event}")
          print(json.dumps(event, sort_keys=True, indent=4))
      
          uri = event["data"]["events"][0]["tos"]['object']['key']
          updateURL1 = "https://domain1/" + uri
          updateURL2 = "https://domain2/" + uri
      
          refresh_request_body = {
              "Urls": updateURL1+"\n"+updateURL2,
              "Type": "file",
          }
          
          refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body)
          print(refresh_response_body)
      
          preload_response_body = {
              "ResponseMetadata": {
                  "RequestID": "",
                  "Action": "SubmitPreloadTask",
                  "Version": "2022-03-01",
                  "Service": "mcdn",
                  "Region": "cn-north-1"
              },
              "Result": {
                  "TaskId": ""
              }
          }
      
          if "Error" not in refresh_response_body["ResponseMetadata"]:
              print("刷新已成功,执行预热步骤")
              preload_request_body = {
                  "Urls": updateURL1+"\n"+updateURL2,
              }
              preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body)
          print(preload_response_body)
          
          result = {
              'statusCode': 400,
              'headers': {
                  'Content-Type': 'application/json'
              },
              'body': json.dumps({
                  'message': "预热失败"
              })
          }
              
          if "Error" not in preload_response_body["ResponseMetadata"]:
              print("预热已成功")
              result = {
                  'statusCode': 200,
                  'headers': {
                      'Content-Type': 'application/json'
                  },
                  'body': json.dumps({
                      'message': preload_response_body
                  })
              }
          print(result)
          return result
      
      
  5. 发布函数。
    1. 单击 发布
    2. 完成以下参数配置,然后单击 确定
      • 函数版本:默认已选择 Latest
      • 版本描述:为函数添加备注。示例:SubmitRefreshTask
      • 实例数上限:首次发布函数时根据设置该参数。建议保留默认值。更多信息,请参见发布函数

步骤二:配置触发器

  1. 在函数详情页,单击 触发器 页签。
  2. 单击 创建触发器
  3. 创建触发器 面板,完成以下参数配置,然后单击 确定
    • 触发器类型:选择 TOS 触发器
    • 触发器名称:为触发器设置一个名称。示例:TriggerRefreshTask
    • TOS Bucket:选择存放网站资源的 TOS Bucket。
    • 触发事件:选择函数服务需要监听的事件,支持多选。关于函数服务支持监听的所有 TOS 事件及含义,请参见 事件列表

      为方便验证,本教程以选择 tos:ObjectCreated:* 事件为例。 tos:ObjectCreated:* 事件表示当 Bucket 发生文件增加事件时,函数将被触发。

alt

完成以上操作后,您可以参照结果验证,验证配置是否已生效。

结果验证

在 TOS Bucket 中上传一个测试文件,然后前往多云CDN控制台查看刷新和预热任务的操作记录。

  1. 登录多云CDN控制台

  2. 在左侧导航栏,单击 刷新预热
  3. 单击 操作记录 页签,然后根据您上传测试文件的时间,在任务列表中查找相关的刷新和预热任务记录。

配置正确的情况下,您可以在列表中找到相关的刷新和预热任务。

您单击对应的 查看执行结果 后,页面将跳转到对应任务的执行状态页。该页面展示了由云服务商返回的任务执行状态。更多信息,请参见查看执行状态
alt

常见问题

其他云服务商配置说明

本方案以火山引擎 TOS 和函数服务为例,介绍了如何通过文件变更事件触发函数,进而实现 CDN 内容的自动刷新或预热。在 AWS、阿里云或腾讯云等其他云服务商环境中部署此方案时,您需要关注以下关键差异,并对配置和代码进行相应调整。

关键差异对比

项目火山引擎AWS阿里云腾讯云
对象存储对象存储 (TOS)Simple Storage Service (S3)对象存储 (OSS)对象存储 (COS)
函数计算函数服务 (VEF)Lambda函数计算 (FC)云函数 (SCF)
触发器配置在函数服务中配置 TOS 触发器在 Lambda 中配置 S3 触发器在函数计算中配置 OSS 触发器在云函数中配置 COS 触发器
事件结构事件结构与其他云不同事件结构与其他云不同事件结构与其他云不同事件结构与其他云不同

代码调整示例

以下 Python 代码示例展示了如何修改 handler 函数和增加 extract_object_key 函数,以兼容来自不同云服务商的对象存储事件。

  1. extract_object_key 函数。

    该函数用于从不同云服务商的事件体中提取触发事件的对象键(即文件名)。

    def extract_object_key(event):
        # 火山引擎 TOS
        try:
            return event["data"]["events"][0]["tos"]['object']['key']
        except Exception:
            pass
        # AWS S3
        try:
            return event["Records"][0]["s3"]["object"]["key"]
        except Exception:
            pass
        # 阿里云 OSS
        try:
            return event["events"][0]["oss"]["object"]["key"]
        except Exception:
            pass
        # 腾讯云 COS
        try:
            return event["Records"][0]["cos"]["cosObject"]["key"]
        except Exception:
            pass
        return None
    
  2. handler 函数。

    您需要更新主 handler 函数,调用 extract_object_key 获取文件名,并增加对不同事件输入类型(如阿里云函数计算可能传递 bytesstr 类型)的兼容处理。

    import json
    
    def handler(event, context):
        # 兼容阿里云 FC 的 bytes/string 输入
        if isinstance(event, (bytes, str)):
            try:
                event = json.loads(event)
            except Exception:
                pass
        
        uri = extract_object_key(event)
        if not uri:
            print("No object key found in event")
            return {"statusCode": 400, "body": "No object key found"}
        
        # ... 后续调用多云CDN接口的逻辑 ...
    

在您的 handler 函数中,请替换原有的硬编码的对象键提取逻辑(uri = event["data"]["events"][0]["tos"]['object']['key']),改为调用 extract_object_key(event)

如何应对请求速率超限问题?

当对象存储中的文件在短时间内发生大量变更时(例如,一次性上传成百上千个文件),可能会触发密集的函数调用,导致向多云CDN或第三方 CDN 厂商提交的刷新/预热请求超出速率限制。

问题诊断

您可以在多云CDN控制台的 刷新预热 > 操作记录 页面诊断问题:

  • 任务状态为“待执行”:表示提交到多云CDN的请求超过了 QPS(每秒查询率)限制,任务正在排队等待处理。
  • 任务状态为“已完成”,但执行结果为“失败”:表示任务已下发至第三方 CDN,但超出了该厂商的每日或每分钟的配额限制。

解决方案

为缓解此问题,您可以在函数代码中引入随机延迟和重试机制。这有助于平滑请求峰值,避免在短时间内集中发送大量请求。

以下是一段包含 sleep 逻辑(使函数随机暂停运行一段时间)的示例代码,供您参考。您可以根据实际情况修改其中的 sleep 相关配置,然后使用这段代码替换函数代码 index.py 的内容。

说明

如果引入延迟和重试机制后问题依然存在,建议您提交工单联系技术支持,以获取针对您业务场景的进一步帮助。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
import time
import random
import hashlib
import hmac
import json
from urllib.parse import quote
import requests

Service = "mcdn"
Version = "2022-03-01"
Region = "cn-north-1"
Host = "open.volcengineapi.com"
AK = "AK"
SK = "SK"
cdnURLs = ['https://example.com/']
Preload = False



def norm_query(params):
    query = ""
    for key in sorted(params.keys()):
        if type(params[key]) == list:
            for k in params[key]:
                query = (
                        query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&"
                )
        else:
            query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&")
    query = query[:-1]
    return query.replace("+", "%20")


# 第一步:准备辅助函数。
# sha256 非对称加密
def hmac_sha256(key: bytes, content: str):
    return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()


# sha256 hash算法
def hash_sha256(content: str):
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


# 第二步:创建一个多云CDN的 API 请求函数。签名计算的过程包含在该函数中。
def request(method, query, header, ak, sk, action, body):
    # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表
    # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。
    # 初始化身份证明结构体
    credential = {
        "access_key_id": ak,
        "secret_access_key": sk,
        "service": Service,
        "region": Region,
    }
    # 初始化签名结构体
    request_param = {
        "body": json.dumps(body),
        "host": Host,
        "path": "/",
        "method": method,
        "content_type": "application/json",
        "date": datetime.datetime.utcnow(),
        "query": {"Action": action, "Version": Version, **query},
    }
    # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。
    # 初始化签名结果的结构体
    x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
    short_x_date = x_date[:8]
    x_content_sha256 = hash_sha256(request_param["body"])
    sign_result = {
        "Host": request_param["host"],
        "X-Content-Sha256": x_content_sha256,
        "X-Date": x_date,
        "Content-Type": request_param["content_type"],
    }
    # 第五步:计算 Signature 签名。
    signed_headers_str = ";".join(
        ["content-type", "host", "x-content-sha256", "x-date"]
    )
    canonical_request_str = "\n".join(
        [request_param["method"],
         request_param["path"],
         norm_query(request_param["query"]),
         "\n".join(
             [
                 "content-type:" + request_param["content_type"],
                 "host:" + request_param["host"],
                 "x-content-sha256:" + x_content_sha256,
                 "x-date:" + x_date,
             ]
         ),
         "",
         signed_headers_str,
         x_content_sha256,
         ]
    )
    hashed_canonical_request = hash_sha256(canonical_request_str)
    credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
    string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
    k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
    k_region = hmac_sha256(k_date, credential["region"])
    k_service = hmac_sha256(k_region, credential["service"])
    k_signing = hmac_sha256(k_service, "request")
    signature = hmac_sha256(k_signing, string_to_sign).hex()
    sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
        credential["access_key_id"] + "/" + credential_scope,
        signed_headers_str,
        signature,
    )
    header = {**header, **sign_result}
    # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。
    r = requests.post("http://{}{}".format(request_param["host"], request_param["path"]),
                      headers=header,
                      params=request_param["query"],
                      data=request_param["body"],
                      )
    return r.json()



def handler(event, context):
    print(f"received new request, event content: {event}")
    print(json.dumps(event, sort_keys=True, indent=4))

    uri = event["data"]["events"][0]["tos"]['object']['key']
    action = event["data"]["events"][0]["eventName"].split(':')[2]
    if action in ['Delete', 'DeleteMarkerCreated']:
        global Preload
        Preload = False
    sleep_max_time = 30
    updateURL = ""
    #判断尾部为 index.html 还有尾部为 '/'的uri
    #尾部为 index.html 的文件需要额外加上一个刷新 cdnurl+index.html 之前的部分
    #尾部为 '/' 的文件需要排除,不需要刷新目录(实际上是文件刷新)预热目录会失败
    if uri.endswith('/'):
        print ("{}{} dont need refresh and preload!".format(cdnURLs[0],uri))
        refresh_response_body = "{}{} dont need refresh and preload!".format(cdnURLs[0],uri)
        result = {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json'
            },
            'body': json.dumps({
                'message': refresh_response_body
            })
        }
        return result
    for cdnURL in cdnURLs:
        if updateURL == "":
            updateURL = updateURL + cdnURL + uri
        else:
            updateURL = updateURL + "\n" + cdnURL + uri
        if uri == 'index.html':
            updateURL = updateURL + "\n" + cdnURL
            sleep_max_time = 3
        elif uri.endswith('/index.html'):
            updateURL = updateURL + "\n" + cdnURL + uri.rsplit('/', 1)[0] + '/'
            sleep_max_time = 3
    refresh_request_body = {
        "Urls": updateURL,
        "Type": "file",
    }
    preload_request_body = {
        "Urls": updateURL,
    }
    #结果模板,默认失败
    result = {
        'statusCode': 400,
        'headers': {
            'Content-Type': 'application/json'
        },
        'body': json.dumps({
            'message': "刷新失败"
        })
    }
    #打印刷新的 urls
    print (refresh_request_body['Urls'])
    time.sleep(random.SystemRandom().randint(2, sleep_max_time))
    #失败重试5次,中间随机 sleep
    for i in range(5):
        refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body)
        print(refresh_response_body)

        if "Error" not in refresh_response_body["ResponseMetadata"]:
            print("刷新已成功!")
            result = {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json'
                },
                'body': json.dumps({
                    'message': refresh_response_body
                })
            }
            break
        else:
            time.sleep(random.SystemRandom().randint(2, 5))
            print ('start refresh retry {}...'.format(i+1))
    #不进行预热直接返回刷新结果
    if not Preload:
        print(result)
        return result
    else:
        print("进行预热!")
    #预热处理
    result = {
        'statusCode': 400,
        'headers': {
            'Content-Type': 'application/json'
        },
        'body': json.dumps({
            'message': "预热失败"
        })
    }
    for i in range(5):
        preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body)
        print(preload_response_body)

        if "Error" not in preload_response_body["ResponseMetadata"]:
            print("预热已成功")
            result = {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json'
                },
                'body': json.dumps({
                    'message': preload_response_body
                })
            }
        print(result)
        if result['statusCode'] == 200:
            return result
        else:
            time.sleep(random.SystemRandom().randint(2, 5))
            print ('start preload retry {}...'.format(i+1))
最近更新时间:2025.07.21 21:52:47
这个页面对您有帮助吗?
有用
有用
无用
无用