ElasticSearch Integration (Java API)

Dependencies

<dependencies>
    <!-- Elasticsearch high-level REST client API -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.11.1</version>
    </dependency>
    <!-- Alibaba's FastJSON library, for converting Java objects to JSON and back -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.14.3</version>
        <scope>test</scope>
    </dependency>
</dependencies>
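
Note that the high-level client version (7.6.1 here) should line up with the ES cluster: the client is compatible with nodes of the same major version, but not across major versions, so a 7.x client should talk to a 7.x cluster.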

Initializing the Connection

We use RestHighLevelClient to connect to the ES cluster; all subsequent operations on the data in ES go through this client.

private RestHighLevelClient restHighLevelClient;

public JobFullTextServiceImpl() {
    // Establish the connection to ES:
    // 1. Build the client with RestHighLevelClient.
    // 2. Build a RestClientBuilder via RestClient.builder.
    // 3. Add the ES nodes with HttpHost.
    RestClientBuilder restClientBuilder = RestClient.builder(
            new HttpHost("192.168.21.130", 9200, "http"),
            new HttpHost("192.168.21.131", 9200, "http"),
            new HttpHost("192.168.21.132", 9200, "http"));
    restHighLevelClient = new RestHighLevelClient(restClientBuilder);
}
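
The client holds HTTP connections to the cluster, so it should be released when the service shuts down. A minimal sketch; the close method below is an assumed addition to the same service class:

// Hypothetical shutdown method for the service class above.
// RestHighLevelClient implements Closeable, so releasing it is one call.
public void close() throws IOException {
    if (restHighLevelClient != null) {
        restHighLevelClient.close();
    }
}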

Query / Delete / Search / Pagination

* Insert: IndexRequest
* Update: UpdateRequest
* Delete: DeleteRequest
* Get by ID: GetRequest
* Keyword search: SearchRequest

Adding Job Data to ES

* Use an IndexRequest object to describe the request.
* Set the request parameters: the document ID and the payload sent to ES. Because ES operates on data as JSON (its DSL), first use the FastJSON library to convert the object into a JSON string.

@Override
public void add(JobDetail jobDetail) throws IOException {
    // 1. Build an IndexRequest describing the data sent to ES.
    IndexRequest indexRequest = new IndexRequest(JOB_IDX);

    // 2. Set the document ID.
    indexRequest.id(jobDetail.getId() + "");

    // 3. Convert the entity object to JSON with FastJSON.
    String json = JSONObject.toJSONString(jobDetail);

    // 4. Set the document body with IndexRequest.source, marking it as JSON.
    indexRequest.source(json, XContentType.JSON);

    // 5. Call index on the high-level client to add the document to the index.
    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
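
A quick usage sketch, assuming a JobDetail POJO with id, title, and jd properties (only id/title/jd appear in this post; the setter names are assumptions):

// Hypothetical caller; setTitle/setJd are assumed setter names.
JobFullTextServiceImpl service = new JobFullTextServiceImpl();
JobDetail job = new JobDetail();
job.setId(1L);
job.setTitle("Java开发工程师");
job.setJd("负责后端服务开发,熟悉Elasticsearch优先");
service.add(job);   // indexes the document with _id = "1"
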
@Override
public JobDetail findById(long id) throws IOException {
    // 1. Build the GetRequest.
    GetRequest getRequest = new GetRequest(JOB_IDX, id + "");

    // 2. Send it with RestHighLevelClient.get and receive the ES response.
    GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);

    // Guard against a missing document; otherwise parseObject returns null
    // and setId below would throw a NullPointerException.
    if (!getResponse.isExists()) {
        return null;
    }

    // 3. Read the response body as a JSON string.
    String json = getResponse.getSourceAsString();

    // 4. Convert the JSON string into a JobDetail object with FastJSON.
    JobDetail jobDetail = JSONObject.parseObject(json, JobDetail.class);

    // 5. Remember to set the ID separately (it is not part of _source).
    jobDetail.setId(id);

    return jobDetail;
}
@Override
public void update(JobDetail jobDetail) throws IOException {
    // 1. Check whether a document with this ID exists:
    // a) build a GetRequest
    GetRequest getRequest = new GetRequest(JOB_IDX, jobDetail.getId() + "");

    // b) issue it with the client's exists method
    boolean exists = restHighLevelClient.exists(getRequest, RequestOptions.DEFAULT);

    if (exists) {
        // 2. Build the UpdateRequest.
        UpdateRequest updateRequest = new UpdateRequest(JOB_IDX, jobDetail.getId() + "");

        // 3. Set the document for the UpdateRequest, marked as JSON.
        updateRequest.doc(JSONObject.toJSONString(jobDetail), XContentType.JSON);

        // 4. Issue the update request with the client.
        restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    }
}
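
The exists round trip can be avoided: UpdateRequest supports upsert semantics, inserting the doc when the document does not exist yet. A sketch of the same update written that way:

// Alternative sketch: let ES insert the document when it is missing,
// instead of checking existence first (one request instead of two).
UpdateRequest updateRequest = new UpdateRequest(JOB_IDX, jobDetail.getId() + "");
updateRequest.doc(JSONObject.toJSONString(jobDetail), XContentType.JSON);
updateRequest.docAsUpsert(true);   // treat the doc as an upsert
restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
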
@Override
public void deleteById(long id) throws IOException {
    // 1. Build the DeleteRequest.
    DeleteRequest deleteRequest = new DeleteRequest(JOB_IDX, id + "");

    // 2. Execute it with the RestHighLevelClient.
    restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
}
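
When many documents need to be deleted (or indexed), per-document round trips add up; the client also offers a bulk API. A minimal sketch, assuming a list of IDs to remove (deleteByIds is a hypothetical helper, not from the original service):

// Hypothetical batch variant: group many DeleteRequests into one BulkRequest.
public void deleteByIds(List<Long> ids) throws IOException {
    BulkRequest bulkRequest = new BulkRequest();
    for (Long id : ids) {
        bulkRequest.add(new DeleteRequest(JOB_IDX, id + ""));
    }
    // One HTTP round trip for the whole batch.
    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
}
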
@Override
public List<JobDetail> searchByKeywords(String keywords) throws IOException {
    // 1. Build a SearchRequest, the API for full-text / keyword search.
    SearchRequest searchRequest = new SearchRequest(JOB_IDX);

    // 2. Create a SearchSourceBuilder to hold the query conditions.
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    // 3. Use QueryBuilders.multiMatchQuery to build a query over the title and jd fields.
    MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keywords, "title", "jd");

    // Attach the query to the source builder.
    searchSourceBuilder.query(multiMatchQueryBuilder);

    // 4. Attach the source builder to the search request via SearchRequest.source.
    searchRequest.source(searchSourceBuilder);

    // 5. Execute the search with RestHighLevelClient.search.
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHit[] hitArray = searchResponse.getHits().getHits();

    // 6. Walk the hits and convert each one back into a JobDetail.
    ArrayList<JobDetail> jobDetailArrayList = new ArrayList<>();

    for (SearchHit hit : hitArray) {
        // 1) Read the hit's _source as a JSON string.
        String json = hit.getSourceAsString();

        // 2) Convert the JSON string into an object.
        JobDetail jobDetail = JSONObject.parseObject(json, JobDetail.class);

        // 3) Set the document ID from SearchHit.getId.
        jobDetail.setId(Long.parseLong(hit.getId()));

        jobDetailArrayList.add(jobDetail);
    }

    return jobDetailArrayList;
}
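
If these results are shown to users, hit highlighting usually goes together with keyword search. A sketch of what adding it here might look like, using the same title field (the <em> tags are an assumed choice):

// Sketch: highlight matched terms in the title field.
HighlightBuilder highlightBuilder = new HighlightBuilder()
        .field("title")
        .preTags("<em>")     // assumed tag choice
        .postTags("</em>");
searchSourceBuilder.highlighter(highlightBuilder);

// After the search, the fragments are read per hit:
// Map<String, HighlightField> fields = hit.getHighlightFields();
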
@Override
public Map<String, Object> searchByPage(String keywords, int pageNum, int pageSize) throws IOException {
    // 1. Build a SearchRequest, the API for full-text / keyword search.
    SearchRequest searchRequest = new SearchRequest(JOB_IDX);

    // 2. Create a SearchSourceBuilder to hold the query conditions.
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    // 3. Use QueryBuilders.multiMatchQuery to build a query over the title and jd fields.
    MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keywords, "title", "jd");

    // Attach the query to the source builder.
    searchSourceBuilder.query(multiMatchQueryBuilder);

    // Page size: how many hits per page.
    searchSourceBuilder.size(pageSize);
    // Offset: which hit to start from.
    searchSourceBuilder.from((pageNum - 1) * pageSize);

    // 4. Attach the source builder to the search request via SearchRequest.source.
    searchRequest.source(searchSourceBuilder);

    // 5. Execute the search with RestHighLevelClient.search.
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHit[] hitArray = searchResponse.getHits().getHits();

    // 6. Walk the hits and convert each one back into a JobDetail.
    ArrayList<JobDetail> jobDetailArrayList = new ArrayList<>();

    for (SearchHit hit : hitArray) {
        // 1) Read the hit's _source as a JSON string.
        String json = hit.getSourceAsString();

        // 2) Convert the JSON string into an object.
        JobDetail jobDetail = JSONObject.parseObject(json, JobDetail.class);

        // 3) Set the document ID from SearchHit.getId.
        jobDetail.setId(Long.parseLong(hit.getId()));

        jobDetailArrayList.add(jobDetail);
    }

    // 7. Package the result into a Map together with paging info:
    // a) total   -> total hit count, from SearchHits.getTotalHits().value
    // b) content -> the hits on the current page
    long totalNum = searchResponse.getHits().getTotalHits().value;
    Map<String, Object> result = new HashMap<>();
    result.put("total", totalNum);
    result.put("content", jobDetailArrayList);

    return result;
}
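
A quick usage sketch, and the reason the next section exists: with from/size, ES refuses to page past index.max_result_window (10,000 hits by default in 7.x), so deep pagination needs the scroll API instead. The service variable is the hypothetical one from the earlier sketch; the casts mirror how the Map is populated above:

// Hypothetical caller: fetch page 2 with 10 hits per page.
Map<String, Object> page = service.searchByPage("java", 2, 10);
long total = (long) page.get("total");
List<JobDetail> content = (List<JobDetail>) page.get("content");
System.out.println("total hits: " + total + ", on this page: " + content.size());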

Paging with the Scroll API

> * The first query carries no scroll_id, so it must set the scroll keep-alive time.
> * Do not set the keep-alive too short, or the scroll context expires and later requests fail with an exception.
> * Subsequent queries go through a SearchScrollRequest carrying the scroll_id.
@Override
public Map<String, Object> searchByScrollPage(String keywords, String scrollId, int pageSize) throws IOException {
    SearchResponse searchResponse = null;

    if (scrollId == null) {
        // 1. Build a SearchRequest, the API for full-text / keyword search.
        SearchRequest searchRequest = new SearchRequest(JOB_IDX);

        // 2. Create a SearchSourceBuilder to hold the query conditions.
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        // 3. Use QueryBuilders.multiMatchQuery to build a query over the title and jd fields.
        MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keywords, "title", "jd");

        // Attach the query to the source builder.
        searchSourceBuilder.query(multiMatchQueryBuilder);

        // Batch size: how many hits per scroll batch.
        searchSourceBuilder.size(pageSize);

        // 4. Attach the source builder to the search request via SearchRequest.source.
        searchRequest.source(searchSourceBuilder);

        //--------------------------
        // Enable scrolling, keeping the scroll context alive for 5 minutes.
        //--------------------------
        searchRequest.scroll(TimeValue.timeValueMinutes(5));

        // 5. Execute the search with RestHighLevelClient.search.
        searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    }
    // On subsequent calls, fetch the next batch directly via the scroll id.
    else {
        SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
        searchScrollRequest.scroll(TimeValue.timeValueMinutes(5));

        // Send the scroll request with RestHighLevelClient.
        searchResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
    }

    //--------------------------
    // Iterate over the hits in the ES response.
    //--------------------------

    SearchHit[] hitArray = searchResponse.getHits().getHits();

    // 6. Walk the hits and convert each one back into a JobDetail.
    ArrayList<JobDetail> jobDetailArrayList = new ArrayList<>();

    for (SearchHit hit : hitArray) {
        // 1) Read the hit's _source as a JSON string.
        String json = hit.getSourceAsString();

        // 2) Convert the JSON string into an object.
        JobDetail jobDetail = JSONObject.parseObject(json, JobDetail.class);

        // 3) Set the document ID from SearchHit.getId.
        jobDetail.setId(Long.parseLong(hit.getId()));

        jobDetailArrayList.add(jobDetail);
    }

    // 7. Package the result into a Map:
    // a) scroll_id -> handle for fetching the next batch
    // b) content   -> the hits in the current batch
    Map<String, Object> result = new HashMap<>();
    result.put("scroll_id", searchResponse.getScrollId());
    result.put("content", jobDetailArrayList);

    return result;
}
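
When the caller is done paging, the scroll context should be released rather than left to time out, since each open scroll holds resources on the cluster. A minimal sketch (clearScroll below is an assumed helper on the same service class):

// Sketch: explicitly release a scroll context once paging is finished.
public void clearScroll(String scrollId) throws IOException {
    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
    clearScrollRequest.addScrollId(scrollId);
    ClearScrollResponse response =
            restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
    // response.isSucceeded() reports whether the context was freed.
}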
