Elasticsearch入门

版本提示:本文涉及的ES版本为5.4

1. 简介

Elasticsearch是一个实时分布式搜索和分析引擎,它让你已前所未有的速度处理大数据成为可能。它用于全文搜索,结构化搜索,分析以及将这三者混合使用。
Elasticsearch 也是使用 Java 编写的,它的内部使用 Lucene 做索引与搜索,但是它的目的是使全文检索变得简单, 通过隐藏 Lucene 的复杂性,取而代之的是提供一套简单一致的 RESTful API。

Elasticsearh可以被这样形容:

  • 一个分布式的实时文档存储,每个字段可以被索引与搜索
  • 一个分布式实时分析搜索引擎
  • 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据

2. 核心元素

快速入门Demo

  • 面向文档(document oriented):Elasticsearch是面向文档的,它可以存储整个对象或文档。它不仅仅是存储,还会索引每个文档的内容。文档是不可变的:他们不能被修改,只能被替换。Elasticsearch使用JSON作为文档序列化格式。类比传统关系数据库:
类比 项1 项2 项3 项4
Relational DB Databases Tables Rows Columns
Elasticsearch Indices Types Documents Fields

3. 地心历险

3.1 关于分布式

  • 集群和节点:节点(node)是一个运行着的Elasticserach实例。集群(cluster)是一组具有相同cluster.name(配置在./config/elasticsearch.yml文件)的节点集合,他们协同工作,共享数据并提供故障转移和扩展功能,当然一个节点也可以组成一个集群。当加入新节点或者删除一个节点时,集群就会感知到并平衡数据。

  • 主节点:集群中一个节点会被选举为主节点,它将临时管理集群级别的一些变更,例如新建或删除索引、增加或移除节点等。主节点不参与文档级别的变更或搜索,这意味着在流量增长的时候,主节点不会成为集群的瓶颈。

  • 节点:任何节点都可以成为主节点,我们可以和集群中任何节点通信。每个节点都知道文档存在于哪个节点上,它们可以转发请求到相应的节点上。我们访问的节点负责收集各节点返回的数据,最后一起返回给客户端。这一切有Elasticsearch处理。

  • 分片(Shard):索引只是一个用来指向一个或多个分片的“逻辑命名空间(logical namespace)”。一个分片是一个最小级别的工作单元,它只保存了索引中所有数据的一部分。分片是Elasticsearch在集群中分发数据的关键。文档存储在分片中,然后分片分配到你集群中的节点上。当你的集群扩容或缩小,Elasticsearch将会自动在你的节点中迁移分片,以使集群保持平衡。
    分片有两种类型:主分片(primary shard) 或者是 复制分片(replica shard)。索引中的每个文档属于一个单独的主分片,所以主分片的数量决定了索引最多能存储多少数据。复制分片只是主分片的一个副本,它可以防止硬件故障导致的数据丢失,同时可以提供读请求,比如搜索或者从别的shard取回文档。复制分片与主分片不会分配到同一个节点上。文档的索引首先被存储在主分片中,然后并发复制到对应的复制节点上。

  • 乐观并发控制:如果在读写过程中数据发生了变化,更新操作将失败,这时候由程序决定在失败后如何解决冲突。ELasticsearch使用"_version"这个字段(每个文档都有)来保证所有修改都被正确排序。这个字段的值将会在文档被改变时加一。
    我们可以通过以下请求来保证数据不会因为修改冲突而丢失:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//修改_index=1 且 _version=1的文档
PUT /myTyte/index/1?version=1

//失败时返回409Http状态码,响应体如下
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[user][1]: version conflict, current version [7] is different than the one provided [5]",
"index_uuid": "XBy70gdWTJ6MQMVUCoQC-A",
"shard": "3",
"index": "myTyte"
}
],
"type": "version_conflict_engine_exception",
"reason": "[user][1]: version conflict, current version [7] is different than the one provided [5]",
"index_uuid": "XBy70gdWTJ6MQMVUCoQC-A",
"shard": "3",
"index": "myTyte"
},
"status": 409
}

当然也可以使用外部版本控制系统,一种常见的结构是使用一些其他的数据库作为主数据库,然后使用Elasticsearch搜索数据,当主数据库发生变化,就要拷贝到Elasticsearch中。如果主数据库有版本字段,或者是timstamp等可以用于版本控制的字段,可以在QueryString后加version_type=external来使用这些版本号。外部版本号与内部版本号检查不一样,内部版本号检查相等,外部版本号检查是否小于指定版本。

1
2
//使用外部版本号
PUT /myTyte/index/1?version=1&version_type=external

3.2 分布式文档存储

路由文档

通过以下公式决定文档路由到哪一个分片

1
shard = hash(routing) % number_of_primary_shards

routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到 余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求的文档所在分片的位置。

consistency
在默认设置下,即使仅仅是在试图执行一个_写_操作之前,主分片都会要求 必须要有 _规定数量(quorum)_(或者换种说法,也即必须要有大多数)的分片副本处于活跃可用状态,才会去执行_写_操作(其中分片副本可以是主分片或者副本分片)。这是为了避免在发生网络分区故障(network partition)的时候进行_写_操作,进而导致数据不一致。_规定数量_即:

1
int( (primary + number_of_replicas) / 2 ) + 1

consistency 参数的值可以设为
one (只要主分片状态 ok 就允许执行_写_操作);
all(必须要主分片和所有副本分片的状态没问题才允许执行_写_操作);
quorum 。默认值为 quorum ,即大多数的分片副本状态没问题就允许执行_写_操作。

NOTE: 新索引默认有 1 个副本分片,这意味着为满足 规定数量 应该 需要两个活动的分片副本。 但是,这些默认的设置会阻止我们在单一节点上做任何事情。为了避免这个问题,要求只有当 number_of_replicas 大于1的时候,规定数量才会执行。

timeout
如果没有足够的副本分片会发生什么? Elasticsearch会等待,希望更多的分片出现。默认情况下,它最多等待1分钟。 如果你需要,你可以使用 timeout 参数 使它更早终止: 100(ms),30s。

取文档

取回单个文档
步骤:协调节点确定文档所属分片,转发请求到相应分片的节点(轮询所有的副本分片来达到负载均衡),返回文档;

搜索

GET /_search?timeout=10ms

搜索可以指定超时,在请求超时之前,Elasticsearch 将会返回已经成功从每个分片获取的结果。
应当注意的是 timeout 不是停止执行查询,它仅仅是告知正在协调的节点返回到目前为止收集的结果并且关闭连接。在后台,其他的分片可能仍在执行查询即使是结果已经被发送了。
使用超时是因为 SLA(服务等级协议)对你是很重要的,而不是因为想去中止长时间运行的查询。

分布式系统中的深度分页问题,限制查询量不要超过1000;

分析与分析器
索引文本和查询字符串必须标准化为同样的格式。
分析器包含三个功能:字符过滤器(例如去除html),分词器(拆分文本为token),Token过滤器(例如转化大小写,同义词关联,转化词根等)

核心简单域类型:

  • 字符串 : string
  • 整数 : byte, short, integer, long
  • 浮点数: float, double
  • 布尔型: boolean
  • 日期: date

复杂核心域类型

3.3 Modules

线程池

线程池分为两种:

  1. fixed
    固定线程池拥有固定数量的线程来处理任务,并且自带一个可选有界的队列。
    size参数控制线程数量,默认数量是5。
    queue_size控制队列容量,默认会被设置为-1,表示无界。当有请求到来且队列是满的,请求将会被中断。
1
2
3
4
thread_pool:
index:
size: 30
queue_size: 1000
  1. scaling
    可扩展线程拥有动态数量的线程。线程数量和工作负载成比例,且在coremax参数之间变化
1
2
3
4
5
thread_pool:
warmer:
core: 1
max: 8
keep_alive: 2m

主要的线程池如下:
generic
通用操作,比如后台节点的恢复,线程池类型是scaling

index
index/delete操作。线程池的类型是fixed,线程数量等于CPU核心数,队列容量为200;

search
count/search/suggest操作。线程池的类型是fixed,线程数量等于(CPU核心数 * 3) / 2) + 1,队列容量为1000;

get
get操作。线程数量等于CPU核心数,队列容量为1000;

bulk
bulk操作。线程数量等于CPU核心数,队列容量为200;


4. 实战

4.1 插件

  • Marvel

4.2 命令

  • 集群健康
1
2
//集群健康有三种状态:green、yellow、red
GET /_cluster/health
  1. green
    所有的主分片和副本分片都正常运行。

  2. yellow
    所有的主分片都正常运行,但不是所有的副本分片都正常运行。

  3. red
    有主分片没能正常运行。

4.3 基础操作

几乎所有操作可以通过http请求来实现
curl -X${m} ${url} -d ${json}

  • 索引文档
    在Elasticsearch中存储数据的行为就叫做索引(indexing)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//客户端请求:
PUT localhost:9200/myType/user/1
{
"id": 1,
"name": "oabern",
"age": 22,
"address": {
"city": "cq",
"emailNo": 9527
},
"about": "I love to go rock climbing",
"interests": [
"photography",
"programing"
]
}

//服务端响应
{
"_index": "myType",
"_type": "user",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}

需要更新文档时只要重新put,此时响应结果中的"_version"会自增。

  • 局部更新
1
2
3
4
5
6
7
8
9
10
POST /myType/user/1/_update
{
"doc":{
"age": 21,
"address": {
"city": "cqcq"
},
"addField":"default"
}
}

文档是不可变的:他们不能被修改,只能被替换。
更新操作过程:

  1. 从旧文档构建 JSON;
  2. 更改该 JSON;
  3. 删除旧文档;
  4. 索引一个新文档;
    将会合并到现有文档中,对象合并在一起,存在的标量字段被覆盖,新字段被添加。
    更多复杂的局部更新操作可以使用脚本(Groovy)来实现。
  • 检索文档
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//客户端请求
GET localhost:9200/myType/user/1

//服务端响应
{
"_index": "myType",
"_type": "user",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"id": 1,
"name": "oabern",
"age": 22,
"address": {
"city": "cq",
"emailNo": 9527
},
"about": "I love to go rock climbing",
"interests": [
"photography",
"programing"
]
}
}
  • 其他操作
  1. 删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//客户端请求
//DELETE localhost:9200/myType/user/1

//服务端响应
{
"found": true,
"_index": "myType",
"_type": "user",
"_id": "1",
"_version": 3,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}
  1. 存在
1
2
3
4
5
6
//客户端请求
HEAD localhost:9200/myType/user/1

//服务端响应(无json),
//存在返回状态码200,
//不存在返回状态码404
  1. 获取多个
1
2
3
4
5
6
7
8
9
10
GET /_mget
{
"docs":[
{
"_index":"",
"_type":"",
"_id":""
}
]
}
  1. 全部搜索
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//客户端请求
//查询所有文档
GET localhost:9200/myType/user/_search

//服务端响应
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "myType",
"_type": "user",
"_id": "2",
"_score": 1,
"_source": {
"id": 2,
"name": "miyakee",
"age": 21,
"address": {
"city": "cq",
"emailNo": 9527
},
"about": "I love to collect rock albums",
"interests": [
"paint",
"ukulele"
]
}
}
],
//……省略
}
}

默认返回前10个得分最高的结果。

  1. 带条件搜索(Query String)
1
GET localhost:9200/myTyte/user/_search?q=age:21
  1. 带条件搜索(DSL)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//客户端请求
GET localhost:9200/myTyte/user/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"address.city": "cq"
}
},
{
"match": {
"about": "rock climbing"
}
}
]
}
}
}

//服务端响应
{
"took": 8,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 0.82252765,
"hits": [
{
"_index": "myTyte",
"_type": "user",
"_id": "1",
"_score": 0.82252765,
"_source": {
"id": 1,
"name": "oabern",
"age": 22,
"address": {
"city": "cq",
"emailNo": 9527
},
"about": "I love to go rock climbing",
"interests": [
"photography",
"programing"
]
}
},
{
"_index": "myTyte",
"_type": "user",
"_id": "2",
"_score": 0.55510485,
"_source": {
"id": 2,
"name": "miyakee",
"age": 21,
"address": {
"city": "cq",
"emailNo": 9527
},
"about": "I love to collect rock albums",
"interests": [
"paint",
"ukulele"
]
}
}
]
}
}

默认情况下,Elasticsearch根据结果相关性评分来对结果集进行排序,所谓的「结果相关性评分」就是文档与查询条件的匹配程度。

  1. 高亮搜索
  • 分析(聚合Aggregations)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//客户端请求
//统计每个兴趣下的人数,以及平均年龄
GET localhost:9200/myTyte/user/_search
{
"aggs": {
"all_interests": {
"terms": {
"field": "interests"
},
"aggs": {
"avg_age": {
"avg": {
"field": "age"
}
}
}
}
}
}

//服务端响应
{
//……省略,
"aggregations": {
"all_interests": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "photography",
"doc_count": 2,
"avg_age": {
"value": 22
}
},
{
"key": "climbing",
"doc_count": 1,
"avg_age": {
"value": 22
}
},
{
"key": "paint",
"doc_count": 1,
"avg_age": {
"value": 21
}
},
{
"key": "programing",
"doc_count": 1,
"avg_age": {
"value": 22
}
},
{
"key": "rock",
"doc_count": 1,
"avg_age": {
"value": 22
}
},
{
"key": "ukulele",
"doc_count": 1,
"avg_age": {
"value": 21
}
}
]
}
}
}

5. 参考资料

Elasticsearch Reference
Elasticsearch: 权威指南
搭建elasticsearch,并同步mysql数据
Elasticsearch搜索类型(query type)详解
Understanding “Query Then Fetch” vs “DFS Query Then Fetch”
ELK:kibana使用的lucene查询语法


Elasticsearch入门
https://oabern.github.io/posts/201802021020867394/
作者
OABern
发布于
2018年2月2日
许可协议