Python 实现elasticsearch group by

系统 1341 0

es实现聚合

es通过 agg 实现聚合,详情可见 es文档


有时候查询es数据的时候可能需要实现多字段group by的功能,例如:

          
            SELECT sum(item_count) from A group by field1, field2, field3
          
        

要实现多个维度的聚合,需要嵌套的 agg 查询语句:

          
            {
    "query": {
    },
    "aggs": {
        "field1": {
            "terms": {
                "field": "field1",
                "size": 2147483647 #设置一个大的分桶数,防止一次统计不完整
            },
            "aggs": {
                "field2": {
                    "terms": {
                        "field": "field2",
                        "size": 2147483647
                    },
                    "aggs": {
                        "field3": {
                            "terms": {
                                "field": "field3",
                                "size": 2147483647
                            },
                            "aggs": {
                                "sum_field": {
                                    "sum": {
                                        "field": "sum_field"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    },
    "size": 0
}
          
        

用函数构建聚合语句的 agg 部分:

          
            def build_query_aggs(fields, sum_field):
    agg_data = {}
    curr_field = agg_data
    for item in fields:
        curr_field[item] = {
            "terms": {
                "field": item,
                "size": 2147483647
            },
            "aggs": {}
        }
        curr_field = curr_field[item]["aggs"]
    curr_field[sum_field] = {
        "sum": {
                "field": sum_field
            }
    }
    return agg_data
          
        

处理得到的数据,将其组织成list:

          
            def build_es_aggs_data(data, fields, sum_field):
    curr_field = None
    res_data = []
    if len(fields) > 0:
        curr_field = fields[0]
    else:
        return
    curr_buckets = data[curr_field]['buckets']
    for item in curr_buckets:
        if len(fields) == 1:
            curr_data= {}
            curr_data[curr_field] = item['key']
            curr_data[sum_field] = item[sum_field]["value"]
            res_data.append(curr_data)
        else:
            pre_data = deepcopy(build_es_aggs_data(item, fields[1:], sum_field))
            for pre_item in pre_data:
                pre_item[curr_field] = item['key']
                res_data.append(pre_item)
    return res_data
          
        

更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论