
Connect to an instance using Java

Last updated: 2024.04.07 19:15:43

First published: 2023.08.24 17:53:29

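Elasticsearch and its community provide SDKs in many languages to make the service easier to use. This topic describes how to connect to a Volcano Engine ES instance from Java through the Rest High Level Client, and provides sample code.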

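Prerequisites

  • Create a Volcano Engine ES instance in advance and make sure it is running normally. For details on creating an instance, see Create an ESCloud instance.
  • Before connecting to the ES instance, obtain the instance access address and the instance access user from the instance details page. For HTTPS instances that require a certificate, also download the certificate and save it to a local path.
    • If you forget the password of the instance access user (admin), you can reset it.
    • To access the instance through its public address, enable public access for the instance and then bind an elastic public IP (EIP).
  • The server that runs the Java code must have a Java environment installed in advance: JDK 1.8 or later, and Maven 3.5 or later. For details, see Install JDK and Install Maven.
  • Make sure the server that runs the Java code has network connectivity to the Volcano Engine ES instance.
    • If the server and the ES instance are in the same virtual private cloud (VPC), connect through the private address of the instance.
    • If the server is on the public network, connect through the public address of the instance. For the steps, see Enable public access for an instance.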
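
Before wiring up the client, it can help to confirm that the server can reach the instance at the network level at all. The following is a minimal sketch using only JDK classes; the class name `EsReachability`, the method `canConnect`, and the default values in `main` are illustrative assumptions, not part of any ES SDK. It only checks that a TCP connection can be opened and says nothing about TLS or authentication.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks plain TCP reachability of host:port.
final class EsReachability {

    /** Returns true if a TCP connection to host:port opens within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the instance access address and port as arguments;
        // the defaults below are placeholders for local experiments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9200;
        System.out.println(host + ":" + port + " reachable=" + canConnect(host, port, 5000));
    }
}
```

If this reports `reachable=false`, the VPC or public-access settings are the first place to look, before any client-side code.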

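Usage notes

We recommend keeping the Rest High Level Client version the same as the version of the Volcano Engine ES instance. If you use a Rest High Level Client version that is higher than the ES instance version, a small number of requests may have compatibility issues.
For example, if the ES instance you need to access is version 7.10.2, the recommended Rest High Level Client version is also 7.10.2.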
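
The version rule can be checked in code before any requests are sent. The sketch below is one way to do so, assuming both versions are available as plain dot-separated numeric strings such as `7.10.2`. The class `ClientVersionCheck` and its methods are hypothetical helpers, and how you obtain the two strings is left to the caller.

```java
// Hypothetical helper: compares dot-separated numeric version strings.
final class ClientVersionCheck {

    /** Parses "7.10.2" into {7, 10, 2}; assumes every part is a plain integer. */
    static int[] parse(String version) {
        String[] parts = version.trim().split("\\.");
        int[] numbers = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            numbers[i] = Integer.parseInt(parts[i]);
        }
        return numbers;
    }

    /** Negative if a is older than b, zero if equal, positive if newer. Missing parts count as 0. */
    static int compare(String a, String b) {
        int[] left = parse(a);
        int[] right = parse(b);
        int length = Math.max(left.length, right.length);
        for (int i = 0; i < length; i++) {
            int l = i < left.length ? left[i] : 0;
            int r = i < right.length ? right[i] : 0;
            if (l != r) {
                return Integer.compare(l, r);
            }
        }
        return 0;
    }

    static boolean clientMatchesServer(String clientVersion, String serverVersion) {
        return compare(clientVersion, serverVersion) == 0;
    }

    static boolean clientIsNewer(String clientVersion, String serverVersion) {
        return compare(clientVersion, serverVersion) > 0;
    }

    public static void main(String[] args) {
        String clientVersion = "7.10.2"; // version declared in pom.xml
        String serverVersion = "7.10.2"; // version of the ES instance
        if (clientIsNewer(clientVersion, serverVersion)) {
            System.out.println("Client is newer than the instance; some requests may be incompatible.");
        } else if (clientMatchesServer(clientVersion, serverVersion)) {
            System.out.println("Client and instance versions match.");
        } else {
            System.out.println("Client is older than the instance.");
        }
    }
}
```

The comparison is numeric per component, so `7.9.3` is treated as older than `7.10.2`, which a plain string comparison would get wrong.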

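Add dependencies

Add the following dependencies to the pom.xml file. Set the versions of the Volcano Engine ES instance and the Rest High Level Client according to your actual environment; version 7.10.2 is used here as an example.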

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.10.2</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.10.2</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.2</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>0.10.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.3.9</version>
</dependency>

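Connect to an instance with a certificate

This scenario applies to ES instances that are accessed over HTTPS and whose HTTPS certificate must be verified.
The sample code is as follows: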

package com.bytedance.openplatform.imgr.core.client;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.springframework.stereotype.Component;


/**
 * Connects to an HTTPS instance through the Rest High Level Client, using a certificate.
 */

public class ESClient {

    RestHighLevelClient initClientWithCA(List<String> hosts, int port, String protocol, String caPath, String user, String password) throws Exception {
        HttpHost[] httpHosts = hosts.stream().map(host -> new HttpHost(host, port, protocol)).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        // Read the CA certificate; try-with-resources closes the stream afterwards.
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(Paths.get(caPath))) {
            CertificateFactory factory = CertificateFactory.getInstance("X.509");
            trustedCa = factory.generateCertificate(is);
        }
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        SSLContextBuilder sslContextBuilder = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null);
        SSLContext sslContext = sslContextBuilder.build();
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setDefaultCredentialsProvider(provider)
                    .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())
                    // Keep idle connections alive for 2 minutes.
                    .setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.MINUTES.toMillis(2))
                    .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE));
            return httpClientBuilder;
        });
        // Set the timeouts (in milliseconds).
        restClientBuilder.setRequestConfigCallback(builder -> builder
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(60000)
                .setConnectTimeout(5000));
        return new RestHighLevelClient(restClientBuilder);
    }

    // Configure the ES instance access address, the login user, and the HTTPS certificate.
    public static void main(String[] args) throws Exception {
        ESClient esClient = new ESClient();
        RestHighLevelClient restHighLevelClient = esClient.initClientWithCA(
                Arrays.asList("{instance access address}"), 9200,
                "https", "{certificate path}",
                "{instance access user}", "{user password}");
        // Request the cluster health to verify that the connection works.
        ClusterHealthRequest request = new ClusterHealthRequest();
        try {
            ClusterHealthResponse health = restHighLevelClient.cluster().health(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Create an index: set the index name, the number of shards, and the number of replicas.
        CreateIndexRequest createIndexRequest = new CreateIndexRequest("{index name}");
        createIndexRequest.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 1));

        // Define the field mapping at index creation time; adjust the fields as needed.
        HashMap<String, Object> properties = new HashMap<String, Object>();
        properties.put("name",
                Collections.singletonMap("type", "keyword"));
        properties.put("age",
                Collections.singletonMap("type", "integer"));

        HashMap<String, Object> mapping = new HashMap<String, Object>();
        mapping.put("properties", properties);

        createIndexRequest.mapping(mapping);

        // Send the create-index request.
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices()
                .create(createIndexRequest, RequestOptions.DEFAULT);
        System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() +
                "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() +
                "\nindex=" + createIndexResponse.index());
        restHighLevelClient.close();
    }

}

Run the program. Output similar to the following is returned:

isAcknowledged=true
shards_acknowledged=true
index=custom-index
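
The certificate handling in `initClientWithCA` can also be tried on its own, without the ES client on the classpath, for example to confirm that the downloaded certificate file parses correctly. The following is a minimal sketch using only JDK classes. The class `CaTrustStore` and its method names are hypothetical, and it uses the JDK's default key store type and `TrustManagerFactory` rather than the Apache `SSLContexts` builder from the sample above.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper: builds a trust store and SSLContext from a CA file.
final class CaTrustStore {

    /** Creates an empty in-memory key store to hold trusted certificates. */
    static KeyStore emptyTrustStore() throws Exception {
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        trustStore.load(null, null);
        return trustStore;
    }

    /** Loads one X.509 certificate from caPath and stores it under the alias "ca". */
    static KeyStore fromCaFile(Path caPath) throws Exception {
        KeyStore trustStore = emptyTrustStore();
        try (InputStream in = Files.newInputStream(caPath)) {
            Certificate ca = CertificateFactory.getInstance("X.509").generateCertificate(in);
            trustStore.setCertificateEntry("ca", ca);
        }
        return trustStore;
    }

    /** Builds an SSLContext that trusts only the certificates in trustStore. */
    static SSLContext sslContextTrusting(KeyStore trustStore) throws Exception {
        TrustManagerFactory factory =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        factory.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, factory.getTrustManagers(), null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("usage: java CaTrustStore <path to CA certificate>");
            return;
        }
        KeyStore trustStore = fromCaFile(Paths.get(args[0]));
        SSLContext context = sslContextTrusting(trustStore);
        System.out.println("Loaded " + trustStore.size() + " certificate(s), protocol=" + context.getProtocol());
    }
}
```

If `fromCaFile` throws a `CertificateException`, the file at the given path is probably not a valid X.509 certificate, which is easier to diagnose here than inside the client callback.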

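Connect to an instance without verifying the certificate

This scenario applies to ES instances accessed over either HTTPS or HTTP. The HTTPS certificate is not verified during connection.
The sample code is as follows: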

package com.bytedance.openplatform.imgr.core.client;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.springframework.stereotype.Component;


/**
 * Connects to an HTTPS instance through the Rest High Level Client, skipping certificate verification.
 */

public class ESClient {

    RestHighLevelClient initClientSkipVerification(List<String> hosts, int port, String protocol, String user, String password)
            throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        HttpHost[] httpHosts = hosts.stream().map(host -> new HttpHost(host, port, protocol)).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build();
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setDefaultCredentialsProvider(provider)
                    .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())
                    // Keep idle connections alive for 2 minutes.
                    .setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.MINUTES.toMillis(2))
                    .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE));
            return httpClientBuilder;
        });
        // Set the timeouts (in milliseconds).
        restClientBuilder.setRequestConfigCallback(builder -> builder
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(60000)
                .setConnectTimeout(5000));
        return new RestHighLevelClient(restClientBuilder);
    }


    // Configure the ES instance access address and the login user.
    public static void main(String[] args) throws Exception {
        ESClient esClient = new ESClient();
        RestHighLevelClient restHighLevelClient = esClient.initClientSkipVerification(
                Arrays.asList("{instance access address}"), 9200, "https",
                "{instance access user}", "{user password}");
        // Request the cluster health to verify that the connection works.
        ClusterHealthRequest request = new ClusterHealthRequest();
        try {
            ClusterHealthResponse health = restHighLevelClient.cluster().health(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Create an index: set the index name, the number of shards, and the number of replicas.
        CreateIndexRequest createIndexRequest = new CreateIndexRequest("{index name}");
        createIndexRequest.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 1));

        // Define the field mapping at index creation time; adjust the fields as needed.
        HashMap<String, Object> properties = new HashMap<String, Object>();
        properties.put("name",
                Collections.singletonMap("type", "keyword"));
        properties.put("age",
                Collections.singletonMap("type", "integer"));

        HashMap<String, Object> mapping = new HashMap<String, Object>();
        mapping.put("properties", properties);

        createIndexRequest.mapping(mapping);

        // Send the create-index request.
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices()
                .create(createIndexRequest, RequestOptions.DEFAULT);
        System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() +
                "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() +
                "\nindex=" + createIndexResponse.index());
        restHighLevelClient.close();
    }

}

Run the program. Output similar to the following is returned:

isAcknowledged=true
shards_acknowledged=true
index=custom-index
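
Both samples pass a nested `Map` to `CreateIndexRequest.mapping`. To see the shape of that structure without connecting to an instance, the map can be built and printed on its own. This is a small sketch with a hypothetical class name, `IndexMappingSketch`. It swaps `HashMap` for `LinkedHashMap` purely so that the printed field order is predictable; the `name` and `age` fields are the ones from the samples.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the same mapping structure as the samples above.
final class IndexMappingSketch {

    /** Returns {"properties": {"name": {"type": "keyword"}, "age": {"type": "integer"}}} as nested maps. */
    static Map<String, Object> buildMapping() {
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("name", Collections.singletonMap("type", "keyword"));
        properties.put("age", Collections.singletonMap("type", "integer"));

        Map<String, Object> mapping = new LinkedHashMap<>();
        mapping.put("properties", properties);
        return mapping;
    }

    public static void main(String[] args) {
        // prints {properties={name={type=keyword}, age={type=integer}}}
        System.out.println(buildMapping());
    }
}
```

Adding another field is a matter of one more `properties.put(...)` call with the field name and its type definition.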