elk 二

zzz.jpg

ELK

document详细

手动控制版本号

前提

已有数据是在数据库中,有自己手动维护的版本号的情况下,可以使用external version控制

要求

修改时external version要大于当前文档的_version

语法

1
2
3
4
PUT /test_index/_doc/4?version=2&version_type=external
{
"test_field": "name"
}

retry_on_conflict 参数

指定重新发送次数

1
2
3
4
5
6
POST /test_index/_doc/1/_update?retry_on_conflict=3
{
"doc": {
"test_field": "name"
}
}

批量查询 mget

减少网络开销

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /_mget
{
"docs" : [
{
"_index" : "test_index",
"_id" : 1
},
{
"_index" : "test_index",
"_id" : 2
}
]
}

统一索引下批量查询

1
2
3
4
5
6
7
8
9
10
11
GET /test_index/_mget
{
"docs" : [
{
"_id" : 2
},
{
"_id" : 3
}
]
}

搜索的批量查询

1
2
3
4
5
6
7
8
post /test_index/_doc/_search
{
"query": {
"ids" : {
"values" : ["1", "2"]
}
}
}

批量增删改 bulk

一次请求全都做完。减少网络传输次数

1
2
3
4
5
6
POST /_bulk
{ "delete": { "_index": "test_index", "_id": "5" }}
{ "create": { "_index": "test_index", "_id": "14" }}
{ "test_field": "test14" }
{ "update": { "_index": "test_index", "_id": "2"} }
{ "doc" : {"test_field" : "bulk test"} }

delete:删除一个文档,只需1个json串

create:强制创建 PUT /index/type/id/_create

index:普通的put操作,可以是创建文档,也可以是全量替换文档

update:执行的是局部更新partial update操作

格式:每个json不能换行,相邻json必须换行

隔离:每个操作互不影响,操作失败的行会返回其失败信息

用法:bulk请求一次不要太大,会积压到内存中,性能会下降,一次请求几千个操作、大小在几M正好

Java api操作ElasticSearch

low: 偏向底层

high:高级封装

原生方式

  • pom
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.3.0</version>
</dependency>
  • test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 原生创建
* @throws IOException
*/
@Test
void contextLoads() throws IOException {

// 获取连接客户端
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
new HttpHost("localhost",9200,"http")
));

// 构建请求
GetRequest getRequest = new GetRequest("book","1");

// 执行
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);

// 获取结果
System.out.println(response.getIndex());
System.out.println(response.getId());
System.out.println(response.getSourceAsString());

}

spring boot方式文档查询

  • pom
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
  • application.yml
1
2
3
4
5
6
7
spring:
application:
name: elasticsearch-server

xiaobo:
elasticsearch:
hostname: 127.0.0.1:9200 #多个结点中间用逗号分隔
  • test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//    注入获取连接客户端
@Autowired
private RestHighLevelClient restHighLevelClient;

/**
* sprinboot配置创建
* @throws IOException
*/
@Test
public void esTest() throws IOException {

// 构建请求
GetRequest getRequest = new GetRequest("book","1");

// 发送同步请求
GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);

// 返回结果
System.out.println(response.getIndex());
System.out.println(response.getId());
System.out.println(response.getSourceAsString());

}

/**
* 可选参数 为特定字段配置_source_include
* @throws IOException
*/
@Test
public void argumentsTest1() throws IOException {

GetRequest getRequest = new GetRequest("book","1");

// 拿到指定字段
String includes[] = new String[]{"name","price"};
String excludes[] = Strings.EMPTY_ARRAY;

FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);

getRequest.fetchSourceContext(fetchSourceContext);

GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);

System.out.println(response.getSourceAsString());
}

/**
* 可选参数 为特定字段配置_source_excludes getRequest里面有很多方法 慢慢尝试
* @throws IOException
*/
@Test
public void argumentsTest2() throws IOException {
GetRequest getRequest = new GetRequest("book","1");

// 不拿指定字段
String includes1[] = Strings.EMPTY_ARRAY;
String exclude1[] = new String[]{"name","price"};

FetchSourceContext fetchSourceContext = new FetchSourceContext(true,includes1,exclude1);

getRequest.fetchSourceContext(fetchSourceContext);

GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);

System.out.println(response.getSourceAsString());
}

/**
* 异步请求
* @throws InterruptedException
*/
@Test
public void asynTest() throws InterruptedException {

GetRequest getRequest = new GetRequest("book","1");

// 声明监听器 监听getRequest
ActionListener<GetResponse> listener = new ActionListener<>() {
@Override
public void onResponse(GetResponse getResponse) {
System.out.println(getResponse.getSourceAsString());
}

@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
};


restHighLevelClient.getAsync(getRequest, RequestOptions.DEFAULT, listener);

Thread.sleep(2000);
}

spring boot 方式文档新增

  • test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/**
* 创建
* @throws IOException
*/
@Test
public void indexTest() throws IOException {

// 构建请求
IndexRequest indexRequest = new IndexRequest("index_test");
indexRequest.id("2");

// 构建方法1
String jsonString="{\n" +
" \"name\":\"xiaobo\",\n" +
" \"postDate\":\"2022-11-04\",\n" +
" \"age\":\"22\"\n" +
"}";

// XContentType.JSON声明是json
indexRequest.source(jsonString, XContentType.JSON);
// 构建方法2
// HashMap<String, Object> map = new HashMap<>();
// map.put("name","xiaobo");
// map.put("postDate","2022-11-04");
// map.put("age","22");
//
// indexRequest.source(map);
//
//// 构建方法3
// XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
//
// xContentBuilder.startObject();
// {
// xContentBuilder.field("name","xiaobo");
// xContentBuilder.timeField("postDate","2022-11-04");
// xContentBuilder.field("age","22");
// }
// xContentBuilder.endObject();
// indexRequest.source(xContentBuilder);
//
//// 构建方法4
// indexRequest.source("name","xiaobo",
// "postDate","2022-11-04",
// "age","22");


// 手动设置version版本号
// indexRequest.version(4);
// indexRequest.versionType(VersionType.EXTERNAL);

//设置超时时间
// indexRequest.timeout(TimeValue.timeValueSeconds(1));
// indexRequest.timeout("1s");

// 同步请求
IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);


// System.out.println(response.getId());
// System.out.println(response.getResult());


// 异步请求
// ActionListener<IndexResponse> listener = new ActionListener<>() {
// @Override
// public void onResponse(IndexResponse indexResponse) {
// System.out.println(indexResponse.getResult());
// }
//
// @Override
// public void onFailure(Exception e) {
// e.printStackTrace();
// }
// };
//
// restHighLevelClient.indexAsync(indexRequest,RequestOptions.DEFAULT,listener);
//
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }

if (response.getResult() == DocWriteResponse.Result.CREATED){
System.out.println("创建:" + response.getResult());
}else if (response.getResult() == DocWriteResponse.Result.UPDATED){
System.out.println("更新:" + response.getResult());
}

ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();


// 检验主从碎片是否正常同步
if (shardInfo.getTotal() != shardInfo.getSuccessful()){
System.out.println("处理成功的碎片不正常");
}

if (shardInfo.getFailed() > 0){
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
// 打印错误
System.out.println(failure.reason());
}
}

}

spring boot文档修改

  • test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 更新
* @throws IOException
*/
@Test
public void updateTest() throws IOException {
UpdateRequest updateRequest = new UpdateRequest("index_test","1");

HashMap<String, Object> map = new HashMap<>();

map.put("name","wangyibo");

updateRequest.doc(map);

UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);

if (response.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("更新:" + response.getResult());
}else if (response.getResult() == DocWriteResponse.Result.CREATED){
System.out.println("创建:" + response.getResult());
}else if (response.getResult() == DocWriteResponse.Result.DELETED){
System.out.println("删除:" + response.getResult());
}else if (response.getResult() == DocWriteResponse.Result.NOOP){
System.out.println("没有操作:" + response.getResult());
}
}

spring boot文档删除

  • test
1
2
3
4
5
6
7
8
9
10
11
/**
* 删除
* @throws IOException
*/
@Test
public void deleteTest() throws IOException {
DeleteRequest deleteRequest = new DeleteRequest("index_test","2");

DeleteResponse delete = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);

}

spring boot文档批量

  • test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 批量
* @throws IOException
*/
@Test
public void bulkTest() throws IOException {

// 创建请求
BulkRequest bulkRequest = new BulkRequest();

// 加入批量
bulkRequest.add(new IndexRequest("bulk_test_1")
.id("1")
.source(XContentType.JSON, "name", "xiaobo"));

bulkRequest.add(new IndexRequest("bulk_test_2")
.id("2")
.source(XContentType.JSON, "name", "wangyibo"));

bulkRequest.add(new UpdateRequest("bulk_test_1","1")
.doc("name","wyb"));

bulkRequest.add(new DeleteRequest("bulk_test_2","2"));

// 发送请求
BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

// 遍历结果
for (BulkItemResponse bulkItemResponse : response) {
// 拿到response
DocWriteResponse response1 = bulkItemResponse.getResponse();
// 拿到response类型,并进行转换
switch (bulkItemResponse.getOpType()){
case INDEX:
IndexResponse indexResponse = (IndexResponse) response1;
System.out.println("创建:" + indexResponse.getResult());
break;
case CREATE:
IndexResponse indexResponse1 = (IndexResponse) response1;
System.out.println("创建:" + indexResponse1.getResult());
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) response1;
System.out.println("更新:" + updateResponse.getResult());
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) response1;
System.out.println("删除:" + deleteResponse.getResult());
break;
}
}

}

es内部机制

es分布式基础

透明隐藏特性

分布式机制:分布式数据存储及共享

分片机制:数据存储到哪个分片,副本数据写入

集群发现机制:cluster discovery。新启动es实例,自动加入集群

shard负载均衡:大量数据写入及查询,es会将数据平均分配

shard副本:新增副本数,分片重分配

垂直扩容与水平扩容

垂直扩容:使用更加强大的服务器替代老服务器,但单机存储及运算能力有上线,且成本直线上升

水平扩容:采购更多服务器,加入集群,大数据

节点对等的分布式架构

节点对等,每个节点都能接收所有的请求

自动请求路由

响应收集

分片shard、副本replica机制

shard & replica机制

  • 每个index包含一个或多个shard

  • 每个shard都是一个最小工作单元,承载部分数据,lucene实例,完整的建立索引和处理请求的能力

  • 增减节点时,shard会自动在nodes中负载均衡

  • primary shard和replica shard,每个document肯定只存在于某一个primary shard以及其对应的replica shard中,不可能存在于多个primary shard

  • replica shard是primary shard的副本,负责容错,以及承担读请求负载

  • primary shard的数量在创建索引的时候就固定了,replica shard的数量可以随时修改

  • primary shard的默认数量是1,replica默认是1,默认共有2个shard,1个primary shard,1个replica shard

  • es7以前primary shard的默认数量是5,replica默认是1,默认有10个shard,5个primary shard,5个replica shard

  • primary shard不能和自己的replica shard放在同一个节点上(否则节点宕机,primary shard和副本都丢失,起不到容错的作用),但是可以和其他primary shard的replica shard放在同一个节点上

单node环境下创建index

  • 单node环境下,创建一个index,有3个primary shard,3个replica shard

  • 集群status是yellow

  • 将3个primary shard分配到仅有的一个node上去,另外3个replica shard无法分配

  • 集群可以正常工作,但是一旦出现节点宕机,数据全部丢失,而且集群不可用,无法承接任何请求

1
2
3
4
5
6
7
PUT /test_index1
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}

2个node环境下replica shard

  • replica shard分配:3个primary shard,3个replica shard,1 node

  • primary 向 replica同步

  • 读请求 primary和replica都可以进行读取

横向扩容

  • 分片自动负载均衡,分片向空闲机器转移

  • 每个节点存储更少分片,系统资源给与每个分片的资源更多,整体集群性能提高

  • 扩容极限,节点数大于整体分片数,则必有空闲机器

  • 超出扩容极限时,可以增加副本数,存储和搜索性能更强。容错性更好

  • 容错性,只要一个索引的所有主分片在,集群就就可以运行

es容错机制 master选举,replica容错,数据恢复

  • master node宕机,自动master选举,集群为red

  • replica容错,新master将replica提升为primary shard,状态为yellow

  • 重启宕机node,master 复制 replica到该结点,使用原有的shard并同步宕机后的修改,状态为green

文档存储机制

数据路由

文档存储如何路由到相应分片

文档最终会落在主分片的一个分片上,到底落到哪一个,这就是数据路由

路由算法

哈希值对主分片数取模

1
shard = hash(routing) % number_of_primary_shards

手动指定路由

1
2
3
PUT /test_index/_doc/1?routing=numbers{

}

文档的增删改内部机制

  • 客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点)

  • coordinating node,对document进行路由,将请求转发给对应的node

  • 实际的node上的primary shard处理请求,然后将数据同步到replica node

  • coordinating node,如果发现primary node和所有replica node都搞定之后,就返回响应结果给客户端

文档的查询内部机制

  • 客户端发送请求到任意一个node,成为coordinate node

  • coordinate node对document进行路由,将请求转发到对应的node,此时会使用round-robin随机轮询算法,在primary shard以及其所有replica中随机选择一个,让读请求负载均衡

  • 接收请求的node返回document给coordinate node

  • coordinate node返回document给客户端

  • document如果还在建立索引过程中,可能只有primary shard有,任何一个replica shard都没有,此时可能会导致无法读取到document,但是document完成索引建立之后,primary shard和replica shard就都有了

Mapping映射

自动或手动为index中的_doc建立的一种数据结构和相关配置,简称为mapping映射

  • 自动创建映射
1
PUT /website/_doc/1{}
  • 查看映射
1
GET  /website/_mapping

dynamic mapping,自动为我们建立index,以及对应的mapping,mapping中包含了每个field对应的数据类型,以及如何分词等设置

精确匹配与全文搜索

精确匹配

exact value,搜索的时候必须输入完全相同的内容不过

全文检索

只是匹配完整的一个值,可以对值进行拆分词语后(分词)进行匹配,也可以通过缩写、时态、大小写、同义词等进行匹配

  • 缩写 & 全称

cn vs china

  • 格式转化

like liked likes

  • 大小写

Tom vs tom

  • 同义词

like vs love

分词器 analyzer

切分词语,normalization(时态转换,单复数转换)就是一个标准化

analyzer组成

  • character filter:在一段文本进行分词之前,先进行预处理

& –> and

  • tokenizer:分词

I Love You –> I,Love,You

  • token filter

dogs –> dog,liked –> like

内置分词器

standard analyzer 标准分词器

simple analyzer 简单分词器

whitespace analyzer 空格分词器

language analyzer 特定的语言的分词器

query string根据字段分词

query string必须和index建立相同的analyzer进行分词

1
2
3
4
5
GET /_analyze
{
"analyzer": "standard",
"text": "Text to analyze 80"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"tokens" : [
{
"token" : "text",
"start_offset" : 0,
"end_offset" : 4,
"type" : "<ALPHANUM>",
"position" : 0
},
{
"token" : "to",
"start_offset" : 5,
"end_offset" : 7,
"type" : "<ALPHANUM>",
"position" : 1
},
{
"token" : "analyze",
"start_offset" : 8,
"end_offset" : 15,
"type" : "<ALPHANUM>",
"position" : 2
},
{
"token" : "80",
"start_offset" : 16,
"end_offset" : 18,
"type" : "<NUM>",
"position" : 3
}
]
}

token实际存储的term 关键字

position在此词条在原文本中的位置

start_offset/end_offset字符在原始字符串中的位置

手动管理mapping

查询所有索引映射

GET /_mapping

手动创建映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
PUT book/_mapping
{
"properties": {
"name": {
"type": "text"
},
"description": {
"type": "text",
"analyzer":"english",
"search_analyzer":"english"
},
"pic":{
"type":"text",
"index":false
},
"studymodel":{
"type":"text"
}
}
}
  • analyzer

通过analyzer属性指定分词器,analyzer是指在索引和搜索都使用english,如果单独想定义搜索时使用的分词器则可以通过search_analyzer属性

  • index

index属性指定是否索引,默认为index=true,即要进行索引,只有进行索引才可以从索引库搜索到,但是也有一些内容不需要索引,例如,商品图片地址只被用来展示图片,不进行搜索图片,此时可以将index设置为false

  • store

是否在source之外存储,每个文档索引后会在 ES中保存一份原始文档,存放在_source中,一般情况下不需要设置store为true,因为在_source中已经有一份原始文档了

keyword关键字字段

取代了”index”: false,创建keyword字段的索引时是不进行分词的

date日期类型

format可以设置日期格式

修改映射

只能创建index时手动建立mapping,或者新增field mapping,但是不能update field mapping

因为已有数据按照映射早已分词存储好,如果修改,这些存量数据不知道该怎么处理

删除映射

通过删除索引来删除映射

复杂数据类型

multivalue field

{ “tags”: [ “tag1”, “tag2” ]}

empty field

null,[],[null]

object field

1
2
3
4
5
6
7
8
9
10
11
PUT /person/_doc/1
{
"address": {
"country": "china",
"province": "liaoning",
"city": "tieling"
},
"name": "xiaobo",
"age": 22,
"join_date": "2022-12-22"
}
  • object
1
2
3
4
5
6
7
8
9
10
{
"address": {
"country": "china",
"province": "liaoning",
"city": "tieling"
},
"name": "xiaobo",
"age": 22,
"join_date": "2022-12-22"
}
  • object底层存储格式
1
2
3
4
5
6
7
8
{
"name": [xiaobo],
"age": [22],
"join_date": [2022-12-22],
"address.country": [china],
"address.province": [liaoning],
"address.city": [tieling]
}
  • 数组对象
1
2
3
4
5
6
7
{
"authors": [
{ "age": 22, "name": "xiaobo"},
{ "age": 23, "name": "wangyibo"},
{ "age": 21, "name": "wangyangnan"}
]
}
  • 数组底层存储格式
1
2
3
4
{
"authors.age": [21,22,23],
"authors.name": [xiaobo,wangyibo,wangyangnan]
}

正确的开始 微小的长进 然后持续 嘿 我是小博 带你一起看我目之所及的世界……

-------------本文结束 感谢您的阅读-------------

本文标题:elk 二

文章作者:小博

发布时间:2022年12月22日 - 18:51

最后更新:2022年12月30日 - 15:57

原始链接:https://codexiaobo.github.io/posts/566979386/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。