You need to enable JavaScript to run this app.
日志服务

日志服务

复制全文
Go SDK
通过 Go SDK 导入 Kafka 数据
复制全文
通过 Go SDK 导入 Kafka 数据

日志服务支持通过 SDK 导入 Kafka 数据。本文档通过示例代码演示如何通过 Go SDK 从 Kafka 导入数据到日志服务。

前提条件

  • 已安装日志服务 Go SDK。更多信息,请参见安装 Go SDK

  • 已执行以下命令安装 proto 依赖包。

    go get -u github.com/gogo/protobuf/proto
    
  • 已添加 VOLCENGINE_ACCESS_KEY_ID 等环境变量。环境变量的配置方式请参考配置身份认证信息

    注意

    推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。

场景说明

本文档通过示例代码演示如何通过 GO SDK 在指定 Project 中创建 Kafka 数据导入任务。相关的功能介绍、注意事项、费用说明,请参考从 Kafka 导入数据

示例代码

package demo

import (
    "github.com/volcengine/volc-sdk-golang/service/tls"
    "os"
)

func CreateImportKafkaTaskDemo() (*tls.CreateImportTaskResponse, error) {
    // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455。
    // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空。
    client := tls.NewClient(os.Getenv("VOLCENGINE_ENDPOINT"), os.Getenv("VOLCENGINE_ACCESS_KEY_ID"), os.Getenv("VOLCENGINE_ACCESS_KEY_SECRET"), os.Getenv("VOLCENGINE_TOKEN"), os.Getenv("VOLCENGINE_REGION"))
    //创建数据导入任务。
    //CreateImportTask API 的请求参数说明请参考https://www.volcengine.com/docs/6470/1333757。
    createImportTaskResp, err := client.CreateImportTask(&tls.CreateImportTaskRequest{
       TopicID:     "fa**",
       TaskName:    "task-name-1",
       SourceType:  "kafka",
       Description: "this is a task",
       ImportSourceInfo: &tls.ImportSourceInfo{
          KafkaSourceInfo: &tls.KafkaSourceInfo{
             Host:              "kafka-cnngc7an0qpv****.kafka.ivolces.com:9092",
             Group:             "mygroup",
             Topic:             "test",
             Encode:            "UTF-8",
             Password:          "",
             Protocol:          "",
             Username:          "",
             Mechanism:         "",
             InstanceID:        "",
             IsNeedAuth:        "false",
             InitialOffset:     0,
             TimeSourceDefault: 0,
          },
       },
       TargetInfo: &tls.TargetInfo{
          Region:  "cn-shanghai",
          LogType: "json_log",
          ExtractRule: &tls.ImportExtractRule{
             TimeZone:         "GMT",
             SkipLineCount:    0,
             TimeExtractRegex: "",
             ExtractRule: &tls.ExtractRule{
                Quote: "\"",
             },
          },
       },
    })

    if err != nil {
       return nil, err
    }

    return createImportTaskResp, nil
}

相关文档

  • 通过 SDK 发送调用 API 的请求以后,您会收到服务端的响应,如果响应中包含 200 以外的状态码,表示接口调用失败。您可以参考各个 API 的文档查看对应的错误码信息。

  • 数据导入任务相关的 API 接口如下。您可以参考 API 文档查看对应接口的详细信息。

    API

    说明

    CreateImportTask

    调用接口 CreateImportTask 创建数据导入任务。

    DeleteImportTask

    调用接口 DeleteImportTask 删除指定的数据导入任务。

    ModifyImportTask

    调用接口 ModifyImportTask 修改指定的数据导入任务。

    DescribeImportTask

    调用接口 DescribeImportTask 获取某个数据导入任务的信息。

    DescribeImportTasks

    调用接口 DescribeImportTasks 获取数据导入任务列表。

最近更新时间:2024.12.30 10:18:04
这个页面对您有帮助吗?
有用
有用
无用
无用