Skip to main content
 首页 » 编程设计

Elasticsearch 组合聚集(Composite aggregation)实现交叉分析

2022年07月19日122oomusou

Elasticsearch 组合聚集(Composite aggregation)实现交叉分析

实际分析应用中经常遇到需要交叉表的情况,它能够从不同维度组合分析业务。关系型数据库一般通过 group byPivot实现,很幸运,Elasticsearch也有类似功能。本文带你了解强大的组合分析,其中示例大多数来自官网。

1. 认识组合聚集

组合聚集(Composite aggregation) —— 从不同来源创建组合分组,属于多组聚集分析。与其他的多组聚集分析不同,它可以实现多聚集结果进行分页,其提供了对结果进行流化处理,类似于对文档实现滚动操作。

组合分组是从每个文档中抽取特定值进行组合,每个组合作为一个分组。举例:

{
    
  "keyword": ["foo", "bar"], 
  "number": [23, 65, 76] 
} 

如果把文档中的 keywordnumber 作为值来源进行分组,结果为:

{
    "keyword": "foo", "number": 23 } 
{
    "keyword": "foo", "number": 65 } 
{
    "keyword": "foo", "number": 76 } 
{
    "keyword": "bar", "number": 23 } 
{
    "keyword": "bar", "number": 65 } 
{
    "keyword": "bar", "number": 76 } 

2. 值来源

sources 参数控制分组的数据来源,sources中定义的顺序很重要,因为它影响结果的返回顺序。每个来源的名称必须唯一,主要有三中类型的值来源。

2.1 关键词Terms

terms值来源相当于简单的 terms聚集,和聚集类似它的值可以从字段或脚本中抽取。举例:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "product": {
    "terms": {
    "field": "product" } } } 
        ] 
      } 
    } 
  } 
} 

当然也可以使用脚本创建组合聚集的值来源:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    
            "product": {
    
              "terms": {
    
                "script": {
    
                  "source": "doc['product'].value", 
                  "lang": "painless" 
                } 
              } 
            } 
          } 
        ] 
      } 
    } 
  } 
} 

2.2 直方图 Histogram

直方图可以为日期直方图和数值直方图。

2.2.1 数值直方图

直方图值来源可应用与数值类型构建固定大小的间隔,interval参数定义数据如何分组转换。举例,如果 interval设置为 5 ,则任何数值将转换至其最近的间隔分组,101 会 被转为 100,因为其在 100105之间的分组。举例:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "histo": {
    "histogram": {
    "field": "price", "interval": 5 } } } 
        ] 
      } 
    } 
  } 
} 

同样也可以使用脚本实现:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    
            "histo": {
    
              "histogram": {
    
                "interval": 5, 
                "script": {
    
                  "source": "doc['price'].value", 
                  "lang": "painless" 
                } 
              } 
            } 
          } 
        ] 
      } 
    } 
  } 
} 

2.2.2 日期直方图

date_histogram 与上面 histogram类似,处理值来源为日期类型:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "date": {
    "date_histogram": {
    "field": "timestamp", "calendar_interval": "1d" } } } 
        ] 
      } 
    } 
  } 
} 

上面示例使用每天作为间隔转换字段 timestamp的值至最近的间隔。其他间隔参数还有 year, quarter, month, week, day, hour, minute, second。当然可以通过缩写进行表示,但不支持小数(我们可以使用90m代替 1.5h)。

日期格式

日期数据是使用64位数值进行表示,即从epoch时间以来的毫秒数。默认该数值返回作为分组的 key。我们可以通过 format参数制定日期格式:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    
            "date": {
    
              "date_histogram": {
    
                "field": "timestamp", 
                "calendar_interval": "1d", 
                "format": "yyyy-MM-dd"          
              } 
            } 
          } 
        ] 
      } 
    } 
  } 
} 

时区

Elasticsearch 使用UTC格式存储日期时间,缺省所有的分组和凑整也按照UTC中完成。使用time_zone参数指示分组应该使用不同的时区。可以通过 ISO 8601 UTC (如:+01:00or-08:00),也可以使用时区ID, 如:America/Los_Angeles

偏移量

使用 offset 参数可以改变每个分组的初始值,通过制定整数 (+) 或负数 (-) 偏移量,如 1h 表示1小时, 1d 表示1天。举例,使用 day作为时间间隔,每个分组时间从午夜到午夜。设置 offset 为 +6h则改变每个分组跨度为早晨6点到下一个早晨6点:

PUT my-index-000001/_doc/1?refresh 
{
    
  "date": "2015-10-01T05:30:00Z" 
} 
 
PUT my-index-000001/_doc/2?refresh 
{
    
  "date": "2015-10-01T06:30:00Z" 
} 
 
GET my-index-000001/_search?size=0 
{
    
  "aggs": {
    
    "my_buckets": {
    
      "composite" : {
    
        "sources" : [ 
          {
    
            "date": {
    
              "date_histogram" : {
    
                "field": "date", 
                "calendar_interval": "day", 
                "offset": "+6h", 
                "format": "iso8601" 
              } 
            } 
          } 
        ] 
      } 
    } 
  } 
} 

如果采用默认偏移量,则返回一个分组结果;上面的示例返回结果从6am开始,有两个分组结果:

{
    
  ... 
  "aggregations": {
    
    "my_buckets": {
    
      "after_key": {
    "date": "2015-10-01T06:00:00.000Z" }, 
      "buckets": [ 
        {
    
          "key": {
    "date": "2015-09-30T06:00:00.000Z" }, 
          "doc_count": 1 
        }, 
        {
    
          "key": {
    "date": "2015-10-01T06:00:00.000Z" }, 
          "doc_count": 1 
        } 
      ] 
    } 
  } 
} 

注:每个桶的开始偏移量是在time_zone调整之后计算的。

2.3 地理块

实际应用中暂时用不到,为了文档完整性列出标题,读者有兴趣可以参阅

2.4 组合不同来源

sources参数接收值来源数组。其可以混合不同的值来源创建组合分组,举例:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "date": {
    "date_histogram": {
    "field": "timestamp", "calendar_interval": "1d" } } }, 
          {
    "product": {
    "terms": {
    "field": "product" } } } 
        ] 
      } 
    } 
  } 
} 

上面组合分组创建了两个值来源,date_histogramterms ,每个分组有两个值组成,分别来自不同聚集值。任何类型的组合都可以,并且数组顺序在组合分组中保持不变。

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "shop": {
    "terms": {
    "field": "shop" } } }, 
          {
    "product": {
    "terms": {
    "field": "product" } } }, 
          {
    "date": {
    "date_histogram": {
    "field": "timestamp", "calendar_interval": "1d" } } } 
        ] 
      } 
    } 
  } 
} 

3.6 对应Java Api

通过API增加不同来源:

List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>(); 
sources.add(new TermsValuesSourceBuilder("field1").field("field1")); 
sources.add(new TermsValuesSourceBuilder("field2").field("field2")); 
sources.add(new TermsValuesSourceBuilder("field3").field("field3"));CompositeAggregationBuilder compositeAggregationBuilder = 
        new CompositeAggregationBuilder("byProductAttributes", sources).size(10000); 

设置子聚集:

compositeAggregationBuilder 
        .subAggregation(AggregationBuilders.min("sub-field1").field("sub-field1")) 
        .subAggregation(AggregationBuilders.max("sub-field2").field("sub-field2")) 
searchSourceBuilder.aggregation(compositeAggregationBuilder); 

3. 其他选项

3.1 排序 (sort)

缺省分组时按照自然顺序排序,按照值的升序排列。当有多个值来源时,排序是按照每个值来源排序的,一个分组的第一个值与另一个分组的第一个进行比较,如果两者相等,则根据分组中的下一个值进行比较。这意味着 [foo, 100] 小于 [foobar, 0],因为 foo 小于 foobar。我们可以通过给每个值来源定义排序方向,设置 orderasc 或 desc,举例:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "date": {
    "date_histogram": {
    "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } }, 
          {
    "product": {
    "terms": {
    "field": "product", "order": "asc" } } } 
        ] 
      } 
    } 
  } 
} 

上面示例日期直方图采用降序,而term来源分组按照升序排序。

3.2 缺失分组 (missing bucket)

给给定源中文档没有值缺省被忽略,我们也可以通过设置 missing_bucket参数为 true(默认为false),使得响应中包括缺省值文档。

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "product_name": {
    "terms": {
    "field": "product", "missing_bucket": true } } } 
        ] 
      } 
    } 
  } 
} 

在上面的示例中,源product_name将为product字段没有值的文档显式设置为nullsort参数指示null值应该排在第一(升序,asc)还是最后(降序,desc)。

3.3 大小 (size)

size 参数可以定义返回多个组合分组。每个组合分组作为一个分组,所以设置为 10则返回前十个分组。缺省值为10。

3.4 分页

如果组合分组数量太大(或未知)而无法在单个响应中返回,则可将检索拆分为多个请求。由于组合分组实际上是平的,所以请求的大小正好是响应中返回的分组数量(假设它们就是要返回分组的大小)。如果要检索所有组合分组,那么最好使用较小的大小(例如100或1000),然后使用after参数检索下一个结果。例如:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "size": 2, 
        "sources": [ 
          {
    "date": {
    "date_histogram": {
    "field": "timestamp", "calendar_interval": "1d" } } }, 
          {
    "product": {
    "terms": {
    "field": "product" } } } 
        ] 
      } 
    } 
  } 
} 

返回结果:

{
    
  ... 
  "aggregations": {
    
    "my_buckets": {
    
      "after_key": {
    
        "date": 1494288000000, 
        "product": "mad max" 
      }, 
      "buckets": [ 
        {
    
          "key": {
    
            "date": 1494201600000, 
            "product": "rocky" 
          }, 
          "doc_count": 1 
        }, 
        {
    
          "key": {
    
            "date": 1494288000000, 
            "product": "mad max" 
          }, 
          "doc_count": 2 
        } 
      ] 
    } 
  } 
} 

需要达到下一组分组,重新发送请求,使用 after参数,设置为上次返回结果中 after_key的值。举例:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "size": 2, 
        "sources": [ 
          {
    "date": {
    "date_histogram": {
    "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } }, 
          {
    "product": {
    "terms": {
    "field": "product", "order": "asc" } } } 
        ], 
        "after": {
    "date": 1494288000000, "product": "mad max" }  
      } 
    } 
  } 
} 

after_key通常是响应中返回的最后一个分组的键,但这并不能保证。始终使用返回的after_key,而不是从结果中删除它。

3.5 子聚集

和其他多分组聚集一样,组合聚集也支持子聚集。可以通过子聚集技术其他分组或分析每个子分组。举例,下面示例技术每个分组的平均值:

GET /_search 
{
    
  "size": 0, 
  "aggs": {
    
    "my_buckets": {
    
      "composite": {
    
        "sources": [ 
          {
    "date": {
    "date_histogram": {
    "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } }, 
          {
    "product": {
    "terms": {
    "field": "product" } } } 
        ] 
      }, 
      "aggregations": {
    
        "the_avg": {
    
          "avg": {
    "field": "price" } 
        } 
      } 
    } 
  } 
} 

返回结果:

{
    
  ... 
  "aggregations": {
    
    "my_buckets": {
    
      "after_key": {
    
        "date": 1494201600000, 
        "product": "rocky" 
      }, 
      "buckets": [ 
        {
    
          "key": {
    
            "date": 1494460800000, 
            "product": "apocalypse now" 
          }, 
          "doc_count": 1, 
          "the_avg": {
    
            "value": 10.0 
          } 
        }, 
        {
    
          "key": {
    
            "date": 1494374400000, 
            "product": "mad max" 
          }, 
          "doc_count": 1, 
          "the_avg": {
    
            "value": 27.0 
          } 
        }, 
        {
    
          "key": {
    
            "date": 1494288000000, 
            "product": "mad max" 
          }, 
          "doc_count": 2, 
          "the_avg": {
    
            "value": 22.5 
          } 
        }, 
        {
    
          "key": {
    
            "date": 1494201600000, 
            "product": "rocky" 
          }, 
          "doc_count": 1, 
          "the_avg": {
    
            "value": 10.0 
          } 
        } 
      ] 
    } 
  } 
} 

注意:当前组合分组不支持管道聚集 (pipeline聚集)。

4. 组合分组应用示例

通过上面学习,我了解了组合分组是Elasticsearch中非常强大分析特性及分页能力,下面通过示例进行实战。

需求:一家在线匹萨公司的业务数据分析。主要实现三个方面功能:对结果进行分页展示,对与每个地址创建日期直方图分析,每天匹萨的平均数量。

4.1 数据准备

首先定义mapping,为了简化,这里直接把地址设置为一个字段:

PUT /pizza 
{
    
  "mappings" : {
    
      "properties": {
    
      "full_address": {
    
        "type": "keyword" 
      }, 
      "order" : {
    
        "type" : "text" 
      }, 
      "num_pizzas" : {
    
        "type" : "integer" 
      }, 
      "timestamp" : {
    
        "type" : "date" 
      } 
    } 
  } 
} 

插入示例数据:

POST /pizza/_bulk 
{
    "index": {
    "_id": 1 }} 
{
   "full_address" : "355 First St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 2, "timestamp": "2018-04-10T12:25" } 
{
    "index": {
    "_id": 2 }} 
{
   "full_address" : "961 Union St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 3, "timestamp": "2018-04-11T12:25" } 
{
    "index": {
    "_id": 3 }} 
{
   "full_address" : "123 Baker St, San Francisco, CA", "order" : "vegan", "num_pizzas" : 1, "timestamp": "2018-04-18T12:25" } 
{
    "index": {
    "_id": 4 }} 
{
   "full_address" : "1700 Powell St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 5, "timestamp": "2018-04-18T12:25" } 
{
    "index": {
    "_id": 5 }} 
{
   "full_address" : "900 Union St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 4, "timestamp": "2018-04-18T12:25" } 
{
    "index": {
    "_id": 6 }} 
{
   "full_address" : "355 First St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 3, "timestamp": "2018-04-10T12:25" } 
{
    "index": {
    "_id": 7 }} 
{
   "full_address" : "961 Union St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 1, "timestamp": "2018-04-12T12:25" } 
{
    "index": {
    "_id": 8 }} 
{
   "full_address" : "100 First St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 3, "timestamp": "2018-04-11T12:25" } 
{
    "index": {
    "_id": 9 }} 
{
   "full_address" : "101 First St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 5, "timestamp": "2018-04-11T12:25" } 
{
    "index": {
    "_id": 10 }} 
{
   "full_address" : "355 First St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 10, "timestamp": "2018-04-10T12:25" } 
{
    "index": {
    "_id": 11 }} 
{
   "full_address" : "100 First St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 4, "timestamp": "2018-04-11T14:25" } 

4.2 分页查询

实际应用中数据量非常大,如果需要查看中间页数据,其他聚集不方便实现。

GET /pizza/_search 
{
    
  "size": 0, 
  "track_total_hits": false, 
  "aggs": {
    
    "group_by_deliveries": {
    
      "composite": {
    
        "size": 3, 
        "sources": [ 
          {
    
            "by_address": {
    
              "terms": {
    
                "field": "full_address" 
              } 
            } 
          } 
        ] 
      } 
    } 
  } 
} 

上面结果展示第一页,注意我们设置size参数为3:

Address Deliveries
100 First St, San Francisco, CA 2
101 First St, San Francisco, CA 1
123 Baker St, San Francisco, CA 1

现在我们需要展示下一页结果。既然 123 Baker St, San Francisco, CA 在前面查询中是最后一条结果,因此现在查询中的参数 after设置为该值:

GET /pizza/_search 
{
    
  "size" : 0, 
  "track_total_hits" : false, 
  "aggs" : {
    
    "group_by_deliveries": {
    
      "composite" : {
    
        "after" : {
    "by_address" : "123 Baker St, San Francisco, CA" }, 
        "size": 3, 
        "sources" : [ 
          {
    "by_address": {
    "terms" : {
    "field": "full_address" } } } 
        ] 
      } 
    }  
  }  
} 

注,我们还可以设置 track_total_hits 参数的值为 false,它让Elasticsearch 在查询到足够的分组后尽快结束查询。

4.3 对每个地址创建日期直方图

这里我们设置多个源,分析每个地址每天的订单量:

GET /pizza/_search 
{
    
  "size": 0, 
  "track_total_hits": false, 
  "aggs": {
    
    "group_by_deliveries": {
    
      "composite": {
    
        "size": 3, 
        "sources": [ 
          {
    
            "by_address": {
    
              "terms": {
    
                "field": "full_address" 
              } 
            } 
          }, 
          {
    
            "histogram": {
    
              "date_histogram": {
    
                "field": "timestamp", 
                "calendar_interval": "1d", 
                "format": "yyyy-MM-dd" 
              } 
            } 
          } 
        ] 
      } 
    } 
  } 
} 

返回结果:

{
    
  "took" : 6, 
  "timed_out" : false, 
  "_shards" : {
    
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : {
    
    "max_score" : null, 
    "hits" : [ ] 
  }, 
  "aggregations" : {
    
    "group_by_deliveries" : {
    
      "after_key" : {
    
        "by_address" : "123 Baker St, San Francisco, CA", 
        "histogram" : "2018-04-18" 
      }, 
      "buckets" : [ 
        {
    
          "key" : {
    
            "by_address" : "100 First St, San Francisco, CA", 
            "histogram" : "2018-04-11" 
          }, 
          "doc_count" : 2 
        }, 
        {
    
          "key" : {
    
            "by_address" : "101 First St, San Francisco, CA", 
            "histogram" : "2018-04-11" 
          }, 
          "doc_count" : 1 
        }, 
        {
    
          "key" : {
    
            "by_address" : "123 Baker St, San Francisco, CA", 
            "histogram" : "2018-04-18" 
          }, 
          "doc_count" : 1 
        } 
      ] 
    } 
  } 
} 

上面示例给每个地址创建了一个直方图分组。我们还可以使用 after参数,查询下一页结果集:

GET /pizza/_search 
{
    
  "size": 0, 
  "track_total_hits": false, 
  "aggs": {
    
    "group_by_deliveries": {
    
      "composite": {
    
        "size": 3, 
        "after": {
    
          "by_address": "123 Baker St, San Francisco, CA", 
          "histogram": "2018-04-18" 
        }, 
        "sources": [ 
          {
    
            "by_address": {
    
              "terms": {
    
                "field": "full_address" 
              } 
            } 
          }, 
          {
    
            "histogram": {
    
              "date_histogram": {
    
                "field": "timestamp", 
                "calendar_interval": "1d", 
                "format": "yyyy-MM-dd" 
              } 
            } 
          } 
        ] 
      } 
    } 
  } 
} 

4.4 每天平均匹萨销售数量

为了分析每天每个地址的销售的匹萨数量,我们可以给组合聚集增加子聚集计算平均值:

GET /pizza/_search?size=0 
{
    
  "track_total_hits" : false, 
  "aggs" : {
    
    "group_by_deliveries": {
    
      "composite" : {
    
        "size": 3, 
        "sources" : [ 
          {
    "by_address": {
    "terms" : {
    "field": "full_address" } } }, 
          {
    "histogram": {
    "date_histogram" : {
    "field": "timestamp", "calendar_interval": "1d" } } } 
        ] 
      }, 
      "aggregations": {
    
        "avg_pizzas_per_day": {
    
           "avg": {
    "field": "num_pizzas" } 
        } 
      } 
    }  
  }  
} 
 
 

返回结果:

{
    
  ... 
  "aggregations" : {
   js 
    "group_by_deliveries" : {
    
      "after_key" : {
    
        "by_address" : "123 Baker St, San Francisco, CA", 
        "histogram" : 1524009600000 
      }, 
      "buckets" : [ 
        {
    
          "key" : {
    
            "by_address" : "100 First St, San Francisco, CA", 
            "histogram" : 1523404800000 
          }, 
          "doc_count" : 2, 
          "avg_pizzas_per_day" : {
    
            "value" : 3.5 
          } 
        }, 
        {
    
          "key" : {
    
            "by_address" : "101 First St, San Francisco, CA", 
            "histogram" : 1523404800000 
          }, 
          "doc_count" : 1, 
          "avg_pizzas_per_day" : {
    
            "value" : 5.0 
          } 
        }, 
        {
    
          "key" : {
    
            "by_address" : "123 Baker St, San Francisco, CA", 
            "histogram" : 1524009600000 
          }, 
          "doc_count" : 1, 
          "avg_pizzas_per_day" : {
    
            "value" : 1.0 
          } 
        } 
      ] 
    } 
  } 
} 

5. 总结

组合聚集是Elasticsearch中强大特性,可以实现对大量聚集结果进行分页,使得响应时间变得可确定。通过组合不同来源,实现对数据从不同维度进行组合分析。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/108225541
阅读延展