最近更新时间:2024.04.07 19:15:43
首次发布时间:2023.08.24 17:53:29
Elasticsearch 官方和社区推出了各个语言版本的 SDK,以方便用户使用。本文介绍如何使用 Java 语言,通过 Rest High Level Client 连接火山引擎 ES 实例,并为您提供示例代码。
建议 Rest High Level Client 版本和火山引擎 ES 实例的版本保持一致。若您使用相比 ES 实例更高版本的 Rest High Level Client,则可能存在少量请求的兼容性问题。
例如需要访问的 ES 实例版本是 7.10.2,则使用的 Rest High Level Client 客户端版本建议也是 7.10.2。
在 pom.xml 文件中添加以下依赖。火山引擎 ES 实例和 Rest High Level Client 版本请根据实际情况填写,此处以 7.10.2 版本为例。
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.10.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.10.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.10.2</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>0.10.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.3.9</version> </dependency>
该场景适用于连接访问方式为 HTTPS 的 ES 实例,且需要认证实例的 HTTPS 证书。
示例代码如下:
package com.bytedance.openplatform.imgr.core.client; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; import java.security.cert.Certificate; import java.security.cert.CertificateFactory; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContexts; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.settings.Settings; import org.springframework.stereotype.Component; /** * 通过 Rest Hive Level 连接 HTTPS 实例,使用证书。 */ public class ESClient { RestHighLevelClient initClientWithCA(List<String> hosts, int port, String protocol, String caPath, String user, String password) throws Exception { HttpHost[] httpHosts = hosts.stream().map(host -> new HttpHost(host, port, protocol)).toArray(HttpHost[]::new); RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); CredentialsProvider provider = new 
BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); InputStream is = Files.newInputStream(Paths.get(caPath)); CertificateFactory factory = CertificateFactory.getInstance("X.509"); Certificate trustedCa = factory.generateCertificate(is); KeyStore trustStore = KeyStore.getInstance("pkcs12"); trustStore.load(null, null); trustStore.setCertificateEntry("ca", trustedCa); SSLContextBuilder sslContextBuilder = SSLContexts.custom() .loadTrustMaterial(trustStore, null); SSLContext sslContext = sslContextBuilder.build(); restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.setDefaultCredentialsProvider(provider) .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build()) //keepalive .setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.MINUTES.toMillis(2)) .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)); return httpClientBuilder; }); //设置相关超时时间。 restClientBuilder.setRequestConfigCallback(builder -> builder .setConnectionRequestTimeout(5000) .setSocketTimeout(60000) .setConnectTimeout(5000)); return new RestHighLevelClient(restClientBuilder); } // 配置 ES 实例访问地址、登录鉴权用户,以及 HTTPS 证书信息。 public static void main(String[] args) throws Exception { ESClient esClientBak = new ESClient(); RestHighLevelClient restHighLevelClient = esClientBak.initClientWithCA( Arrays.asList("{实例访问地址}"), 9200, "https", "{证书保存路径}", "{实例访问用户}", "{用户密码}"); ClusterHealthRequest request = new ClusterHealthRequest(); try { ClusterHealthResponse health = restHighLevelClient.cluster().health(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } /** * 创建索引 */ // 创建索引,设置索引名称、分配分片数和副本数。 CreateIndexRequest createIndexRequest = new CreateIndexRequest("{索引名称}"); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards", 3) .put("index.number_of_replicas", 1)); // 创建索引时设置mapping字段映射,按需设置字段信息。 HashMap<String, 
Object> properties = new HashMap<String, Object>(); properties.put("name", Collections.singletonMap("type", "keyword")); properties.put("age", Collections.singletonMap("type", "integer")); HashMap<String, Object> mapping = new HashMap<String, Object>(); mapping.put("properties", properties); createIndexRequest.mapping(mapping); // 创建索引请求 CreateIndexResponse createIndexResponse = restHighLevelClient.indices() .create(createIndexRequest, RequestOptions.DEFAULT); System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() + "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() + "\nindex=" + createIndexResponse.index()); restHighLevelClient.close(); } }
运行程序,返回如下类似信息:
isAcknowledged=true
shards_acknowledged=true
index=custom-index
该场景适用于连接访问方式为 HTTPS 或 HTTP 的两种 ES 实例,连接过程中忽略 HTTPS 证书。
示例代码如下:
package com.bytedance.openplatform.imgr.core.client; import java.io.IOException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.settings.Settings; import org.springframework.stereotype.Component; /** * 通过 Rest Hive Level 连接 HTTPS 实例,忽略证书。 */ public class ESClient { RestHighLevelClient initClientSkipVerification(List<String> hosts, int port, String protocol, String user, String password) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { HttpHost[] httpHosts = hosts.stream().map(host -> new HttpHost(host, port, protocol)).toArray(HttpHost[]::new); RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, 
new UsernamePasswordCredentials(user, password)); SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build(); restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.setDefaultCredentialsProvider(provider) .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build()) //keepalive .setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.MINUTES.toMillis(2)) .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)); return httpClientBuilder; }); //设置相关超时时间。 restClientBuilder.setRequestConfigCallback(builder -> builder .setConnectionRequestTimeout(5000) .setSocketTimeout(60000) .setConnectTimeout(5000)); return new RestHighLevelClient(restClientBuilder); } // 配置 ES 实例访问地址、登录鉴权用户。 public static void main(String[] args) throws Exception { ESClient esClientBak = new ESClient(); RestHighLevelClient restHighLevelClient = esClientBak.initClientSkipVerification( Arrays.asList("{实例访问地址}"), 9200, "https", "{实例访问用户}", "{用户密码}"); ClusterHealthRequest request = new ClusterHealthRequest(); try { ClusterHealthResponse health = restHighLevelClient.cluster().health(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } /** * 创建索引 */ // 创建索引,设置索引名称、分配分片数和副本数。 CreateIndexRequest createIndexRequest = new CreateIndexRequest("{索引名称}"); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards", 3) .put("index.number_of_replicas", 1)); // 创建索引时设置mapping字段映射,按需设置字段信息。 HashMap<String, Object> properties = new HashMap<String, Object>(); properties.put("name", Collections.singletonMap("type", "keyword")); properties.put("age", Collections.singletonMap("type", "integer")); HashMap<String, Object> mapping = new HashMap<String, Object>(); mapping.put("properties", properties); createIndexRequest.mapping(mapping); // 创建索引请求。 CreateIndexResponse createIndexResponse = restHighLevelClient.indices() 
.create(createIndexRequest, RequestOptions.DEFAULT); System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() + "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() + "\nindex=" + createIndexResponse.index()); restHighLevelClient.close(); } }
运行程序,返回如下类似信息:
isAcknowledged=true
shards_acknowledged=true
index=custom-index