Elasticsearch 功能可以通过多种方式轻松集成到任何 Java 应用程序中,通过 REST API 或通过本机 API。 在 Java 中,很容易使用许多可用库之一调用 REST HTTP 接口,例如 Apache HttpComponents 客户端(有关更多信息,请参见 http://hc.apache.org/)。 在这个领域,没有最常用的库。 通常,开发人员会选择最适合他们偏好的库或他们非常熟悉的库。 从 Elasticsearch 6.x 开始,Elastic 为客户提供了一种低/高级 HTTP 接口供大家使用。 在 8.x 版本中,Elastic 发布了一个现代/功能/强类型的客户端。如果你想了解 Elastic 对 Java 的支持,请参阅文章 “Elasticsearch:使用最新的 Elasticsearch Java client 8.0 来创建索引并搜索”。
每种 Java 虚拟机 (JVM) 语言也可以使用本地协议将 Elasticsearch 与其应用程序集成; 但是,我们不会对此进行介绍,因为它从 Elasticsearch 7.x 开始就不再使用了。 新应用程序应该依赖 HTTP。 在本章中,我们将学习如何初始化不同的客户端以及如何执行我们在前几章中看到的命令。 我们不会深入介绍每个 HTTP 调用,因为我们已经为 REST API 描述了它们。 Elasticsearch 社区建议在集成它们时使用 REST API,因为它们在不同版本之间更稳定并且有据可查。
HTTP 客户端是最容易创建的客户端之一。 它非常方便,因为它不仅允许调用本地协议那样的内部方法,还允许调用插件中实现的只能通过 HTTP 调用的第三方调用。
在今天的展示中,我将使用最新的 Elastic Stack 8.3 来进行展示。
安装
如果你还没有安装好自己的 Elasticsearch 及 Kibana。请参阅我之前的文章:
- 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch
- Kibana:如何在 Linux,MacOS 及 Windows上安装 Elastic 栈中的 Kibana
- Elasticsearch:设置 Elastic 账户安全
在默认的情况下,Elastic Stack 8.x 的安装已带有安全。如果你是自托管型的集群,那么你的证书应该是自签名的证书。
创建标准 HTTP 客户端
带有 HTTPS 的 Elasticsearch 安装
因为 Elasticsearch 8.x 或更高版本默认是安全的,所以要运行本章的所有示例,请在执行代码时将凭据放入 ES_USER 和 ES_PASSWORD 环境变量中。我们使用 maven 来创建一个项目:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>ElasticsearchJava</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.13</version></dependency></dependencies></project>
我们选择了 Apache HttpComponents 库,它是用于执行 HTTP 调用的最广泛使用的库之一。 这个库在名为的主 Maven 存储库中可用 search.maven.org。
我们创建如下的一个文件 ElasticsearchJava.java:
ElasticsearchJava.java
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;public class ElasticsearchJava {private static String wsUrl ="https://127.0.0.1:9200";public static void main(String[] args) throwsKeyManagementException, NoSuchAlgorithmException,KeyStoreException {CloseableHttpClient client = HttpClients.custom().setSSLContext(new SSLContextBuilder().loadTrustMaterial(null, TrustAllStrategy.INSTANCE).build()).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).setRetryHandler(new MyRequestRetryHandler()).build();HttpGet method = new HttpGet(wsUrl + "/mybooks/_doc/1");// Execute the method.HttpHost targetHost = new HttpHost("localhost",9200, "https");CredentialsProvider credsProvider = newBasicCredentialsProvider();credsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(System.getenv("ES_USER"), System.getenv("ES_PASSWORD")));
// credsProvider.setCredentials(AuthScope.ANY,
// new UsernamePasswordCredentials("elastic","UX57+rwiGozGILIEn*FW"));// Create AuthCache instanceAuthCache authCache = new BasicAuthCache();// Generate BASIC scheme object and add it to local auth cacheBasicScheme basicAuth = new BasicScheme();authCache.put(targetHost, basicAuth);// Add AuthCache to the execution contextHttpClientContext context = HttpClientContext.create();context.setCredentialsProvider(credsProvider);method.addHeader("Accept-Encoding", "gzip");try {CloseableHttpResponse response = client.execute(method, context);if (response.getStatusLine().getStatusCode()!= HttpStatus.SC_OK) {System.err.println("Method failed: " +response.getStatusLine());} else {HttpEntity entity = response.getEntity();String responseBody = EntityUtils.toString(entity);System.out.println(responseBody);}} catch (IOException e) {System.err.println("Fatal transport error: "+ e.getMessage());e.printStackTrace();} finally {// Release the connection.method.releaseConnection();}}
}
我们创建另外一个文件:
MyRequestRetryHandler.java
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.protocol.HttpContext;import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;public class MyRequestRetryHandler implements HttpRequestRetryHandler {public boolean retryRequest(IOException exception,int executionCount,HttpContext context) {if (executionCount >= 3) {// Do not retry if over max retry countreturn false;}if (exception instanceof InterruptedIOException) {// Timeoutreturn false;}if (exception instanceof UnknownHostException) {// Unknown hostreturn false;}if (exception instanceof ConnectTimeoutException) {// Connection refusedreturn false;}if (exception instanceof SSLException) {// SSL handshake exceptionreturn false;}HttpClientContext clientContext = HttpClientContext.adapt(context);HttpRequest request = clientContext.getRequest();return !(request instanceof HttpEntityEnclosingRequest);}
}
第一步是初始化 HTTP 客户端对象。 在前面的代码中,这是通过以下代码片段完成的:
CloseableHttpClient client = CloseableHttpClient client =
HttpClients.custom()
.setSSLContext(new SSLContextBuilder().
loadTrustMaterial(null, TrustAllStrategy.INSTANCE).build())
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setRetryHandler(new MyRequestRetryHandler())
.build();
在使用客户端之前,最好对其进行自定义。 默认情况下,Elasticsearch 8.0.0 是安全的,它使用自生成的未经官方签名的 SSL 证书,因此需要使用 .setContext 和 .setSSLHostnameVerifier 来接受自签名证书。 一般来说,客户端也可以被修改以提供额外的功能,例如重试支持。 重试支持对于设计健壮的应用程序非常重要; IP 网络协议永远不会 100% 可靠,因此如果出现问题,它会自动重试操作变坏(例如,HTTP 连接关闭或服务器开销)。
在前面的代码中,我们定义了 HttpRequestRetryHandler,它监视执行并在引发错误之前重复执行 3 次。 为了能够执行 API 调用,我们需要进行身份验证; 身份验证与 basicAuth 一样简单,但非常适用于非复杂部署,如以下代码所示:
HttpHost targetHost = new HttpHost("localhost", 9200, "https");
CredentialsProvider credsProvider = new
BasicCredentialsProvider();
credsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(System.getenv("ES_USER"),System.getenv("ES_PASSWORD")));
AuthCache authCache = new BasicAuthCache();
// Generate BASIC scheme object and add it to local auth cache
BasicScheme basicAuth = new BasicScheme();
authCache.put(targetHost, basicAuth);
提示:最佳安全实践是从环境变量中读取凭据,而不是将它们放入代码中。
执行调用时必须使用 create context 参数,如下代码所示:
// Add AuthCache to the execution context
HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credsProvider);
CloseableHttpResponse response = client.execute(method,context);
设置客户端后,我们执行 GET REST 调用。 使用的方法将用于 HttpGet 并且 URL 将是名为 index/type/id 的项目 要初始化该方法,请使用以下代码:
HttpGet method = new HttpGet(wsUrl + "/mybooks/_doc/1");
上面的代码相当于如下的命令:
GET mybooks/_doc/1
现在我们可以设置自定义 header,允许我们将额外信息传递给服务器以执行调用。 一些示例可能是 API 密钥或有关支持格式的提示。
一个典型的例子是通过 HTTP 使用 gzip 数据压缩来减少带宽使用。 为此,我们可以在调用中添加一个自定义 header,通知服务器我们的客户端接受编码。 可以从短语 Accept-Encoding 和 gzip 制作示例自定义标头,如以下代码所示:
method.addHeader("Accept-Encoding", "gzip");
使用所有参数配置调用后,我们可以启动请求,如下所示:
CloseableHttpResponse response = client.execute(method, context);
每个响应对象都必须对其返回状态进行验证:如果调用正常,则返回状态应为 200。在以下代码中,检查是在 if 语句中完成的,如下所示:
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK)
如果调用正常,响应的状态码是200,我们就可以读取到应答,如下:
HttpEntity entity = response.getEntity();
String responseBody = EntityUtils.toString(entity);
响应包装在 HttpEntity 中,它是一个流。 HTTP 客户端库提供了一个名为 EntityUtils.toString 的辅助方法,它将 HttpEntity 的所有内容作为字符串读取; 否则,我们需要创建一些代码来读取字符串并构建字符串。 显然,调用的所有读取部分都包装在一个 try-catch 块中,以收集由网络错误创建的所有可能的错误。
测试
为了能够测试上面的代码,我们必须借助 Kibana 来创建如下的一个 mybooks 索引:
PUT mybooks/_doc/1
{"uuid": "1","title": "Great math"
}
在上面,我们创建了一个叫做 mybooks 的索引,并创建了一个文档。运行我们的 Java 应用,我们可以看到如下的结果:
{"_index":"mybooks","_id":"1","_version":1,"_seq_no":0,"_primary_term":1,"found":true,"_source":{"uuid": "1","title": "Great math"
}
}
显然这个结果就是我们如下命令的结果:
GET mybooks/_doc/1
通过使用 HTTP client 的方法,我们可以直接使用 Elasticsearch 所提供的丰富的 REST API 接口来完成我们想要的功能。
不带任何安全的 Elasticsearch 安装
针对这样的 Elasticsearch 安装,我们可以参照之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单” 中的 “如何配置 Elasticsearch 不带安全性” 一节来进行安装。
我们来创建如下的一个文件 App.java
App.java
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;import java.io.IOException;public class App {private static String wsUrl = "http://127.0.0.1:9200";public static void main(String[] args) {CloseableHttpClient client = HttpClients.custom().setRetryHandler(new MyRequestRetryHandler()).build();HttpGet method = new HttpGet(wsUrl + "/mybooks/_doc/1");// Execute the method.try {CloseableHttpResponse response = client.execute(method);if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {System.err.println("Method failed: " + response.getStatusLine());} else {HttpEntity entity = response.getEntity();String responseBody = EntityUtils.toString(entity);System.out.println(responseBody);}} catch (IOException e) {System.err.println("Fatal transport error: " + e.getMessage());e.printStackTrace();} finally {// Release the connection.method.releaseConnection();}}
}
和之前的一样,如果我们需要进行测试,我们需要使用 Kibana 来创建如下的一个索引:
PUT mybooks/_doc/1
{"uuid": "1","title": "Great math"
}
然后,运行我们的代码。我可以可以看到如下的一个输出:
{"_index":"mybooks","_id":"1","_version":1,"_seq_no":0,"_primary_term":1,"found":true,"_source":{"uuid": "1","title": "Great math"
}
}
显然它和之前的输出是一致的。
带有基本安全的 Elasticsearch 安装
针对这样的 Elasticsearch 安装,我们可以参照之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单” 中的 “如何配置 Elasticsearch 只带有基本安全” 一节来进行安装。
我们来创建如下的一个叫做 ElasticsearchBasicSecurity.java
ElasticsearchBasicSecurity.java
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;import java.io.IOException;public class ElasticsearchBasicSecurity {private static String wsUrl = "http://127.0.0.1:9200";public static void main(String[] args) {CloseableHttpClient client = HttpClients.custom().setRetryHandler(new MyRequestRetryHandler()).build();HttpGet method = new HttpGet(wsUrl + "/mybooks/_doc/1");// Execute the method.HttpHost targetHost = new HttpHost("127.0.0.1", 9200, "http");CredentialsProvider credsProvider = new BasicCredentialsProvider();credsProvider.setCredentials(new AuthScope(targetHost.getHostName(), targetHost.getPort()),new UsernamePasswordCredentials("elastic", "password"));// Create AuthCache instanceAuthCache authCache = new BasicAuthCache();// Generate BASIC scheme object and add it to local auth cacheBasicScheme basicAuth = new BasicScheme();authCache.put(targetHost, basicAuth);// Add AuthCache to the execution contextHttpClientContext context = HttpClientContext.create();context.setCredentialsProvider(credsProvider);method.addHeader("Accept-Encoding", "gzip");try {CloseableHttpResponse response = client.execute(method, context);if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {System.out.println("Something is wrong");System.err.println("Method failed: " + response.getStatusLine());} else {HttpEntity entity = response.getEntity();String responseBody = EntityUtils.toString(entity);System.out.println(responseBody);}} catch (IOException e) {System.err.println("Fatal transport error: " + e.getMessage());e.printStackTrace();} finally {// Release the connection.method.releaseConnection();}}
}
在上面,你需要根据自己的用户名及密码进行修改。
为了测试,我们使用 Kibana 创建如下的一个索引:
PUT mybooks/_doc/1
{"uuid": "1","title": "Great math"
}
运行我们的代码:
{"_index":"mybooks","_id":"1","_version":1,"_seq_no":0,"_primary_term":1,"found":true,"_source":{"uuid": "1","title": "Great math"
}
}
从上面的输出中,我们可以看出来输出的结果和之前的是一样的。