You need to enable JavaScript to run this app.
导航

快速开始

最近更新时间2023.08.18 19:13:49

首次发布时间2023.08.18 19:13:49

本文介绍如何快速使用 Volcengine Java SDK 实现基础的 Kafka 实例资源管理流程,包括创建实例、创建 Topic等操作。

前提条件

  • 已安装 Volcengine Java SDK。更多信息,请参见安装 Java SDK
  • 已创建并获取火山引擎访问密钥 AccessKey。访问密钥 AccessKey 拥有所有 API 的全部权限。建议您通过 IAM 用户进行 API 相关操作和日常运维。使用 IAM 用户前,主账号需要为 IAM 用户授予消息队列 Kafka版相关资源和操作的权限。

示例代码

创建实例

通过 Volcengine Java SDK 调用消息队列 Kafka版 V2 API CreateInstance 的示例代码如下。

package com.volcengine.kafka.examples;

import com.volcengine.ApiClient;
import com.volcengine.ApiException;
import com.volcengine.kafka.KafkaApi;
import com.volcengine.kafka.model.*;
import com.volcengine.sign.Credentials;


public class TestKafka {
    public static void main(String[] args) throws Exception {
        String ak = "Your AK";
        String sk = "Your SK";
        String region = "cn-beijing";

        ApiClient apiClient = new ApiClient()
                .setCredentials(Credentials.getCredentials(ak, sk))
                .setRegion(region);

        KafkaApi api = new KafkaApi(apiClient);

        ChargeInfoForCreateInstanceInput chargeInfo = new ChargeInfoForCreateInstanceInput();
        chargeInfo.setChargeType("PrePaid");
        chargeInfo.setPeriod(1);
        chargeInfo.setPeriodUnit("Month");
        chargeInfo.setAutoRenew(true);

        CreateInstanceRequest createInstanceRequest = new CreateInstanceRequest();
        createInstanceRequest.setZoneId("cn-beijing-a");
        createInstanceRequest.setVersion("2.2.2");
        createInstanceRequest.setComputeSpec("kafka.20xrate.hw");
        createInstanceRequest.setVpcId("vpc-rs4yccs57e9sv0x57bf****");
        createInstanceRequest.setSubnetId("subnet-rrps5hvr1bswv0x58fp****");
        createInstanceRequest.setUserName("kafka2001");
        createInstanceRequest.setUserPassword("Test@123456");
        createInstanceRequest.setChargeInfo(chargeInfo);

        try {
            CreateInstanceResponse response = api.createInstance(createInstanceRequest);
            System.out.println(response);
        } catch (ApiException e) {
            System.out.println(e.getResponseBody());
        }
    }
}

查询实例列表

通过 Volcengine Java SDK 调用消息队列 Kafka版 V2 API DescribeInstances 的示例代码如下。

package com.volcengine.kafka.examples;

import com.volcengine.ApiClient;
import com.volcengine.ApiException;
import com.volcengine.kafka.KafkaApi;
import com.volcengine.kafka.model.DescribeInstancesRequest;
import com.volcengine.kafka.model.DescribeInstancesResponse;
import com.volcengine.sign.Credentials;


public class TestKafka {
    public static void main(String[] args) throws Exception {
        String ak = "Your AK";
        String sk = "Your SK";
        String region = "cn-beijing";

        ApiClient apiClient = new ApiClient()
                .setCredentials(Credentials.getCredentials(ak, sk))
                .setRegion(region);

        KafkaApi api = new KafkaApi(apiClient);

        DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest();
        describeInstancesRequest.setPageNumber(1);
        describeInstancesRequest.setPageSize(10);


        try {
            DescribeInstancesResponse response = api.describeInstances(describeInstancesRequest);
            System.out.println(response);
        } catch (ApiException e) {
            System.out.println(e.getResponseBody());
        }
    }
}

创建 Topic

通过 Volcengine Java SDK 调用消息队列 Kafka版 V2 API CreateTopic 的示例代码如下。

package com.volcengine.kafka.examples;

import com.volcengine.ApiClient;
import com.volcengine.ApiException;
import com.volcengine.kafka.KafkaApi;
import com.volcengine.kafka.model.*;
import com.volcengine.sign.Credentials;

import java.util.ArrayList;


public class TestKafka {
    public static void main(String[] args) throws Exception {
        String ak = "Your AK";
        String sk = "Your SK";
        String region = "cn-beijing";

        ApiClient apiClient = new ApiClient()
                .setCredentials(Credentials.getCredentials(ak, sk))
                .setRegion(region);

        KafkaApi api = new KafkaApi(apiClient);

        AccessPolicyForCreateTopicInput policy = new AccessPolicyForCreateTopicInput();
        policy.setAccessPolicy("PubSub");
        policy.setUserName("user123");

        ArrayList<AccessPolicyForCreateTopicInput> policies = new ArrayList<>();
        policies.add(policy);

        CreateTopicRequest req = new CreateTopicRequest();
        req.setInstanceId("kafka-cnngbnntswg1****");
        req.setTopicName("my_topic3");
        req.setPartitionNumber(3);
        req.setAllAuthority(false);
        req.setDescription("describe");
        req.setReplicaNumber(3);
        req.setParameters("{\"LogRetentionHours\":\"72\",\"MessageMaxByte\":\"10\",\"MinInsyncReplicaNumber\":\"2\"}");
        req.setAccessPolicies(policies);

        try {
            CreateTopicResponse response = api.createTopic(req);
            System.out.println(response);
        } catch (ApiException e) {
            System.out.println(e.getResponseBody());
        }
    }
}