多模态场景下提供大模型的深度思考能力
使用具备深度思考能力的模型,对图片、视频或文本进行分析理解,并返回结构化文本输出。算子会自动构建符合多模态模型规范的 message 结构,用户只需按约定提供图片 / 视频 / 文本数据即可完成推理。
(同一个字段中不允许同时混用字符串和列表两种类型)
该算子(ArkLLMThinkingVision)在 Daft 0.6.5(及其以下版本),使用方式与 0.6.14 版本(及其以上版本)不相同。
在 Daft 0.6.14 及其以上版本中,输入时可以指定 images、texts、 videos 字段,分别对应模型的图片、文本、视频输入,支持同时输入。下面是一个示例:
df = df.with_column( "llm_result", las_udf( ArkLLMThinkingVision, construct_args={ "model": "doubao-seed-1.6", "system_text": "你是一个专业的视频理解助手,能够分析视频中的内容并提供详细的描述。", "inference_type": "online", }, )(videos=col("videos"), texts="请用简短的语言描述视频里的内容,返回格式为json。"), )
在 Daft 0.6.5 及其以下版本中,输入时只支持传入视频或图片,不支持同时传入视频和图片,通过multimodal_type指定输入类型,multimodal_type支持传入 "video" 和 "image"、"text"。同时也支持通过指定prompt传入文本信息。下面是一个示例:
df = df.with_column( "llm_result", las_udf( ArkLLMThinkingVision, construct_args={ "model": "doubao-seed-1.6", "multimodal_type": "video", "system_text": "你是一个专业的视频理解助手,能够分析视频中的内容并提供详细的描述。", "prompt": "请用简短的语言描述视频里的内容,返回格式为json。", "inference_type": "online", }, )(col("videos")), )
通过「对话接口」调用模型的方式,特点是以一条条请求为单位发起推理,等待模型输出结果。使用Daft方式,可以将一批请求并行处理,提高推理效率。
说明
Chat 模式和 Job 模式的对比说明请参见:选择调用方式。
输入列名 | 说明 |
|---|---|
images | 传入待处理的图片数据。支持传入单张图片(string)或多张图片(list),数据来源由参数 source_type 控制:url 模式支持 http/https/tos/s3 等协议地址,其中 tos/s3 会自动转换为预签名 URL;base64 模式使用图片的 Base64 编码;binary 模式会将二进制数据自动转换为 Base64 编码。 |
videos | 传入待处理的视频数据。支持传入单条视频(string)或多条视频(list),数据来源由参数 source_type 控制:url 模式支持 http/https/tos/s3 等协议地址,其中 tos/s3 会自动转换为预签名 URL;base64 模式使用视频的 Base64 编码;binary 模式会将二进制数据自动转换为 Base64 编码。 |
texts | 传入用户文本提示词。支持传入单条文本(string)或多条文本(list)。 |
默认情况下,返回字段类型为 struct,包含以下字段:
当环境变量 LAS_LLM_FINISH_REASON_CHECK=true 时,返回 struct 中会额外包含以下字段:
当环境变量 LAS_LLM_BOTS_REFERENCES=true 时,返回 struct 中会额外包含以下字段:
两个环境变量可以同时开启,此时输出 struct 中会同时包含上述字段。
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
model | str | 模型名称 支持的模型有: 豆包模型和 DeepSeek 模型。例如:doubao-seed-1.6。 | |
version | str or None | 模型版本 输入模型对应的版本信息。例如:250115。 | |
inference_type | str | batch | 推理类型,支持在线推理和批量推理。默认值为 batch,即采用批量推理 - online: 采用方舟平台提供的在线推理模块进行推理 - batch:采用方舟平台提供的批量推理模块进行推理 |
system_text | str or None | 系统提示内容 系统提示内容,以 system 角色作为模型的输入,用于统一约束模型行为。 | |
system_image_url | str or None | 系统图片 URL 图文混排场景下,输入系统图片 URL,用于指导模型的行为。 | |
system_video_url | str or None | 系统视频 URL 图文混排场景下,输入系统视频 URL,用于指导模型的行为。 | |
image_format | str | jpeg | 图片编码格式 默认 jpeg。支持格式: JPEG、PNG、WEBP、GIF、BMP、TIFF 等常见格式。详细格式请参考 火山引擎方舟-图片理解 帮助文档 |
image_url_detail | str or None | 图片质量 支持手动设置图片的质量,取值范围 high、low、auto。 - high:高细节模式,适用于需要理解图像细节信息的场景,如多个局部信息/特征提取、复杂/丰富细节图像理解等,理解更全面。 - low:低细节模式,适用于简单图像分类/识别、整体内容理解/描述等场景,理解更快速。 - auto:默认模式,不同模型选择的模式略有不同,具体请参见官方文档。 | |
video_format | str | mp4 | 视频编码格式 配置视频格式,默认是 mp4。支持的视频格式:MP4、AVI、MOV。单视频文件需在 50MB 以内。 |
video_fps | float or None | 视频帧率 取值范围:[0.2, 5]。默认值 1。每秒钟从视频中抽取指定数量的图像。取值越高,对于视频中画面变化理解越精细;取值越低,对于视频中画面变化感知减弱,但是使用的 token 花费少,速度也更快。 | |
source_type | str | url | 数据来源类型 指定媒体数据的来源格式,默认 url。可选值: - binary: 原始二进制数据 - base64: Base64 编码数据 - url: 网络资源地址(支持 http/https/tos/s3),其中 tos/s3 会自动转换为预签名 URL |
max_tokens | int or None | 模型回复最大长度(单位 token),输入输出总长度受模型上下文限制。 | |
max_completion_tokens | int or None | 模型生成的 token 数量的上限,包含思维链内容(reasoning_content)与回答内容(content),不包含传入的信息(messages)。 超出后,停止模型输出思维链内容及模型回答,并返回 finish_reason 字段为 length。 | |
stop | list or None | 停止词列表 模型遇到 stop 字段所指定的字符串时将停止继续生成,这个字符串本身不会输出。 最多支持 4 个字符串。例如 ["你好", "天气"]。 | |
frequency_penalty | float | 0 | 频率惩罚系数 频率惩罚系数。如果值为正,会根据新 token 在文本中的出现频率对其进行惩罚,从而降低模型逐字重复的可能性。 取值范围 [-2.0, 2.0],默认 0。 |
presence_penalty | float | 0 | 存在惩罚系数 存在惩罚系数。如果值为正,会根据新 token 到目前为止是否出现在文本中对其进行惩罚,从而增加模型谈论新主题的可能性。 取值范围为 [-2.0, 2.0]。默认值 0。 |
temperature | float | 1 | 采样温度 采样温度控制了生成文本时对每个候选词的概率分布进行平滑的程度。 - 当取值为 0 时模型仅考虑对数概率最大的一个 token。 - 较高的值(如 0.8)会使输出更加随机,而较低的值(如 0.2)会使输出更加集中确定。 通常建议仅调整 temperature 或 top_p 其中之一,不建议两者都修改。取值范围为 [0, 2],默认值 1。 |
top_p | float | 0.7 | 核采样概率阈值 核采样概率阈值。模型会考虑概率质量在 top_p 内的 token 结果。 当取值为 0 时模型仅考虑对数概率最大的一个 token。 0.1 意味着只考虑概率质量最高的前 10% 的 token,取值越大生成的随机性越高,取值越低生成的确定性越高。 默认值 0.7。 |
logit_bias | dict or None | 调整指定 token 在模型输出内容中出现的概率,使模型生成的内容更加符合特定的偏好。 logit_bias 字段接受一个 map 值,其中每个键为词表中的 token ID(使用 tokenization 接口获取),每个值为该 token 的偏差值,取值范围为 [-100, 100]。 -1 会减少选择的可能性,1 会增加选择的可能性;-100 会完全禁止选择该 token,100 会导致仅可选择该 token。 该参数的实际效果可能因模型而异。 | |
tools | list or None | 工具调用配置 待调用工具的列表,模型返回信息中可包含。当您需要让模型返回待调用工具时,需要配置该结构体。 | |
thinking_type | str or None | 思考模式 控制模型是否开启深度思考模式。不配置时,采用深度思考模式,可以手动关闭。可选值: - enabled:开启思考模式,模型一定先思考后回答。 - disabled:关闭思考模式,模型直接回答问题,不会进行思考。 - auto:自动思考模式,模型根据问题自主判断是否需要思考,简单题目直接回答。 | |
llm_config | dict or None | 自定义 LLM 配置 除了上述参数,其他参数会透传给模型。上述参数会覆盖 llm_config 中的值。 | |
request_timeout | int | 1200 | 超时时间 单次请求的超时时间(秒)。 |
max_concurrency | int | 100 | 并发数 每个进程的最大并发数。 |
下面的代码展示了如何使用 daft 访问火山方舟 多模态深度思考模型(豆包系列)进行批量推理。请注意每次大模型推理结果可能不同。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.ark_llm.ark_llm_thinking_vision import ArkLLMThinkingVision from daft.las.functions.udf import las_udf if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(min_cpu_per_task=0) if __name__ == "__main__": # 需配置环境变量 LAS_API_KEY : LAS_API_KEY 通过在 LAS 服务页面上创建获取 tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "videos": [ f"https://{tos_dir_url}/public/shared_video_dataset/eating_56.mp4", ] } df = daft.from_pydict(samples) df = df.with_column( "llm_result", las_udf( ArkLLMThinkingVision, construct_args={ "model": "doubao-1.5-thinking-vision-pro", "system_text": "你是一个专业的视频理解助手,你需要根据视频内容回答用户的问题。", "inference_type": "online", }, )(videos=col("videos")), ) df = df.with_column("reasoning_content", col("llm_result").struct.get("reasoning_content")) df = df.with_column("llm_result", col("llm_result").struct.get("llm_result")) df.show() # 输出(每次大模型推理结果可能不同) # ╭────────────────────────────────┬────────────────────────────────────────────┬────────────────────────────────────────────╮ # │ videos ┆ llm_result ┆ reasoning_content │ # │ --- ┆ --- ┆ --- │ # │ Utf8 ┆ Utf8 ┆ Utf8 │ # ╞════════════════════════════════╪════════════════════════════════════════════╪════════════════════════════════════════════╡ # │ https://las-public-data-qa.to… ┆ 视频中呈现的是一段动画内容:起初展示的是一 ┆ 用户现在需要描述视频里的内容。首先看画面: │ # │ ┆ 个**多层卡通风… ┆ 开头是一个多层蛋… │ # ╰────────────────────────────────┴────────────────────────────────────────────┴────────────────────────────────────────────╯
该接口用于将数据组装为模型需要的请求格式 JSONL 文件,并将其上传至火山引擎对象存储(TOS)上。采用豆包模型执行批量推理任务,该批量推理任务采用job模式。
说明
Chat 模式和 Job 模式的对比说明请参见:选择调用方式。
输入列名 | 说明 |
|---|---|
model | 要使用的火山方舟提供的大模型名称,例如doubao-seed-1.6、doubao-seed-1.8 等。 |
version | 要使用的模型版本号。例如 doubao-seed-1.8 的版本号为 251228 |
input_dir | 原始输入数据的路径,用于回调脚本进行结果回填。当任务完成后,该路径下的数据会与推理结果进行左连接。必须是 tos:// 或 s3:// 路径。 |
output_dir | 推理结果的输出目录. 必须是 tos:// 或 s3:// 开头的对象存储路径。推理结果中间文件都将存放于此。 |
primary_column_name | DataFrame中的主键列名。此列的值将用作推理记录的主键,用于后续结果的回溯和关联。该列值在 DataFrame 中应保持唯一。 |
image_column_name | DataFrame中的图像列名。此列的值将用作推理记录的 image 字段,用于模型处理多模态输入。 |
video_column_name | DataFrame中包含视频数据(URL 或二进制数据)的列名。 |
text_column_name | DataFrame 中包含文本 prompt 的列名。 |
message_column_name | DataFrame 中包含完整 messages 列表的列名。如果提供此参数,将忽略 image/video/text_column_name,直接使用该列构造请求。 |
返回写入TOS上的 JSONL 文件路径列表。该jsonl文件包含了批量推理的输入数据。当JSONL文件生成后,会将文件提交至豆包模型做批量推理,同时在 AI 数据湖服务中任务管理中会有模型推理对应的任务及其进度。
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
output_dir | str | 推理任务的总输出目录,必须是 tos:// 或 s3:// 开头的对象存储路径。推理结果和中间文件都将存放于此。 | |
primary_column_name | str | DataFrame中的主键列名。此列的值将用作推理记录的 custom_id,用于后续结果的回溯和关联。该列值在 DataFrame 中应保持唯一。 | |
model | str | 要使用的火山方舟提供的大模型名称,例如 doubao-seed-1.6、doubao-seed-1.8 等。 | |
version | str | 模型版本号。例如 doubao-seed-1.8 的版本号为 251228 | |
input_dir | str | None | None |
batch_inference_dataset_dir | str | None | None |
input_format | str | parquet | input_dir 中原始数据的格式,用于回调脚本读取。支持 parquet, csv, json。建议使用 parquet 格式。 因为复杂类型的列在 csv/json 中可能会导致解析错误。 |
llm_column_name | str | llm_result | 在最终输出结果中,存放模型推理内容的列名。 |
finish_reason_column_name | str | None | finish_reason |
image_column_name | str | None | None |
video_column_name | str | None | None |
text_column_name | str | None | None |
message_column_name | str | None | None |
max_tokens | int | None | None |
max_completion_tokens | int | None | None |
stop | list[str] | None | None |
frequency_penalty | float | None | None |
presence_penalty | float | None | None |
temperature | float | None | None |
top_p | float | None | None |
logit_bias | dict | None | None |
tools | list[dict] | None | None |
llm_config | dict | None | None |
num_batches | int | None | None |
completion_window | str | 方舟批量推理服务的任务完成窗口期,例如 "1d" 表示 1 天。 | |
repartition | int | None | None |
kwargs | dict | 其他任意透传给内部 ArkLLMVisionUnderstanding UDF 的参数,例如 system_text 等。 |
下面的代码展示了如何使用 submit_ark_batch_inference_job 接口对 Daft DataFrame 进行批量推理。请注意,每次大模型推理结果可能不同。
from __future__ import annotations import logging import daft from daft.io import IOConfig from daft.las.io.tos import TOSConfig logging.basicConfig(level=logging.INFO) # --- 1. 准备:执行任务时,需要根据实际情况 在LAS平台设置以下环境变量--- # LAS_API_KEY # ACCESS_KEY # SECRET_KEY # TOS_ENDPOINT # --- 2. 定义输入输出路径 --- # 注意:路径必须是 TOS 或 S3 路径 # 根据实际情况配置对象存储桶名称 bucket_name = "bucket_name" # 配置原始数据的路径。该路径存储需要用豆包模型进行推理的原始数据(只支持parquet/csv/json的存储格式,建议使用parquet格式)。若该文件不存在,也可以按照后面说的方式创建: input_data_path = f"s3://{bucket_name}/doubao_test_job_path/original_data.parquet" # 配置推理结果的输出目录。该目录存储将推理结果合并后的文件。即在input_data_path文件中追加推理结果并保存到新文件中 output_dir = f"tos://{bucket_name}/doubao_test_job_path/to/inference_output/" # 配置批量推理数据集的输出目录。`batch_inference_dataset_dir` 若不指定,默认为 {output_dir}/batch_inference_dataset。该路径存储豆包模型返回包含推理结果的jsonl文件 batch_inference_dataset_dir = f"tos://{bucket_name}/doubao_test_job_path/to/batch_inference_dataset" io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config()) # --- 3. 构造 Daft DataFrame --- # ---- 若input_data_path路径不存在,下面提供简单的创建方法。否则,跳过下面步骤,直接读取该文件。 # 假设我们有一个包含ID和文本提示的 DataFrame data = { "request_id": [f"req_{i}" for i in range(10)], "prompt": [f"写一个关于数字 {i} 的短故事" for i in range(10)], } df = daft.from_pydict(data) df.write_parquet(input_data_path.replace("tos://", "s3://"), io_config=io_config) # 读取input_data_path文件 df = daft.read_parquet(input_data_path, io_config=io_config) # --- 4. 调用接口提交批量推理任务 --- generated_files = df.submit_ark_batch_inference_job( output_dir=output_dir, primary_column_name="request_id", model="doubao-seed-1-8", version="251228", # 定义输入数据列 text_column_name="prompt", # 定义回调所需信息 input_dir=input_data_path, input_format="parquet", # 定义LLM推理参数 max_tokens=256, temperature=0.7, # (可选) 性能参数:将10条数据分成2个文件进行推理 # num_batches=2, # (可选) 回调后的重分区:将最终结果合并成一个文件 repartition=1, completion_window="2d", ) print(f"成功生成并提交了 {len(generated_files)} 个输入文件:") for file_path in generated_files: print(f"- {file_path}")