ELK
Documents in detail
Manually controlling version numbers
Prerequisite
If the data already lives in a database where you maintain your own version numbers, you can use external version control.
Requirement
On a write, the external version must be greater than the document's current _version.
Syntax
```
PUT /test_index/_doc/4?version=2&version_type=external
{
  "test_field": "name"
}
```
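The same external-version write can be issued from the Java high-level client. A minimal sketch, assuming a RestHighLevelClient wired up as in the Java API section below (imports omitted to match the surrounding snippets; VersionType is org.elasticsearch.index.VersionType):

```java
// Write that only succeeds if the supplied external version is greater
// than the version currently stored for this document.
IndexRequest request = new IndexRequest("test_index")
        .id("4")
        .source(XContentType.JSON, "test_field", "name")
        .version(2)
        .versionType(VersionType.EXTERNAL);
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
```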
The retry_on_conflict parameter
Specifies how many times the update is retried when a version conflict occurs.
```
POST /test_index/_doc/1/_update?retry_on_conflict=3
{
  "doc": {
    "test_field": "name"
  }
}
```
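In the Java client the equivalent knob is UpdateRequest#retryOnConflict. A minimal sketch under the same assumptions as above:

```java
// Partial update that is retried automatically if another write bumps
// the document's _version between our read and our write.
UpdateRequest request = new UpdateRequest("test_index", "1")
        .doc(XContentType.JSON, "test_field", "name")
        .retryOnConflict(3);   // re-fetch and re-apply up to 3 times
UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
```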
Batch get: mget
Fetches multiple documents in one request, reducing network overhead.
```
GET /_mget
{
  "docs" : [
    {
      "_index" : "test_index",
      "_id" : 1
    },
    {
      "_index" : "test_index",
      "_id" : 2
    }
  ]
}
```
Batch get within a single index
```
GET /test_index/_mget
{
  "docs" : [
    {
      "_id" : 2
    },
    {
      "_id" : 3
    }
  ]
}
```
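The Java client exposes the same operation through MultiGetRequest. A minimal sketch mirroring the console example (a failed item returns null from getResponse(), which this sketch does not handle):

```java
MultiGetRequest request = new MultiGetRequest()
        .add("test_index", "2")     // one entry per document to fetch
        .add("test_index", "3");
MultiGetResponse response = restHighLevelClient.mget(request, RequestOptions.DEFAULT);
for (MultiGetItemResponse item : response.getResponses()) {
    // Each item wraps an ordinary GetResponse
    System.out.println(item.getResponse().getSourceAsString());
}
```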
Batch retrieval through search (ids query)
```
POST /test_index/_search
{
  "query": {
    "ids" : {
      "values" : ["1", "2"]
    }
  }
}
```
Batch create/update/delete: bulk
Executes everything in a single request, cutting the number of network round trips.
```
POST /_bulk
{ "delete": { "_index": "test_index", "_id": "5" }}
{ "create": { "_index": "test_index", "_id": "14" }}
{ "test_field": "test14" }
{ "update": { "_index": "test_index", "_id": "2"} }
{ "doc" : {"test_field" : "bulk test"} }
```
delete
: deletes a document; needs only one JSON line
create
: force-creates a document; equivalent to PUT /index/_create/{id}
index
: an ordinary PUT; either creates the document or fully replaces it
update
: performs a partial update
Format
: each JSON object must stay on a single line (no internal line breaks), and adjacent JSON objects must be separated by a newline
Isolation
: operations are independent of each other; a line that fails returns its own error without affecting the rest
Usage
: don't make a single bulk request too large, or it piles up in memory and performance drops; a few thousand operations and a few MB per request is about right (see the BulkProcessor sketch below)
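To keep bulk requests within those bounds automatically, the high-level client ships a BulkProcessor that flushes by action count, payload size, or time. A minimal sketch, with thresholds chosen to match the guidance above (imports omitted as in the surrounding snippets):

```java
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        System.out.println("about to flush " + request.numberOfActions() + " actions");
    }
    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        System.out.println("flushed, hasFailures=" + response.hasFailures());
    }
    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        failure.printStackTrace();
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
            (request, bulkListener) ->
                restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
            listener)
        .setBulkActions(2000)                                // flush every 2000 operations
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // or every 5 MB
        .setFlushInterval(TimeValue.timeValueSeconds(5))     // or every 5 seconds
        .build();

bulkProcessor.add(new IndexRequest("test_index").id("1")
        .source(XContentType.JSON, "test_field", "bulk test"));
bulkProcessor.close();   // flushes whatever is still buffered
```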
Operating Elasticsearch from the Java API
low
: the low-level client, closer to the wire
high
: the high-level wrapper
Plain usage (without Spring)
```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.0</version>
</dependency>
```
```java
@Test
void contextLoads() throws IOException {
    // Build the client directly against a local node
    RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
            new HttpHost("localhost", 9200, "http")));
    GetRequest getRequest = new GetRequest("book", "1");
    GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
    System.out.println(response.getIndex());
    System.out.println(response.getId());
    System.out.println(response.getSourceAsString());
    client.close();   // release the underlying HTTP connections
}
```
Querying documents from Spring Boot
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.3.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
```yaml
spring:
  application:
    name: elasticsearch-server

xiaobo:
  elasticsearch:
    hostname: 127.0.0.1:9200
```
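The xiaobo.elasticsearch prefix is custom, so Spring will not build a client from it on its own; a configuration class along these lines is assumed to turn the property into the RestHighLevelClient bean that the tests below @Autowire (class and property names here are illustrative):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

    // bound from xiaobo.elasticsearch.hostname, e.g. "127.0.0.1:9200"
    @Value("${xiaobo.elasticsearch.hostname}")
    private String hostname;

    @Bean(destroyMethod = "close")   // close the client on context shutdown
    public RestHighLevelClient restHighLevelClient() {
        String[] hostAndPort = hostname.split(":");
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http")));
    }
}
```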
```java
@Autowired
private RestHighLevelClient restHighLevelClient;

@Test
public void esTest() throws IOException {
    GetRequest getRequest = new GetRequest("book", "1");
    GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
    System.out.println(response.getIndex());
    System.out.println(response.getId());
    System.out.println(response.getSourceAsString());
}

// Fetch only selected fields from _source
@Test
public void argumentsTest1() throws IOException {
    GetRequest getRequest = new GetRequest("book", "1");
    String[] includes = new String[]{"name", "price"};
    String[] excludes = Strings.EMPTY_ARRAY;
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
    getRequest.fetchSourceContext(fetchSourceContext);
    GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
    System.out.println(response.getSourceAsString());
}

// Exclude selected fields from _source
@Test
public void argumentsTest2() throws IOException {
    GetRequest getRequest = new GetRequest("book", "1");
    String[] includes = Strings.EMPTY_ARRAY;
    String[] excludes = new String[]{"name", "price"};
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
    getRequest.fetchSourceContext(fetchSourceContext);
    GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
    System.out.println(response.getSourceAsString());
}

// Asynchronous get with a callback
@Test
public void asynTest() throws InterruptedException {
    GetRequest getRequest = new GetRequest("book", "1");
    ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
        @Override
        public void onResponse(GetResponse getResponse) {
            System.out.println(getResponse.getSourceAsString());
        }

        @Override
        public void onFailure(Exception e) {
            e.printStackTrace();
        }
    };
    restHighLevelClient.getAsync(getRequest, RequestOptions.DEFAULT, listener);
    Thread.sleep(2000);   // keep the test alive until the callback fires
}
```
Creating documents from Spring Boot
```java
@Test
public void indexTest() throws IOException {
    IndexRequest indexRequest = new IndexRequest("index_test");
    indexRequest.id("2");
    String jsonString = "{\n" +
            "  \"name\":\"xiaobo\",\n" +
            "  \"postDate\":\"2022-11-04\",\n" +
            "  \"age\":\"22\"\n" +
            "}";
    indexRequest.source(jsonString, XContentType.JSON);
    IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    if (response.getResult() == DocWriteResponse.Result.CREATED) {
        System.out.println("created: " + response.getResult());
    } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
        System.out.println("updated: " + response.getResult());
    }
    // Inspect shard-level success/failure counts
    ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        System.out.println("not all shards handled the write successfully");
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            System.out.println(failure.reason());
        }
    }
}
```
Updating documents from Spring Boot
```java
@Test
public void updateTest() throws IOException {
    UpdateRequest updateRequest = new UpdateRequest("index_test", "1");
    HashMap<String, Object> map = new HashMap<>();
    map.put("name", "wangyibo");
    updateRequest.doc(map);
    UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    if (response.getResult() == DocWriteResponse.Result.UPDATED) {
        System.out.println("updated: " + response.getResult());
    } else if (response.getResult() == DocWriteResponse.Result.CREATED) {
        System.out.println("created: " + response.getResult());
    } else if (response.getResult() == DocWriteResponse.Result.DELETED) {
        System.out.println("deleted: " + response.getResult());
    } else if (response.getResult() == DocWriteResponse.Result.NOOP) {
        System.out.println("no-op: " + response.getResult());
    }
}
```
Deleting documents from Spring Boot
```java
@Test
public void deleteTest() throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest("index_test", "2");
    DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
    System.out.println(response.getResult());   // DELETED, or NOT_FOUND if absent
}
```
Bulk operations from Spring Boot
```java
@Test
public void bulkTest() throws IOException {
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.add(new IndexRequest("bulk_test_1")
            .id("1")
            .source(XContentType.JSON, "name", "xiaobo"));
    bulkRequest.add(new IndexRequest("bulk_test_2")
            .id("2")
            .source(XContentType.JSON, "name", "wangyibo"));
    bulkRequest.add(new UpdateRequest("bulk_test_1", "1")
            .doc("name", "wyb"));
    bulkRequest.add(new DeleteRequest("bulk_test_2", "2"));
    BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    // Each item in the response corresponds to one operation in the request
    for (BulkItemResponse bulkItemResponse : response) {
        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
        switch (bulkItemResponse.getOpType()) {
            case INDEX:
            case CREATE:
                IndexResponse indexResponse = (IndexResponse) itemResponse;
                System.out.println("index/create: " + indexResponse.getResult());
                break;
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                System.out.println("update: " + updateResponse.getResult());
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                System.out.println("delete: " + deleteResponse.getResult());
                break;
        }
    }
}
```
ES internals
Distributed basics
Features ES handles transparently
Distribution mechanism
: distributed storage and sharing of data
Sharding mechanism
: deciding which shard a piece of data is stored on; writing replica data
Cluster discovery mechanism
: cluster discovery; a newly started ES instance joins the cluster automatically
Shard load balancing
: under heavy writes and queries, ES spreads data evenly across shards
Shard replicas
: when the replica count is raised, shards are reallocated
Vertical vs. horizontal scaling
Vertical scaling
: replace old servers with more powerful ones; single-machine storage and compute have an upper limit, and cost rises steeply
Horizontal scaling
: buy more servers and add them to the cluster; this is the big-data approach
Peer-to-peer node architecture
Nodes are equal: every node can accept any request
Automatic request routing
Response collection
Shard and replica mechanics
How shards & replicas work
Each index contains one or more shards.
Each shard is a minimal unit of work carrying part of the data; it is a Lucene instance with the complete ability to index data and handle requests.
When nodes are added or removed, shards rebalance automatically across the nodes.
There are primary shards and replica shards. Every document lives in exactly one primary shard (and its replicas); it can never be spread across multiple primary shards.
A replica shard is a copy of its primary shard; it provides fault tolerance and serves read load.
The number of primary shards is fixed when the index is created; the number of replicas can be changed at any time.
Since ES 7, the default is 1 primary shard and 1 replica, i.e. 2 shards in total.
Before ES 7, the default was 5 primary shards with 1 replica each, i.e. 10 shards in total.
A primary shard cannot be placed on the same node as its own replica (otherwise one node failure loses both copies and the replica provides no fault tolerance), but it may share a node with replicas of other primaries.
Creating an index on a single node
On a single-node cluster, create an index with 3 primary shards and 3 replica shards (number_of_replicas: 1).
Cluster status is yellow.
The 3 primary shards are allocated to the only node; the 3 replica shards cannot be allocated anywhere.
The cluster still works, but if that node goes down all data is lost and the cluster becomes unavailable, unable to serve any request.
```
PUT /test_index1
{
  "settings" : {
    "number_of_shards" : 3,
    "number_of_replicas" : 1
  }
}
```
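The same settings can be applied from the Java client. A minimal sketch, using org.elasticsearch.client.indices.CreateIndexRequest and org.elasticsearch.common.settings.Settings:

```java
CreateIndexRequest request = new CreateIndexRequest("test_index1");
request.settings(Settings.builder()
        .put("index.number_of_shards", 3)      // fixed at creation time
        .put("index.number_of_replicas", 1));  // can be changed later
CreateIndexResponse response =
        restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
System.out.println("acknowledged: " + response.isAcknowledged());
```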
Replica shards with 2 nodes
Horizontal scale-out
Shards rebalance automatically, migrating toward idle machines.
Each node then holds fewer shards, each shard gets more of the machine's resources, and overall cluster performance rises.
Scale-out limit: once the node count exceeds the total shard count, some machines must sit idle.
Past that limit, raise the replica count instead: storage and search capacity grow, and fault tolerance improves.
Fault tolerance: as long as every primary shard of an index survives, the cluster keeps running.
ES fault tolerance: master election, replica promotion, data recovery
When the master node goes down, a new master is elected automatically; cluster status turns red.
Replica fault tolerance: the new master promotes a replica shard to primary; status turns yellow.
When the failed node restarts, the master copies replicas back to it; the node reuses its existing shard data and syncs only the changes made while it was down; status turns green.
Document storage mechanics
Data routing
How a document is routed to its shard
Every document ends up in exactly one primary shard; deciding which one is called data routing.
Routing algorithm
Hash the routing value, then take it modulo the number of primary shards. This is also why the primary shard count cannot change after index creation: a different modulus would route existing documents to the wrong shards.
```
shard = hash(routing) % number_of_primary_shards
```
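A toy illustration of the formula in Java; the real implementation hashes the routing string with Murmur3, but the modulo step is the same:

```java
// With 3 primary shards, every routing value maps to shard 0, 1 or 2.
int numberOfPrimaryShards = 3;
String routing = "1";   // defaults to the document's _id
int shard = Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
System.out.println("document routed to primary shard " + shard);
```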
Manually specifying the routing value
By default the routing value is the document's _id; the routing query parameter overrides it, e.g. routing by user ID keeps one user's documents on the same shard.
```
PUT /test_index/_doc/1?routing=numbers
{
}
```
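From the Java client the routing value rides along on the request. A minimal sketch reusing the "numbers" routing from above; note that reads must pass the same routing value to find the document:

```java
IndexRequest indexRequest = new IndexRequest("test_index")
        .id("1")
        .routing("numbers")   // overrides the default _id-based routing
        .source(XContentType.JSON, "test_field", "routed doc");
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

// The same routing value is required to GET the document back.
GetRequest getRequest = new GetRequest("test_index", "1").routing("numbers");
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
```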
Write path (create/update/delete) internals
The client picks one node to send its request to; that node becomes the coordinating node.
The coordinating node routes the document and forwards the request to the node holding the matching primary shard.
The primary shard on that node handles the request, then replicates the change to the replica shards.
Once the coordinating node sees that the primary and all replicas have finished, it returns the response to the client.
Read path internals
The client sends the request to any node, which becomes the coordinating node.
The coordinating node routes the document and forwards the request, using round-robin across the primary shard and all of its replicas so that read load is balanced.
The node that received the forwarded request returns the document to the coordinating node.
The coordinating node returns the document to the client.
While a document is still being indexed it may exist only on the primary shard and not yet on any replica, so a read can miss it; once indexing completes, both primary and replicas have it.
Mapping
The data structure and related settings built, automatically or manually, for an index's _doc: the mapping.
Dynamic mapping: ES automatically creates the index and its mapping, including each field's data type and analysis settings.
Exact match vs. full-text search
Exact match
exact value: the search input must be exactly the stored value.
Full-text search
Not just whole-value matching: the value can be split into words (analysis) and matched, including through abbreviations, tenses, case and synonyms:
cn vs china (abbreviation)
like, liked, likes (tense)
Tom vs tom (case)
like vs love (synonym)
Analyzers
An analyzer splits text into terms; normalization (tense conversion, singular/plural conversion, etc.) standardizes them.
Analyzer composition
- character filter: preprocesses the text before tokenization, e.g.
& –> and
- tokenizer: splits the text into tokens, e.g.
I Love You –> I, Love, You
- token filter: normalizes each token, e.g.
dogs –> dog, liked –> like
Built-in analyzers
standard analyzer
simple analyzer
whitespace analyzer
language analyzers for specific languages
Query strings are analyzed per field
A query string must be analyzed with the same analyzer the field was indexed with.
```
GET /_analyze
{
  "analyzer": "standard",
  "text": "Text to analyze 80"
}
```
```json
{
  "tokens" : [
    {
      "token" : "text",
      "start_offset" : 0,
      "end_offset" : 4,
      "type" : "<ALPHANUM>",
      "position" : 0
    },
    {
      "token" : "to",
      "start_offset" : 5,
      "end_offset" : 7,
      "type" : "<ALPHANUM>",
      "position" : 1
    },
    {
      "token" : "analyze",
      "start_offset" : 8,
      "end_offset" : 15,
      "type" : "<ALPHANUM>",
      "position" : 2
    },
    {
      "token" : "80",
      "start_offset" : 16,
      "end_offset" : 18,
      "type" : "<NUM>",
      "position" : 3
    }
  ]
}
```
token
The term actually stored in the index.
position
The position of the term within the original text.
start_offset/end_offset
The character offsets of the term in the original string.
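The same analysis can be run from the Java client through the indices analyze API. A minimal sketch, assuming org.elasticsearch.client.indices.AnalyzeRequest as shipped with the 7.x high-level client:

```java
AnalyzeRequest request = AnalyzeRequest.withGlobalAnalyzer(
        "standard", "Text to analyze 80");
AnalyzeResponse response =
        restHighLevelClient.indices().analyze(request, RequestOptions.DEFAULT);
for (AnalyzeResponse.AnalyzeToken token : response.getTokens()) {
    // term, position and offsets match the REST response fields above
    System.out.println(token.getTerm() + " pos=" + token.getPosition()
            + " [" + token.getStartOffset() + "," + token.getEndOffset() + "]");
}
```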
Managing mappings manually
Query the mappings of all indices
GET /_mapping
Creating a mapping manually
```
PUT /book/_mapping
{
  "properties": {
    "name": {
      "type": "text"
    },
    "description": {
      "type": "text",
      "analyzer": "english",
      "search_analyzer": "english"
    },
    "pic": {
      "type": "text",
      "index": false
    },
    "studymodel": {
      "type": "text"
    }
  }
}
```
The analyzer property selects the analyzer; set on its own, it applies to both indexing and search. To use a different analyzer only at search time, set search_analyzer as well.
The index property controls whether the field is indexed. It defaults to index=true; only indexed fields can be searched. Some content never needs searching, e.g. a product image URL that is only displayed, so its index can be set to false.
The store property controls whether the field value is stored separately, outside _source. Every indexed document already keeps a copy of the original in _source, so store=true is usually unnecessary.
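The same mapping can be created from the Java client. A minimal sketch using org.elasticsearch.client.indices.PutMappingRequest with the JSON body from the console example above:

```java
PutMappingRequest request = new PutMappingRequest("book");
request.source(
        "{\n" +
        "  \"properties\": {\n" +
        "    \"name\":        { \"type\": \"text\" },\n" +
        "    \"description\": { \"type\": \"text\", \"analyzer\": \"english\", \"search_analyzer\": \"english\" },\n" +
        "    \"pic\":         { \"type\": \"text\", \"index\": false },\n" +
        "    \"studymodel\":  { \"type\": \"text\" }\n" +
        "  }\n" +
        "}", XContentType.JSON);
AcknowledgedResponse response =
        restHighLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
System.out.println("acknowledged: " + response.isAcknowledged());
```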
keyword fields
A keyword field replaces the old "index": "not_analyzed" string setting: it is not analyzed at index time and is matched as an exact value.
date fields
The format property sets the accepted date format.
Modifying a mapping
A mapping can only be created manually at index-creation time, or extended with mappings for new fields; an existing field's mapping cannot be updated.
Existing data has already been analyzed and stored according to the old mapping; if the mapping changed, ES would not know what to do with that data.
Deleting a mapping
Delete the index to delete its mapping.
Complex data types
multivalue field
{ "tags": [ "tag1", "tag2" ] }
empty field
null, [], [null]
object field
```
PUT /person/_doc/1
{
  "address": {
    "country": "china",
    "province": "liaoning",
    "city": "tieling"
  },
  "name": "xiaobo",
  "age": 22,
  "join_date": "2022-12-22"
}
```
The document is kept verbatim in _source:

```json
{
  "address": {
    "country": "china",
    "province": "liaoning",
    "city": "tieling"
  },
  "name": "xiaobo",
  "age": 22,
  "join_date": "2022-12-22"
}
```
In the index, however, the inner object is flattened into dot-notation fields:

```
{
  "name": [xiaobo],
  "age": [22],
  "join_date": [2022-12-22],
  "address.country": [china],
  "address.province": [liaoning],
  "address.city": [tieling]
}
```
An array of inner objects:

```json
{
  "authors": [
    { "age": 22, "name": "xiaobo" },
    { "age": 23, "name": "wangyibo" },
    { "age": 21, "name": "wangyangnan" }
  ]
}
```
is flattened field by field, losing which name belongs to which age (this is why the nested type exists):

```
{
  "authors.age": [21, 22, 23],
  "authors.name": [xiaobo, wangyibo, wangyangnan]
}
```