当前的DataLeap模板任务存在以下2个问题:
使用Shell任务的模板任务能力,底层依赖MinIO作为共享存储,实现多种类型的模板任务。原理:
说明
MinIO相当于一个共享存储系统,类似S3存储,以bucket(桶)的方式管理文件,一个bucket可以理解成一个文件夹,可使用wget命令进行资源下载
已编写好一个Python脚本,例如用于调用API查询数据,并将查询结果写入ByteHouse。各个API的查询方式极为相似,且已沉淀出一个Python脚本模板。
在DataLeap上,单纯的Python任务需要第三方依赖包(例如requests、ByteHouse依赖等),如何应用模板任务的能力,构建一个可复用的模板任务。
如下图所示:
可实现的效果:
# Step1.1 使用minibase用户登录服务器(或使用root登录后切换至minibase用户,命令:su - minibase) # Step1.2 进入DataLeap的部署节点(DataLeap每个部署节点默认都有MinIO的客户端) mc mb minibase/{your_bucket_name} # mc,表示MinIO Client,MinIO客户端 # mb,创建bucket,创建桶(相当于创建一个文件夹) # {your_bucket_name}替换为实际的bucket名称 # minibase/{your_bucket_name},其中minibase是DataLeap部署时自带的一级目录,在此基础上创建{your_bucket_name}的二级目录,用户需根据情况自行修改。 #例子:mc mb minibase/pythonscript # Step1.3 通过此命令,将刚刚创建的minibase/{your_bucket_name} 桶设置为公开可访问状态,任何人都可以读取该桶中的对象 mc policy set public minibase/{your_bucket_name} #**policy set 是mc命令中用于设置策略的子命令组合** #**public 是一种预定义的策略,表示公开访问权限** #例子:mc policy set public minibase/pythonscript # Step1.4 将用户本地的依赖(例如requests依赖包、Python模板文件)上传至Step 1.1中的服务器。这里假设,上传的文件名是your_file_name1.tar.gz, your_file_name2.py # Step1.5 将刚刚上传至服务器的文件,上传至刚刚创建的MinIO的bucket中 mc cp your_file_name1.tar.gz minibase/{your_bucket_name} mc cp your_file_name2.py minibase/{your_bucket_name} #例子:mc cp copy.py minibase/pythonscript #例子:mc cp api_to_bytehouse.tar.gz minibase/pythonscript # Step1.6 完成上传后,列出上传文件列表,查看是否成功上传: mc ls minibase/{your_bucket_name} #例子:mc ls minibase/pythonscript
进入DataLeap,创建Shell模板
# {your_bucket_name}需要替换为实际的bucket名称,例如zhanglitest # {your_python_file.py}需要替换为实际的Python模板文件,例如call_api.py # {tar_foler_name}将MinIO下载的文件,下载到哪一个本地目录,例如 packages # {your_python_packages_name.tar.gz}Python依赖包的名称,例如requests # 从MinIO下载安装包 wget minio-service.minio:9000/{your_bucket_name}/{your_python_packages_name.tar.gz} wget minio-service.minio:9000/{your_bucket_name}/{your_python_file.py} # 解压tar包 mkdir {tar_foler_name} tar -xzvf {your_python_packages_name.tar.gz} -C ./{tar_foler_name} # 安装依赖 pip3 install --no-index --find-links=./{tar_foler_name} {your_python_packages_name.tar.gz} # 调用Python模板任务,调用时,后面可传参,参数建议用双引号包裹,其中$1表示向Shell模板中传参 python3 {your_python_file.py} $0 $1 $2
例子:
# {your_bucket_name}需要替换为实际的bucket名称,例如 # {your_python_file.py}需要替换为实际的Python模板文件,例如copy.py api_to_bytehouse.tar.gz # {tar_foler_name}将MinIO下载的文件,下载到哪一个本地目录,例如 packages # {your_python_packages_name}Python依赖包的名称,例如requests # 从MinIO下载安装包 wget minio-service.minio:9000/minibase/pythonscript/api_to_bytehouse.tar.gz wget minio-service.minio:9000/minibase/pythonscript/api_to_bytehouse_template.py # 解压tar包 mkdir api_to_bytehouse tar -xzvf api_to_bytehouse.tar.gz -C ./api_to_bytehouse # 安装依赖 pip3 install --no-index --find-links=./api_to_bytehouse clickhouse-connect requests # 调用Python模板任务,调用时,后面可传参,参数建议用双引号包裹,其中$1表示向Shell模板中传参 python3 pythonscript/api_to_bytehouse_template.py --pagesize 10 --bukrs X940 --url https://api.dsp.sinopec.com/prod/Z2_ZRJRYSP00/Z2_WLPZHXM00 --ak 5aeabea481684159 --sk 4789d3476c98467ca51fc2fbe8fdd1b6 --ByteHouseTableName TRADING.FUEL_Z2_KJPZTT
创建Shell任务后,选择引用模板。
注意
这里入参信息是指Shell模板的入参。
"\"https://www.baidu.com"\" "get" "${date}${DATE}${date-10}"引用了模板的Shell任务,依然可以执行调试操作。
调试成功后,您可继续进行后续的配置调度、提交发布等操作。
MinIO中的文件更新后,相关的Shell任务将立即受到影响,建议在更新MinIO中的文件时谨慎测试。完成MinIO文件更新后,应对受影响的任务进行测试
若Shell任务引用了最新版本,当用户修改、保存Shell模板任务时,引用了最新版本的Shell任务会立即使用最新的版本。当然此种方式影响范围较大,建议充分测试再更新模板。
建议压缩为tar.gz文件(而不是zip文件),因为DataLeap的Shell任务内可以解压tar.gz文件,无法解压zip文件
1、将Python的requests依赖包打成压缩包,上传MinIO 2、编写Shell模板 3、创建Shell任务,引用模板,调用Python模板任务
# 压缩文件命令 tar -czvf requests.tar.gz certifi-2024.12.14-py3-none-any.whl charset_normalizer-3.4.1-py3-none-any.whl idna-3.10-py3-none-any.whl requests-2.31.0-py3-none-any.whl urllib3-2.0.7-py3-none-any.whl
import sys import requests import json import time def call_api(url, method, data): # 发送请求 print("url:", url) print("method:", method) print("data:", data) response = requests.get(url) # 检查响应状态码 if response.status_code == 200: print("成功访问百度首页!") # 打印网页内容(前500字符) print(response.text[:500]) # 输出前500个字符 else: print("请求失败,状态码:", response.status_code) def main(): # 获取传递的参数 print("脚本名称:", sys.argv[0]) # 脚本文件名 # 从第一个索引开始获取传递的参数 for i, arg in enumerate(sys.argv[1:], 1): print(f"参数{i}: {arg}") call_api("https://www.baidu.com", 'GET', '123') if __name__ == "__main__": main()
在创建的Shell模板中,编辑以下Shell模板脚本:
# 从MinIO下载安装包 wget minio-service.minio:9000/zhanglitest/requests.tar.gz wget minio-service.minio:9000/zhanglitest/call_api.py # 显示当前路径下的文件 ls -l # 解压tar包 mkdir requests tar -xzvf requests.tar.gz -C ./requests # 安装依赖 pip3 install --no-index --find-links=./requests requests # 调用Python模板任务,传参 python3 call_api.py $1 $2 $3
Shell模板创建完成,并保存后,切换进入数据开发页签,并打开创建好的Shell任务,进行模板引用:
模板引用完成后,单击界面上方工具栏中的调试按钮,进行任务调试,等待调试任务执行完成,并在下方调试记录中,查看对应的执行日志信息: