Elasticsearch Script度量聚集教程
前面有两篇博文详细介绍了Elasticsearch的度量聚集。本文补充介绍脚本度量,实现使用脚本自定义逻辑提供度量输出。
1. 语法介绍
这里通过示例代码来说明:
POST ledger/_search?size=0
{
"aggs": {
"profit": {
"scripted_metric": {
"init_script" : {
"id": "my_init_script"
},
"map_script" : {
"id": "my_map_script"
},
"combine_script" : {
"id": "my_combine_script"
},
"params": {
"field": "amount"
},
"reduce_script" : {
"id": "my_reduce_script"
}
}
}
}
}
上面示例的脚本使用存储脚本,也可以是内联脚本。首先我们介绍脚本范围,共分为四个阶段执行:
init_script
对所有集合的文档最先执行的脚本,可选脚本,一般用于设置初始化状态。
map_script
每个集合文档执行一次。必须提供脚本。
combine_script
在集合文档脚本执行之后,每个分片执行一次。必须提供脚本,用于从每个分片返回信息。
reduce_script
在所以分片返回结果后相应节点上执行一次。必须提供脚本。脚本能访问combine_script的数组类型结果。
返回值类型
虽然任何有效脚本对象都可以在单个脚本中使用,但脚本必须仅返回或存储在状态对象中以下类型:
- 基本类型
- String类型
- Map(包含key和这里列举的类型值)
- Array(包括这里列举类型值元素)
params
可选,用于给init_script, map_script 、 combine_script 三个脚本传递参数变量。让用户可以控制聚集的行为,在不同脚本之间存储状态。缺省值为:"params" : {}
2. 示例
定义映射
PUT ledger
{
"mappings": {
"properties": {
"type": {
"type": "keyword"
},
"amount": {
"type": "integer"
}
}
}
}
导入示例数据
PUT /ledger/_bulk?refresh
{"index":{"_id":1}}
{"type": "sale","amount": 80}
{"index":{"_id":2}}
{"type": "cost","amount": 10}
{"index":{"_id":3}}
{"type": "cost","amount": 30}
{"index":{"_id":4}}
{"type": "sale","amount": 130}
实现需求
使用脚本聚集计算利润:销售额-成本。实现代码:
POST ledger/_search?size=0
{
"query" : {
"match_all" : {}
},
"aggs": {
"profit": {
"scripted_metric": {
"init_script" : "state.transactions = []",
"map_script" : "state.transactions.add(doc['type'].value == 'sale' ? doc['amount'].value : -1 * doc['amount'].value)",
"combine_script" : "double profit = 0; for (t in state.transactions) { profit += t } return profit",
"reduce_script" : "double profit = 0; for (a in states) { profit += a } return profit"
}
}
}
}
返回结果:
{
"took" : 10,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"profit" : {
"value" : 170.0
}
}
}
过程解释
- 分片说明
这里共四条数据:
{"index":{"_id":1}}
{"type": "sale","amount": 80}
{"index":{"_id":2}}
{"type": "cost","amount": 10}
{"index":{"_id":3}}
{"type": "cost","amount": 30}
{"index":{"_id":4}}
{"type": "sale","amount": 130}
假设第一、三两天在Shard A,另外两条在Shard B .下面我们详细对每一步骤进行说明。
- 初始化(init_script)之前
state初始化为一个空对象。
"state" : {}
- 初始化(init_script)之后
任何集合文档执行之前每个分片执行一次,因此在每个分片上产生一个对象:
Shard A
"state" : {
"transactions" : []
}
Shard B
"state" : {
"transactions" : []
}
- map_script之后
每个分片对每个文档运行map_script脚本:
Shard A
"state" : {
"transactions" : [ 80, -30 ]
}
Shard B
"state" : {
"transactions" : [ -10, 130 ]
}
- combine_script 之后
文档收集之后,combine_script脚本在每个分片上执行,通过累加事务数组的值计算每个分片的利润并传给响应节点:
Shard A
50
Shard B
120
- reduce_script 之后
reduce_script脚本收到包括合并每个分片的结果的数组states
:
"states" : [
50,
120
]
最后通过累加聚集上面数组的值,生成最终返回的利润结果:
{
...
"aggregations": {
"profit": {
"value": 170
}
}
}
到此详细的执行流程解释完成。最后需要说明的是如果脚本聚集的父分组没有收集到任何记录,那么响应会是null,因此处理脚本聚集时需要考虑这种情况。
3. 总结
本文介绍了Elasticsearch的脚本聚集,详细介绍了其语法,并通过示例完整说明了其map-reduce的执行过程。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/105159833