ElasticSearch 聚合查询与性能优化实战

聚合是 ES 的强大功能之一。本文深入剖析 ES 的聚合查询类型(Bucket、Metric、Pipeline)、聚合原理、性能优化技巧,以及实战中的最佳实践。

一、聚合概述

1.1 聚合类型

聚合类型

管道聚合

Parent

Sibling

指标聚合

Avg/Max/Min/Sum

Cardinality

Percentiles

桶聚合

Terms Aggregation

Range Aggregation

Date Histogram

聚合 Aggregations

1.2 聚合与查询结合

查询 + 聚合

Query

Filter Context

不影响聚合

缓存结果

Query Context

影响聚合

计算相关性


二、桶聚合

2.1 Terms 聚合

// 按字段值分组
{
  "aggs": {
    "by_status": {
      "terms": {
        "field": "status",
        "size": 10
      }
    }
  }
}

2.2 Range 聚合

// 按范围分组
{
  "aggs": {
    "price_ranges": {
      "range": {
        "field": "price",
        "ranges": [
          { "to": 100 },
          { "from": 100, "to": 500 },
          { "from": 500 }
        ]
      }
    }
  }
}

2.3 Date Histogram

// 按时间分桶
{
  "aggs": {
    "sales_over_time": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      }
    }
  }
}

三、指标聚合

3.1 基础指标

// 基础统计
{
  "aggs": {
    "price_stats": {
      "stats": {
        "field": "price"
      }
    }
  }
}

3.2 去重计数

// 去重计数
{
  "aggs": {
    "unique_users": {
      "cardinality": {
        "field": "user_id"
      }
    }
  }
}

3.3 百分位数

// 百分位数
{
  "aggs": {
    "latency_percentiles": {
      "percentiles": {
        "field": "latency",
        "percents": [50, 90, 99]
      }
    }
  }
}

四、管道聚合

4.1 Parent 管道

// 在桶聚合结果上计算
{
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": { "field": "price" }
        },
        "cumulative_sales": {
          "cumulative_sum": {
            "buckets_path": "sales"
          }
        }
      }
    }
  }
}

4.2 Sibling 管道

// 在同级聚合上计算
{
  "aggs": {
    "max_price": {
      "max": { "field": "price" }
    },
    "min_price": {
      "min": { "field": "price" }
    },
    "price_diff": {
      "bucket_script": {
        "buckets_path": {
          "max": "max_price",
          "min": "min_price"
        },
        "script": "params.max - params.min"
      }
    }
  }
}

五、性能优化

5.1 聚合优化策略

优化策略

使用 filter 替代 query

避免计算相关性

结果可缓存

控制桶的数量

size 参数

避免返回过多桶

使用 approximate 聚合

cardinality 使用 HyperLogLog

性能更好

5.2 分片级别聚合

优化点

分片越多,协调开销越大

结果合并消耗资源

大数据集聚合慢

聚合流程

请求发送到协调节点

转发到所有分片

各分片执行聚合

协调节点合并结果

5.3 深度分页问题

解决方案

Search After

使用上一页最后一条排序值

无深度限制

Scroll

适合导出大量数据

有超时限制

问题

from + size 深度分页

每个分片返回 size 条

协调节点合并大量数据

内存溢出风险


六、实战技巧

6.1 多桶聚合

// 嵌套聚合
{
  "aggs": {
    "by_category": {
      "terms": { "field": "category" },
      "aggs": {
        "by_brand": {
          "terms": { "field": "brand", "size": 5 },
          "aggs": {
            "avg_price": {
              "avg": { "field": "price" }
            }
          }
        }
      }
    }
  }
}

6.2 动态桶大小

// 自适应桶大小
{
  "aggs": {
    "price_distribution": {
      "histogram": {
        "field": "price",
        "interval": "auto"
      }
    }
  }
}

七、面试高频问题

7.1 ES 聚合的原理?

1. 请求发送到协调节点
2. 协调节点广播到所有相关分片
3. 每个分片执行聚合,返回部分结果
4. 协调节点合并所有分片结果
5. 返回最终聚合结果

注意:
- 分片越多,协调开销越大
- 大量桶会消耗内存

7.2 如何优化聚合性能?

1. 使用 filter 而非 query
   - filter 结果可缓存
   - 不计算相关性

2. 限制桶数量
   - 设置合理的 size
   - 使用 shard_size

3. 使用 approximate 聚合
   - cardinality 使用 HyperLogLog
   - percentiles 使用 TDigest

4. 避免深度分页
   - 使用 search_after

九、聚合执行引擎与底层原理

9.1 聚合执行两阶段模型

阶段二: Fetch + Aggregate阶段

协调节点发送 doc_ids

各分片获取文档数据

执行聚合计算

返回部分聚合结果

协调节点合并结果

阶段一: Query阶段

协调节点广播查询

各分片执行 Query

收集匹配文档的 doc_ids

返回 doc_ids 到协调节点

9.2 聚合执行上下文

Aggregator生命周期

buildAggregation()

getValues()

postCollect()

buildAggregation()

聚合内存结构

AggregationContext

Aggregator[]

BucketAggregator

MetricAggregator

// Aggregator 核心接口
public abstract class Aggregator {
    
    // 聚合上下文
    protected final AggregationContext context;
    protected final String name;
    
    // 获取聚合值
    public abstract Double getValues(AggregationExecutionContext context) 
        throws IOException;
    
    // 收集文档
    public abstract void collect(int doc, long bucketOrd) throws IOException;
    
    // 构建聚合结果
    public abstract AggregationBuilder buildAggregation(long owningBucketOrd) 
        throws IOException;
    
    // 子聚合回调
    public Aggregator[] getSubAggregators() {
        return subAggregators;
    }
}

9.3 Terms 聚合执行细节

协调节点合并

收集所有分片结果

按 doc_count 排序

取 Top-N 返回

更新 shard_size

分片端执行

加载 fielddata

获取 global ordinals

构建 OrdinalMap

按 ordinal 分桶计数

返回 Top-K buckets

// OrdinalMap 用于 String 字段聚合
public class OrdinalMap extends AbstractOrdinalMap {
    
    // global_ordinal -> bucket ord
    private long[][] segmentOrds;
    private long[] prefixSums;
    
    // 映射: global_ordinal -> owning_ordinal
    public void globalToOwn(globalOrd) {
        int segment = binarySearch(prefixSums, globalOrd);
        long localOrd = globalOrd - prefixSums[segment];
        return segmentOrds[segment][localOrd];
    }
}

// Terms 聚合执行
public class StringTermsAggregator extends BucketsAggregator {
    
    @Override
    public void collect(int doc, long bucketOrd) throws IOException {
        // 1. 获取字段值
        SortedDocValues values = context.getSortedDocValues(field);
        
        // 2. 获取 global ordinal
        long globalOrd = values.getOrd(doc);
        
        // 3. 映射到 bucket ord
        long bucketKey = mapGlobalToOwn(globalOrd);
        
        // 4. 增加计数
        incrementBucketDocCount(bucketKey, 1);
    }
}

十、近似算法与基数统计原理

10.1 HyperLogLog 算法原理

HyperLogLog (HLL) 是用于基数估计的概率算法,标准误差约 0.81%:

基数估算

m = 2^p = 1024

E = α * m² / Σ(2^-m_k)

α = 0.673 (m=1024)

桶内统计

计算前导零

max(0, 64 - leadingZeros - 1)

更新桶内最大值

Hash + 分桶

元素 x

Hash(x) -> 64位

低 10 位 = 分桶编号 (0-1023)

高位作为随机数据

// HyperLogLog 核心实现
public class HyperLogLogPlusPlus {
    
    private final int p;           // 分桶位数 (4-18)
    private final long m;         // 桶数量 2^p
    private final byte[] registers; // 寄存器数组
    
    public HyperLogLogPlusPlus(int p) {
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }
    
    public void add(long hash64) {
        // 1. 提取分桶编号
        int bucket = (int) (hash64 & (m - 1));
        
        // 2. 计算前导零 (不含符号位)
        int maxZero = 64 - Long.numberOfLeadingZeros(hash64);
        
        // 3. 更新寄存器 (取最大值)
        registers[bucket] = (byte) Math.max(
            registers[bucket], maxZero
        );
    }
    
    public long cardinality() {
        // 调和平均数
        double sum = 0.0;
        for (int i = 0; i < m; i++) {
            sum += 1.0 / (1 << registers[i]);
        }
        
        // 估算公式
        double estimate = m * m / sum;
        
        // 偏差修正
        if (estimate <= 2.5 * m) {
            // 使用线性计数
            return linearCounting(m, countNonZero());
        }
        
        // HLL 估算
        return Math.round(estimate * ALPHA[p]);
    }
    
    // 寄存器数组
    private static final double[] ALPHA = {
        0.673, 0.697, 0.709  // p=10, 11, 12...
    };
}

10.2 TDigest 百分位数算法

TDigest 用于百分位数估计,对极端值有更好的精度:

百分位查询

从左右向中间累积

找到目标百分位

线性插值

构建过程

累积数据

达到压缩阈值

合并为 Centroid

重新排序

Centroid结构

Centroid

mean: 中心值

count: 权重

// TDigest 核心实现
public class TDigest {
    
    private final double compression;
    private final List<Centroid> centroids = new ArrayList<>();
    
    // 添加数据点
    public void add(double value, double weight) {
        if (weight == 0) return;
        
        // 1. 尝试合并到现有 Centroid
        Centroid nearest = findNearest(value);
        if (nearest != null && canAdd(nearest, weight)) {
            nearest.merge(value, weight);
        } else {
            // 2. 创建新 Centroid
            centroids.add(new Centroid(value, weight));
        }
        
        // 3. 必要时压缩
        if (centroids.size() > 2 * compression) {
            compress();
        }
    }
    
    // 计算百分位数
    public double quantile(double q) {
        // 1. 按 mean 排序
        centroids.sort(Comparator.comparingDouble(c -> c.mean));
        
        // 2. 累积权重
        double totalWeight = centroids.stream()
            .mapToDouble(c -> c.count)
            .sum();
        double targetWeight = q * totalWeight;
        
        // 3. 插值计算
        double cumWeight = 0;
        for (int i = 0; i < centroids.size() - 1; i++) {
            double nextCum = cumWeight + centroids.get(i + 1).count;
            if (cumWeight <= targetWeight && targetWeight < nextCum) {
                // 线性插值
                double t = (targetWeight - cumWeight) / 
                           (nextCum - cumWeight);
                return lerp(centroids.get(i).mean, 
                           centroids.get(i + 1).mean, t);
            }
            cumWeight = nextCum;
        }
        
        return centroids.get(centroids.size() - 1).mean;
    }
    
    // 压缩: 按权重归并相邻 Centroid
    private void compress() {
        // 使用 k-means 风格合并
        // 限制: sum(count) 不超过某个阈值
    }
}

10.3 HLL vs TDigest 对比

特性 HyperLogLog TDigest
用途 基数估计 (COUNT DISTINCT) 百分位数、中位数
精度 ~0.81% 尾部更精确
内存 O(2^p) bytes O(n) centroids
内存可控
极端值友好

十一、Filter 缓存与 Bitset 缓存机制

11.1 Filter 查询执行流程

Cache存储

Key: CacheKey

Value: Bitset

LRU 淘汰策略

Filter执行

命中

未命中

查询解析

检查 Cache?

返回缓存 BitSet

执行 MatchDocs

生成 BitSet

存入 Cache

11.2 Lucene Bitset 实现

// Roaring Bitmap 实现
public class RoaringBitmap implements Iterable< int> {
    
    // 按高16位分桶
    private final RoaringArray ra;
    
    // Container 类型
    abstract static class Container {
        static final int MAX_SIZE = 65536;
    }
    
    // 1. BitmapContainer: 密集数据 (65536 bits = 8KB)
    static class BitmapContainer extends Container {
        long[] bitset = new long[1024];  // 65536 bits
    }
    
    // 2. ArrayContainer: 稀疏数据
    static class ArrayContainer extends Container {
        short[] array = new short[0];  // 最多 4096 个元素
    }
    
    // 3. RunContainer: 连续数据
    static class RunContainer extends Container {
        short[] runsLength;  // RLE 编码
    }
    
    // 按位与运算
    public RoaringBitmap and(RoaringBitmap other) {
        RoaringBitmap result = new RoaringBitmap();
        
        for (int i = 0; i < ra.size(); i++) {
            Container c1 = ra.getContainerAtIndex(i);
            Container c2 = other.ra.getContainerAtIndex(i);
            
            if (c1 != null && c2 != null) {
                Container intersection = c1.and(c2);
                if (intersection.getCardinality() > 0) {
                    result.ra.append(i, intersection);
                }
            }
        }
        return result;
    }
}

11.3 Filter 缓存策略

// Filter 缓存管理
public class QueryCache {
    
    // LRU 缓存
    private final LRUCache<CacheKey, WeakReference<Bitset>> cache;
    
    // 缓存条件判断
    public boolean shouldCache(Query query) {
        // 1. 必须是叶子查询
        if (!isLeafQuery(query)) return false;
        
        // 2. 查询成本超过阈值
        double cost = estimateQueryCost(query);
        return cost > indexWriter.config.getMaxCachedCost();
        
        // 3. 不缓存过于频繁的查询
    }
    
    // 获取缓存的 BitSet
    public Bitset getCachedBitset(CacheKey key) {
        WeakReference<Bitset> ref = cache.get(key);
        return ref != null ? ref.get() : null;
    }
}

十二、桶聚合与 Top-Hits 嵌套实现

12.1 Top-Hits 聚合执行

数据结构

Bucket

top_hits: { docs[], maxScore }

nested_hits: { hits[], total }

Top-Hits执行

按桶分组

每个桶独立查询

分页获取命中文档

按 sort 排序

限制返回数量

// TopHitsAggregator 实现
public class TopHitsAggregator extends BucksAggregator {
    
    private final int from;
    private final int size;
    private final ScoreDoc searchAfter;
    private final Query query;
    private final Sort sort;
    
    private CollectorContext<?> context;
    private Searcher searcher;
    
    @Override
    public void doPostCollect() {
        // 为每个桶执行一次查询
        for (int i = 0; i < buckets.size(); i++) {
            long bucketOrd = bucketOrds[i];
            String bucketKey = bucketKeys[i];
            
            // 构建桶特定查询 (添加桶过滤)
            Query bucketQuery = new ConstantScoreQuery(
                new BooleanQuery.Builder()
                    .add(query, BooleanClause.Occur.MUST)
                    .add(bucketKeyFilter, BooleanClause.Occur.FILTER)
                    .build()
            );
            
            // 执行搜索
            TopDocs topDocs = searcher.search(
                bucketQuery, size, sort, searchAfter, false
            );
            
            // 保存结果
            saveTopHits(bucketOrd, topDocs);
        }
    }
}

12.2 全局序数 (Global Ordinals)

Global Ordinals 是字符串字段聚合的关键优化:

OrdinalMap

segment_ord -> global_ord

支持快速查找

内存优化: 分段存储

构建过程

Segment Ordinals

合并为 Global

构建映射表

// Global Ordinals 映射
public class OrdinalMap {
    
    // 段 ordinal 到全局 ordinal 的映射
    private final long[][] segmentOrds;
    private final long[] prefixSum;  // 每个段 ordinal 的起始位置
    
    // 构建映射
    public static OrdinalMap build(
            List<SortedDocValues> segValues) throws IOException {
        
        OrdinalMap map = new OrdinalMap();
        map.segmentOrds = new long[segValues.size()][];
        map.prefixSum = new long[segValues.size() + 1];
        
        // 收集所有唯一值并排序
        SortedBytesWriter writer = new SortedBytesWriter();
        for (int i = 0; i < segValues.size(); i++) {
            prefixSum[i] = writer.getPosition();
            
            // 写入该段的所有值
            SortedDocValues values = segValues.get(i);
            for (int ord = 0; ord < values.getValueCount(); ord++) {
                writer.writeValue(values.lookupOrd(ord));
            }
        }
        prefixSum[segValues.size()] = writer.getPosition();
        
        // 构建反向映射
        map.segmentOrds[i] = new long[valueCount];
        // ... 填充映射
        
        return map;
    }
    
    // 查询: global -> segment + local
    public long[] lookupGlobalOrd(long globalOrd) {
        int segment = binarySearch(prefixSum, globalOrd);
        long localOrd = globalOrd - prefixSum[segment];
        return new long[]{segment, localOrd};
    }
}

十三、聚合性能调优实战

13.1 聚合慢日志分析

// 开启聚合分析
{
  "profile": true,
  "aggregations": {
    "my_agg": {
      "terms": { "field": "category" }
    }
  }
}
// profiler 结果
{
  "aggregations": [
    {
      "type": "terms",
      "description": "my_agg",
      "time_in_nanos": 1523400,
      "breakdown": {
        "reduce": 0,
        "build_aggregation": 1450000,
        "post_collection": 0,
        "initialize": 5000,
        "collect": 68300
      },
      "children": [
        {
          "type": "long_terms",
          "time_in_nanos": 234000
        }
      ]
    }
  ]
}

13.2 字段类型优化

// 优化: 使用 keyword 而非 text 进行聚合
{
  "mappings": {
    "properties": {
      "status": {
        "type": "keyword"  // ✅ 优化: 直接聚合
      },
      "status_text": {
        "type": "text",
        "fields": {
          "keyword": {  // ✅ 添加 keyword 子字段用于聚合
            "type": "keyword"
          }
        }
      }
    }
  }
}

13.3 doc_values 和 fielddata 配置

// 聚合字段优化
{
  "mappings": {
    "properties": {
      "price": {
        "type": "double",
        "doc_values": true,  // ✅ 默认开启, 用于聚合
        "fielddata": false    // ⚠️ 关闭, 节省内存
      },
      "name": {
        "type": "text",
        "fielddata": true,    // ⚠️ 仅在确实需要时开启
        "fields": {
          "keyword": {
            "type": "keyword",
            "doc_values": true  // ✅ 用于排序和聚合
          }
        }
      }
    }
  }
}

13.4 资源限制配置

// 聚合内存限制
// elasticsearch.yml
indices.queries.cache.size: 10%
indices.fielddata.cache.size: 20%

// 查询级别限制
{
  "indices.breaker.fielddata.limit": 40%
}

// 聚合深度限制
{
  "max_buckets": 10000  // 默认值
}

八、总结

8.1 聚合类型

聚合类型:

1. 桶聚合
   - 分组统计
   - terms, range, histogram

2. 指标聚合
   - 计算指标
   - avg, sum, cardinality

3. 管道聚合
   - 在聚合结果上计算
   - cumulative_sum, bucket_script

8.2 优化建议

优化建议:

1. 使用 filter
2. 限制桶数量
3. 使用 approximate 聚合
4. 避免深度分页
Logo

更多推荐