You need to enable JavaScript to run this app.
导航

TOS 桶文件变更自动触发 CDN 执行刷新/预热

最近更新时间2023.12.12 17:40:40

首次发布时间2023.11.07 11:03:05

本文介绍了一种监控源站的文件变更,用来控制 CDN 自动执行缓存刷新和文件预热任务的解决方案。该解决方案有以下特征:

  • 适用于静态资源存放在火山引擎对象存储 TOS,并且已启用 CDN 服务(不限于火山引擎 CDN)的站点。
  • 它基于火山引擎函数服务和多云CDN服务,在实施后有以下效果:
    • 通过函数服务的触发器监控源站 TOS 桶内的文件变更事件(如增加、移除、修改等),事件发生时将触发函数的运行。
    • 函数在运行时主动调用多云CDN的提交刷新/预热任务接口。多云CDN的接口能够向所有满足条件的云服务商同时提交刷新、预热请求。

使用限制

  • 只支持多云CDN能够接入的 CDN 服务商。更多信息,请参见使用限制
  • 遵守多云CDN相关接口的使用限制。更多信息,请参见创建缓存刷新任务创建文件预热任务
  • 每日能够向云服务商系统提交的刷新/预热任务的数量受限于云服务商系统。

前提条件

  • 您的站点的业务数据(图片、视频、音频、JaveScript 脚本、文件等)存放在火山引擎对象存储 TOS 服务。
  • 您为站点启用了 CDN 服务。在 CDN 配置中,“加速域名”是您的站点域名,“回源地址”是存放业务数据的 TOS Bucket 域名。
  • 云服务商的 CDN 加速域名已经同步到了多云CDN。更多信息,请参见 CDN 加速域名同步

    为便于描述,本文以火山引擎 CDN 服务为例。其他云服务商的 CDN 服务(例如,阿里云 CDN、华为云 CDN 等)也是支持的。

  • 您已经开通了火山引擎函数服务。更多信息,请参见函数服务 — 快速入门

准备工作

获取火山引擎账号的 API 访问密钥。具体步骤,请参见 Access Key(密钥)管理

注意

函数服务使用该账号的 API 访问密钥调用多云CDN的提交刷新/预热任务接口。因此,如果账号是 IAM 用户账号,您需要确保 IAM 用户账号具有调用多云CDN相关接口的权限。

操作步骤

  1. 登录火山引擎函数服务控制台

  2. 在左侧导航栏,单击 函数列表

  3. 函数列表 页面,单击 创建函数

  4. 完成 创建函数 配置向导。

    1. 选择函数模板
      完成以下参数配置,然后单击 下一步:函数配置

      • 自定义创建:固定为 基于 Hello World 创建
      • 基于模板创建:先将 语言 设置为 Python 3.8,然后选择 vefaas-python38-default
    2. 函数配置
      函数代码 区域,将 index.py 的内容替换为以下代码,然后单击 确定
      在使用以下代码前,您必须替换部分参数的值,具体包括:

      • AK(第 15 行) 和 SK(第 16 行):将 <ak><sk> 替换成您的火山引擎账号的 API 访问密钥。
      • updateURL[N](第 127+[N-1] 行):将 <domain[N]>:替换成您的 CDN 加速域名。如果您有多个加速域名,需要设置多个 updateURL。

        示例:
        假设您有两个加速域名,您需要分别设置 updateURL1、updateURL2,将其中的 <domain1><domain2> 替换成加速域名。

      • Urls(第 131 和 154 行):根据您设置的 updateURL 修改 Urls 的内容。
        Urls 是由 updateURL 拼接得到的。如果您设置了多个 updateURL,需要根据实际情况修改 Urls 的拼接方式。

        示例:
        假设您设置了 updateURL1、updateURL2、updateURL3,则需要将 Urls 设置成 updateURL1+"\n"+updateURL2+"\n"+updateURL3

      alt

      #!/usr/bin/env python
      # -*- coding: utf-8 -*- 
      
      import datetime
      import hashlib
      import hmac
      import json
      from urllib.parse import quote
      import requests
      
      Service = "mcdn"
      Version = "2022-03-01"
      Region = "cn-north-1"
      Host = "open.volcengineapi.com"
      AK = "<ak>"
      SK = "<sk>"
      
      
      
      def norm_query(params):
          query = ""
          for key in sorted(params.keys()):
              if type(params[key]) == list:
                  for k in params[key]:
                      query = (
                              query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&"
                      )
              else:
                  query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&")
          query = query[:-1]
          return query.replace("+", "%20")
      
      
      # 第一步:准备辅助函数。
      # sha256 非对称加密
      def hmac_sha256(key: bytes, content: str):
          return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
      
      
      # sha256 hash算法
      def hash_sha256(content: str):
          return hashlib.sha256(content.encode("utf-8")).hexdigest()
      
      
      # 第二步:创建一个 MCDN 的 API 请求函数。签名计算的过程包含在该函数中。
      def request(method, query, header, ak, sk, action, body):
          # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表
          # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。
          # 初始化身份证明结构体
          credential = {
              "access_key_id": AK,
              "secret_access_key": SK,
              "service": Service,
              "region": Region,
          }
          # 初始化签名结构体
          request_param = {
              "body": json.dumps(body),
              "host": Host,
              "path": "/",
              "method": method,
              "content_type": "application/json",
              "date": datetime.datetime.utcnow(),
              "query": {"Action": action, "Version": Version, **query},
          }
          # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。
          # 初始化签名结果的结构体
          x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
          short_x_date = x_date[:8]
          x_content_sha256 = hash_sha256(request_param["body"])
          sign_result = {
              "Host": request_param["host"],
              "X-Content-Sha256": x_content_sha256,
              "X-Date": x_date,
              "Content-Type": request_param["content_type"],
          }
          # 第五步:计算 Signature 签名。
          signed_headers_str = ";".join(
              ["content-type", "host", "x-content-sha256", "x-date"]
          )
          canonical_request_str = "\n".join(
              [request_param["method"],
               request_param["path"],
               norm_query(request_param["query"]),
               "\n".join(
                   [
                       "content-type:" + request_param["content_type"],
                       "host:" + request_param["host"],
                       "x-content-sha256:" + x_content_sha256,
                       "x-date:" + x_date,
                   ]
               ),
               "",
               signed_headers_str,
               x_content_sha256,
               ]
          )
          hashed_canonical_request = hash_sha256(canonical_request_str)
          credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
          string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
          k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
          k_region = hmac_sha256(k_date, credential["region"])
          k_service = hmac_sha256(k_region, credential["service"])
          k_signing = hmac_sha256(k_service, "request")
          signature = hmac_sha256(k_signing, string_to_sign).hex()
          sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
              credential["access_key_id"] + "/" + credential_scope,
              signed_headers_str,
              signature,
          )
          header = {**header, **sign_result}
          # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。
          r = requests.post("https://{}{}".format(request_param["host"], request_param["path"]),
                            headers=header,
                            params=request_param["query"],
                            data=request_param["body"],
                            )
          return r.json()
      
      
      
      def handler(event, context):
          print(f"received new request, event content: {event}")
          print(json.dumps(event, sort_keys=True, indent=4))
      
          uri = event["data"]["events"][0]["tos"]['object']['key']
          updateURL1 = "https://domain1/" + uri
          updateURL2 = "https://domain2/" + uri
      
          refresh_request_body = {
              "Urls": updateURL1+"\n"+updateURL2,
              "Type": "file",
          }
          
          refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body)
          print(refresh_response_body)
      
          preload_response_body = {
              "ResponseMetadata": {
                  "RequestID": "",
                  "Action": "SubmitPreloadTask",
                  "Version": "2022-03-01",
                  "Service": "mcdn",
                  "Region": "cn-north-1"
              },
              "Result": {
                  "TaskId": ""
              }
          }
      
          if "Error" not in refresh_response_body["ResponseMetadata"]:
              print("刷新已成功,执行预热步骤")
              preload_request_body = {
                  "Urls": updateURL1+"\n"+updateURL2,
              }
              preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body)
          print(preload_response_body)
          
          result = {
              'statusCode': 400,
              'headers': {
                  'Content-Type': 'application/json'
              },
              'body': json.dumps({
                  'message': "预热失败"
              })
          }
              
          if "Error" not in preload_response_body["ResponseMetadata"]:
              print("预热已成功")
              result = {
                  'statusCode': 200,
                  'headers': {
                      'Content-Type': 'application/json'
                  },
                  'body': json.dumps({
                      'message': preload_response_body
                  })
              }
          print(result)
          return result
      
      
  5. 发布函数。

    1. 单击 发布
    2. 完成以下参数配置,然后单击 确定
      • 函数版本:默认已选择 Latest
      • 版本描述:为函数添加备注。示例:SubmitRefreshTask
      • 实例数上限:首次发布函数时根据设置该参数。建议保留默认值。更多信息,请参见发布函数
  6. 创建函数触发器。

    1. 发布完成后,单击 触发器 页签。
    2. 单击 创建触发器
    3. 创建触发器 面板,完成以下参数配置,然后单击 确定
      • 触发器类型:选择 TOS 触发器
      • 触发器名称:为触发器设置一个名称。示例:TriggerRefreshTask
      • TOS Bucket:选择存放站点资源的 TOS Bucket。
      • 触发事件:选择函数服务需要监听的事件,支持多选。关于函数服务支持监听的所有 TOS 事件及含义,请参见 事件列表

        为方便验证,本教程以选择 tos:ObjectCreated:* 事件为例。 tos:ObjectCreated:* 事件表示当 Bucket 发生文件增加事件时,函数将被触发。

    alt

完成以上操作后,您可以参照结果验证,验证配置是否已生效。

结果验证

在 TOS Bucket 中上传一个测试文件,然后前往多云CDN控制台查看刷新和预热任务的操作记录。

  1. 登录多云CDN控制台

  2. 在左侧导航栏,选择 加速管理 > 内容管理
  3. 单击 操作记录 页签,然后根据您上传测试文件的时间,在任务列表中查找相关的刷新和预热任务记录。

配置正确的情况下,您可以在列表中找到相关的刷新和预热任务。

alt

您可以将任务记录展开,查看多云CDN向云服务商提交的刷新和预热请求。您单击对应的 TaskID 后,页面将跳转到对应任务的执行状态页。该页面展示了由云服务商返回的任务执行状态。更多信息,请参见查看执行状态
alt

常见问题

遇到请求超限问题时,如何解决?

在部署自动刷新预热方案后,如果您的 TOS 桶文件发生较大的突然变化,将可能触发大量的刷新和预热请求。这有可能导致您的请求数量超过多云CDN接口的 QPS 限制,或云服务商系统的刷新/预热任务限额。

一旦您的请求超过了相关的限制,您可以在多云CDN控制台的 内容管理 页面查询到对应错误信息。

  • 如果在 操作记录 中,任务的 提交状态失败,表示您的请求超过了多云CDN接口的 QPS 限制。
  • 如果在 操作记录 中,任务的 提交状态成功,但是对应的 云服务商TaskID执行状态失败,则表示您的请求超过了云服务商系统的刷新/预热任务限额。

对于请求超限的问题,您可以采用以下方法进行缓解:在函数代码中增加随机 sleep 的逻辑,使函数随机暂停运行一段时间。

以下是一段包含 sleep 逻辑的示例代码,供您参考。您可以根据实际情况修改其中的 sleep 相关配置,然后使用这段代码替换函数代码 index.py 的内容。

说明

如果使用该方法仍无法解决您的问题,请提交工单联系技术支持。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
import time
import random
import hashlib
import hmac
import json
from urllib.parse import quote
import requests

Service = "mcdn"
Version = "2022-03-01"
Region = "cn-north-1"
Host = "open.volcengineapi.com"
AK = "AK"
SK = "SK"
cdnURLs = ['https://example.com/']
Preload = False



def norm_query(params):
    query = ""
    for key in sorted(params.keys()):
        if type(params[key]) == list:
            for k in params[key]:
                query = (
                        query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&"
                )
        else:
            query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&")
    query = query[:-1]
    return query.replace("+", "%20")


# 第一步:准备辅助函数。
# sha256 非对称加密
def hmac_sha256(key: bytes, content: str):
    return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()


# sha256 hash算法
def hash_sha256(content: str):
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


# 第二步:创建一个 MCDN 的 API 请求函数。签名计算的过程包含在该函数中。
def request(method, query, header, ak, sk, action, body):
    # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表
    # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。
    # 初始化身份证明结构体
    credential = {
        "access_key_id": ak,
        "secret_access_key": sk,
        "service": Service,
        "region": Region,
    }
    # 初始化签名结构体
    request_param = {
        "body": json.dumps(body),
        "host": Host,
        "path": "/",
        "method": method,
        "content_type": "application/json",
        "date": datetime.datetime.utcnow(),
        "query": {"Action": action, "Version": Version, **query},
    }
    # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。
    # 初始化签名结果的结构体
    x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
    short_x_date = x_date[:8]
    x_content_sha256 = hash_sha256(request_param["body"])
    sign_result = {
        "Host": request_param["host"],
        "X-Content-Sha256": x_content_sha256,
        "X-Date": x_date,
        "Content-Type": request_param["content_type"],
    }
    # 第五步:计算 Signature 签名。
    signed_headers_str = ";".join(
        ["content-type", "host", "x-content-sha256", "x-date"]
    )
    canonical_request_str = "\n".join(
        [request_param["method"],
         request_param["path"],
         norm_query(request_param["query"]),
         "\n".join(
             [
                 "content-type:" + request_param["content_type"],
                 "host:" + request_param["host"],
                 "x-content-sha256:" + x_content_sha256,
                 "x-date:" + x_date,
             ]
         ),
         "",
         signed_headers_str,
         x_content_sha256,
         ]
    )
    hashed_canonical_request = hash_sha256(canonical_request_str)
    credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
    string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
    k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
    k_region = hmac_sha256(k_date, credential["region"])
    k_service = hmac_sha256(k_region, credential["service"])
    k_signing = hmac_sha256(k_service, "request")
    signature = hmac_sha256(k_signing, string_to_sign).hex()
    sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
        credential["access_key_id"] + "/" + credential_scope,
        signed_headers_str,
        signature,
    )
    header = {**header, **sign_result}
    # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。
    r = requests.post("http://{}{}".format(request_param["host"], request_param["path"]),
                      headers=header,
                      params=request_param["query"],
                      data=request_param["body"],
                      )
    return r.json()



def handler(event, context):
    print(f"received new request, event content: {event}")
    print(json.dumps(event, sort_keys=True, indent=4))

    uri = event["data"]["events"][0]["tos"]['object']['key']
    action = event["data"]["events"][0]["eventName"].split(':')[2]
    if action in ['Delete', 'DeleteMarkerCreated']:
        global Preload
        Preload = False
    sleep_max_time = 30
    updateURL = ""
    #判断尾部为index.html还有尾部为'/'的uri
    #尾部为index.html的文件需要额外加上一个刷新cdnurl+index.html之前的部分
    #尾部为'/'的文件需要排除,不需要刷新目录(实际上是文件刷新)预热目录会失败
    if uri.endswith('/'):
        print ("{}{} dont need refresh and preload!".format(cdnURLs[0],uri))
        refresh_response_body = "{}{} dont need refresh and preload!".format(cdnURLs[0],uri)
        result = {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json'
            },
            'body': json.dumps({
                'message': refresh_response_body
            })
        }
        return result
    for cdnURL in cdnURLs:
        if updateURL == "":
            updateURL = updateURL + cdnURL + uri
        else:
            updateURL = updateURL + "\n" + cdnURL + uri
        if uri == 'index.html':
            updateURL = updateURL + "\n" + cdnURL
            sleep_max_time = 3
        elif uri.endswith('/index.html'):
            updateURL = updateURL + "\n" + cdnURL + uri.rsplit('/', 1)[0] + '/'
            sleep_max_time = 3
    refresh_request_body = {
        "Urls": updateURL,
        "Type": "file",
    }
    preload_request_body = {
        "Urls": updateURL,
    }
    #结果模板,默认失败
    result = {
        'statusCode': 400,
        'headers': {
            'Content-Type': 'application/json'
        },
        'body': json.dumps({
            'message': "刷新失败"
        })
    }
    #打印一下刷新的urls
    print (refresh_request_body['Urls'])
    time.sleep(random.SystemRandom().randint(2, sleep_max_time))
    #失败重试5次,中间随机sleep
    for i in range(5):
        refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body)
        print(refresh_response_body)

        if "Error" not in refresh_response_body["ResponseMetadata"]:
            print("刷新已成功!")
            result = {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json'
                },
                'body': json.dumps({
                    'message': refresh_response_body
                })
            }
            break
        else:
            time.sleep(random.SystemRandom().randint(2, 5))
            print ('start refresh retry {}...'.format(i+1))
    #不进行预热直接返回刷新结果
    if not Preload:
        print(result)
        return result
    else:
        print("进行预热!")
    #预热处理
    result = {
        'statusCode': 400,
        'headers': {
            'Content-Type': 'application/json'
        },
        'body': json.dumps({
            'message': "预热失败"
        })
    }
    for i in range(5):
        preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body)
        print(preload_response_body)

        if "Error" not in preload_response_body["ResponseMetadata"]:
            print("预热已成功")
            result = {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json'
                },
                'body': json.dumps({
                    'message': preload_response_body
                })
            }
        print(result)
        if result['statusCode'] == 200:
            return result
        else:
            time.sleep(random.SystemRandom().randint(2, 5))
            print ('start preload retry {}...'.format(i+1))