本文介绍了一种监控源站的文件变更,用来控制 CDN 自动执行缓存刷新和文件预热任务的解决方案。该解决方案的核心是利用云服务商(火山引擎、阿里云、腾讯云、AWS)的函数计算服务,捕获其对象存储服务中的文件变更事件,并调用多云CDN的接口,向接入多云CDN的云服务商(不限于以上四家)下发刷新与预热任务。
当您更新或上传文件到对象存储(火山引擎 TOS、阿里云 OSS、腾讯云 COS、AWS S3)时,本文介绍的方案能够自动触发多云CDN的刷新或预热任务。这确保了 CDN 边缘节点上的内容能及时更新,用户始终访问到最新版本的文件。
该方案的核心是利用云服务商的函数计算服务(火山引擎函数服务、阿里云函数计算、腾讯云云函数、AWS Lambda),通过以下步骤实现自动化:
本方案适用于以下任一云服务商的对象存储与函数计算服务组合:
本文将以火山引擎 TOS 和函数服务为例,详细介绍配置步骤。关于其他云服务商的配置差异,请参见其他云服务商配置说明。
在开始操作前,请确保您已完成以下准备工作:
注意
函数将使用此密钥调用多云CDN的 API。若使用 IAM 用户,请确保该用户拥有调用多云CDN刷新/预热接口的权限。
本节以火山引擎函数服务为例,介绍如何创建并配置函数。
选择函数模板。
完成以下参数配置,然后单击 下一步:函数配置。
函数配置。
在 函数代码 区域,将 index.py 的内容替换为以下代码,然后单击 确定。
在使用以下代码前,您必须替换部分参数的值,具体包括:
<ak> 和 <sk> 替换成您的火山引擎账号的 API 访问密钥。<domain[N]>:替换成您的加速域名。如果您有多个加速域名,需要设置多个 updateURL。
示例:
假设您有两个加速域名,您需要分别设置 updateURL1、updateURL2,将其中的<domain1>和<domain2>替换成加速域名。
示例:
假设您设置了 updateURL1、updateURL2、updateURL3,则需要将 Urls 设置成updateURL1+"\n"+updateURL2+"\n"+updateURL3。

#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime import hashlib import hmac import json from urllib.parse import quote import requests Service = "mcdn" Version = "2022-03-01" Region = "cn-north-1" Host = "open.volcengineapi.com" AK = "<ak>" SK = "<sk>" def norm_query(params): query = "" for key in sorted(params.keys()): if type(params[key]) == list: for k in params[key]: query = ( query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&" ) else: query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&") query = query[:-1] return query.replace("+", "%20") # 第一步:准备辅助函数。 # sha256 非对称加密 def hmac_sha256(key: bytes, content: str): return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() # sha256 hash算法 def hash_sha256(content: str): return hashlib.sha256(content.encode("utf-8")).hexdigest() # 第二步:创建一个多云CDN的 API 请求函数。签名计算的过程包含在该函数中。 def request(method, query, header, ak, sk, action, body): # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表 # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。 # 初始化身份证明结构体 credential = { "access_key_id": AK, "secret_access_key": SK, "service": Service, "region": Region, } # 初始化签名结构体 request_param = { "body": json.dumps(body), "host": Host, "path": "/", "method": method, "content_type": "application/json", "date": datetime.datetime.utcnow(), "query": {"Action": action, "Version": Version, **query}, } # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。 # 初始化签名结果的结构体 x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ") short_x_date = x_date[:8] x_content_sha256 = hash_sha256(request_param["body"]) sign_result = { "Host": request_param["host"], "X-Content-Sha256": x_content_sha256, "X-Date": x_date, "Content-Type": request_param["content_type"], } # 第五步:计算 Signature 签名。 signed_headers_str = ";".join( ["content-type", "host", "x-content-sha256", "x-date"] ) canonical_request_str = "\n".join( [request_param["method"], request_param["path"], norm_query(request_param["query"]), "\n".join( [ "content-type:" + request_param["content_type"], "host:" + request_param["host"], "x-content-sha256:" + x_content_sha256, "x-date:" + x_date, ] ), "", signed_headers_str, x_content_sha256, ] ) hashed_canonical_request = hash_sha256(canonical_request_str) credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"]) string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]) k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date) k_region = hmac_sha256(k_date, credential["region"]) k_service = hmac_sha256(k_region, credential["service"]) k_signing = hmac_sha256(k_service, "request") signature = hmac_sha256(k_signing, string_to_sign).hex() sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( credential["access_key_id"] + "/" + credential_scope, signed_headers_str, signature, ) header = {**header, **sign_result} # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。 r = requests.post("https://{}{}".format(request_param["host"], request_param["path"]), headers=header, params=request_param["query"], data=request_param["body"], ) return r.json() def handler(event, context): print(f"received new request, event content: {event}") print(json.dumps(event, sort_keys=True, indent=4)) uri = event["data"]["events"][0]["tos"]['object']['key'] updateURL1 = "https://domain1/" + uri updateURL2 = "https://domain2/" + uri refresh_request_body = { "Urls": updateURL1+"\n"+updateURL2, "Type": "file", } refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body) print(refresh_response_body) preload_response_body = { "ResponseMetadata": { "RequestID": "", "Action": "SubmitPreloadTask", "Version": "2022-03-01", "Service": "mcdn", "Region": "cn-north-1" }, "Result": { "TaskId": "" } } if "Error" not in refresh_response_body["ResponseMetadata"]: print("刷新已成功,执行预热步骤") preload_request_body = { "Urls": updateURL1+"\n"+updateURL2, } preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body) print(preload_response_body) result = { 'statusCode': 400, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': "预热失败" }) } if "Error" not in preload_response_body["ResponseMetadata"]: print("预热已成功") result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': preload_response_body }) } print(result) return result
为方便验证,本教程以选择
tos:ObjectCreated:*事件为例。tos:ObjectCreated:*事件表示当 Bucket 发生文件增加事件时,函数将被触发。

完成以上操作后,您可以参照结果验证,验证配置是否已生效。
在 TOS Bucket 中上传一个测试文件,然后前往多云CDN控制台查看刷新和预热任务的操作记录。
登录多云CDN控制台。
配置正确的情况下,您可以在列表中找到相关的刷新和预热任务。
您单击对应的 查看执行结果 后,页面将跳转到对应任务的执行状态页。该页面展示了由云服务商返回的任务执行状态。更多信息,请参见查看执行状态。
本方案以火山引擎 TOS 和函数服务为例,介绍了如何通过文件变更事件触发函数,进而实现 CDN 内容的自动刷新或预热。在 AWS、阿里云或腾讯云等其他云服务商环境中部署此方案时,您需要关注以下关键差异,并对配置和代码进行相应调整。
| 项目 | 火山引擎 | AWS | 阿里云 | 腾讯云 |
|---|---|---|---|---|
| 对象存储 | 对象存储 (TOS) | Simple Storage Service (S3) | 对象存储 (OSS) | 对象存储 (COS) |
| 函数计算 | 函数服务 (VEF) | Lambda | 函数计算 (FC) | 云函数 (SCF) |
| 触发器配置 | 在函数服务中配置 TOS 触发器 | 在 Lambda 中配置 S3 触发器 | 在函数计算中配置 OSS 触发器 | 在云函数中配置 COS 触发器 |
| 事件结构 | 事件结构与其他云不同 | 事件结构与其他云不同 | 事件结构与其他云不同 | 事件结构与其他云不同 |
以下 Python 代码示例展示了如何修改 handler 函数和增加 extract_object_key 函数,以兼容来自不同云服务商的对象存储事件。
extract_object_key 函数。
该函数用于从不同云服务商的事件体中提取触发事件的对象键(即文件名)。
def extract_object_key(event): # 火山引擎 TOS try: return event["data"]["events"][0]["tos"]['object']['key'] except Exception: pass # AWS S3 try: return event["Records"][0]["s3"]["object"]["key"] except Exception: pass # 阿里云 OSS try: return event["events"][0]["oss"]["object"]["key"] except Exception: pass # 腾讯云 COS try: return event["Records"][0]["cos"]["cosObject"]["key"] except Exception: pass return None
handler 函数。
您需要更新主 handler 函数,调用 extract_object_key 获取文件名,并增加对不同事件输入类型(如阿里云函数计算可能传递 bytes 或 str 类型)的兼容处理。
import json def handler(event, context): # 兼容阿里云 FC 的 bytes/string 输入 if isinstance(event, (bytes, str)): try: event = json.loads(event) except Exception: pass uri = extract_object_key(event) if not uri: print("No object key found in event") return {"statusCode": 400, "body": "No object key found"} # ... 后续调用多云CDN接口的逻辑 ...
在您的 handler 函数中,请替换原有的硬编码的对象键提取逻辑(uri = event["data"]["events"][0]["tos"]['object']['key']),改为调用 extract_object_key(event)。
当对象存储中的文件在短时间内发生大量变更时(例如,一次性上传成百上千个文件),可能会触发密集的函数调用,导致向多云CDN或第三方 CDN 厂商提交的刷新/预热请求超出速率限制。
您可以在多云CDN控制台的 刷新预热 > 操作记录 页面诊断问题:
为缓解此问题,您可以在函数代码中引入随机延迟和重试机制。这有助于平滑请求峰值,避免在短时间内集中发送大量请求。
以下是一段包含 sleep 逻辑(使函数随机暂停运行一段时间)的示例代码,供您参考。您可以根据实际情况修改其中的 sleep 相关配置,然后使用这段代码替换函数代码 index.py 的内容。
说明
如果引入延迟和重试机制后问题依然存在,建议您提交工单联系技术支持,以获取针对您业务场景的进一步帮助。
#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime import time import random import hashlib import hmac import json from urllib.parse import quote import requests Service = "mcdn" Version = "2022-03-01" Region = "cn-north-1" Host = "open.volcengineapi.com" AK = "AK" SK = "SK" cdnURLs = ['https://example.com/'] Preload = False def norm_query(params): query = "" for key in sorted(params.keys()): if type(params[key]) == list: for k in params[key]: query = ( query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&" ) else: query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&") query = query[:-1] return query.replace("+", "%20") # 第一步:准备辅助函数。 # sha256 非对称加密 def hmac_sha256(key: bytes, content: str): return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() # sha256 hash算法 def hash_sha256(content: str): return hashlib.sha256(content.encode("utf-8")).hexdigest() # 第二步:创建一个多云CDN的 API 请求函数。签名计算的过程包含在该函数中。 def request(method, query, header, ak, sk, action, body): # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表 # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。 # 初始化身份证明结构体 credential = { "access_key_id": ak, "secret_access_key": sk, "service": Service, "region": Region, } # 初始化签名结构体 request_param = { "body": json.dumps(body), "host": Host, "path": "/", "method": method, "content_type": "application/json", "date": datetime.datetime.utcnow(), "query": {"Action": action, "Version": Version, **query}, } # 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。 # 初始化签名结果的结构体 x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ") short_x_date = x_date[:8] x_content_sha256 = hash_sha256(request_param["body"]) sign_result = { "Host": request_param["host"], "X-Content-Sha256": x_content_sha256, "X-Date": x_date, "Content-Type": request_param["content_type"], } # 第五步:计算 Signature 签名。 signed_headers_str = ";".join( ["content-type", "host", "x-content-sha256", "x-date"] ) canonical_request_str = "\n".join( [request_param["method"], request_param["path"], norm_query(request_param["query"]), "\n".join( [ "content-type:" + request_param["content_type"], "host:" + request_param["host"], "x-content-sha256:" + x_content_sha256, "x-date:" + x_date, ] ), "", signed_headers_str, x_content_sha256, ] ) hashed_canonical_request = hash_sha256(canonical_request_str) credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"]) string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]) k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date) k_region = hmac_sha256(k_date, credential["region"]) k_service = hmac_sha256(k_region, credential["service"]) k_signing = hmac_sha256(k_service, "request") signature = hmac_sha256(k_signing, string_to_sign).hex() sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( credential["access_key_id"] + "/" + credential_scope, signed_headers_str, signature, ) header = {**header, **sign_result} # 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。 r = requests.post("http://{}{}".format(request_param["host"], request_param["path"]), headers=header, params=request_param["query"], data=request_param["body"], ) return r.json() def handler(event, context): print(f"received new request, event content: {event}") print(json.dumps(event, sort_keys=True, indent=4)) uri = event["data"]["events"][0]["tos"]['object']['key'] action = event["data"]["events"][0]["eventName"].split(':')[2] if action in ['Delete', 'DeleteMarkerCreated']: global Preload Preload = False sleep_max_time = 30 updateURL = "" #判断尾部为 index.html 还有尾部为 '/'的uri #尾部为 index.html 的文件需要额外加上一个刷新 cdnurl+index.html 之前的部分 #尾部为 '/' 的文件需要排除,不需要刷新目录(实际上是文件刷新)预热目录会失败 if uri.endswith('/'): print ("{}{} dont need refresh and preload!".format(cdnURLs[0],uri)) refresh_response_body = "{}{} dont need refresh and preload!".format(cdnURLs[0],uri) result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': refresh_response_body }) } return result for cdnURL in cdnURLs: if updateURL == "": updateURL = updateURL + cdnURL + uri else: updateURL = updateURL + "\n" + cdnURL + uri if uri == 'index.html': updateURL = updateURL + "\n" + cdnURL sleep_max_time = 3 elif uri.endswith('/index.html'): updateURL = updateURL + "\n" + cdnURL + uri.rsplit('/', 1)[0] + '/' sleep_max_time = 3 refresh_request_body = { "Urls": updateURL, "Type": "file", } preload_request_body = { "Urls": updateURL, } #结果模板,默认失败 result = { 'statusCode': 400, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': "刷新失败" }) } #打印刷新的 urls print (refresh_request_body['Urls']) time.sleep(random.SystemRandom().randint(2, sleep_max_time)) #失败重试5次,中间随机 sleep for i in range(5): refresh_response_body = request("POST", {}, {}, AK, SK, "SubmitRefreshTask", refresh_request_body) print(refresh_response_body) if "Error" not in refresh_response_body["ResponseMetadata"]: print("刷新已成功!") result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': refresh_response_body }) } break else: time.sleep(random.SystemRandom().randint(2, 5)) print ('start refresh retry {}...'.format(i+1)) #不进行预热直接返回刷新结果 if not Preload: print(result) return result else: print("进行预热!") #预热处理 result = { 'statusCode': 400, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': "预热失败" }) } for i in range(5): preload_response_body = request("POST", {}, {}, AK, SK, "SubmitPreloadTask", preload_request_body) print(preload_response_body) if "Error" not in preload_response_body["ResponseMetadata"]: print("预热已成功") result = { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json' }, 'body': json.dumps({ 'message': preload_response_body }) } print(result) if result['statusCode'] == 200: return result else: time.sleep(random.SystemRandom().randint(2, 5)) print ('start preload retry {}...'.format(i+1))