ElasticSearch 聚合查询与性能优化实战
本文深入解析ElasticSearch聚合查询与性能优化策略。主要内容包括:1)聚合类型划分,涵盖桶聚合(Terms、Range、Date Histogram)、指标聚合(基础统计、去重计数、百分位数)和管道聚合;2)性能优化技巧,如使用filter替代query、控制桶数量、采用近似算法等;3)分片级别聚合原理及深度分页解决方案;4)实战中的多桶聚合和动态桶大小设置;5)面试高频问题解析。文章通
·
ElasticSearch 聚合查询与性能优化实战
聚合是 ES 的强大功能之一。本文深入剖析 ES 的聚合查询类型(Bucket、Metric、Pipeline)、聚合原理、性能优化技巧,以及实战中的最佳实践。
一、聚合概述
1.1 聚合类型
1.2 聚合与查询结合
二、桶聚合
2.1 Terms 聚合
// 按字段值分组
{
"aggs": {
"by_status": {
"terms": {
"field": "status",
"size": 10
}
}
}
}
2.2 Range 聚合
// 按范围分组
{
"aggs": {
"price_ranges": {
"range": {
"field": "price",
"ranges": [
{ "to": 100 },
{ "from": 100, "to": 500 },
{ "from": 500 }
]
}
}
}
}
2.3 Date Histogram
// 按时间分桶
{
"aggs": {
"sales_over_time": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
}
}
}
}
三、指标聚合
3.1 基础指标
// 基础统计
{
"aggs": {
"price_stats": {
"stats": {
"field": "price"
}
}
}
}
3.2 去重计数
// 去重计数
{
"aggs": {
"unique_users": {
"cardinality": {
"field": "user_id"
}
}
}
}
3.3 百分位数
// 百分位数
{
"aggs": {
"latency_percentiles": {
"percentiles": {
"field": "latency",
"percents": [50, 90, 99]
}
}
}
}
四、管道聚合
4.1 Parent 管道
// 在桶聚合结果上计算
{
"aggs": {
"sales_per_month": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
},
"aggs": {
"sales": {
"sum": { "field": "price" }
},
"cumulative_sales": {
"cumulative_sum": {
"buckets_path": "sales"
}
}
}
}
}
}
4.2 Sibling 管道
// 在同级聚合上计算
{
"aggs": {
"max_price": {
"max": { "field": "price" }
},
"min_price": {
"min": { "field": "price" }
},
"price_diff": {
"bucket_script": {
"buckets_path": {
"max": "max_price",
"min": "min_price"
},
"script": "params.max - params.min"
}
}
}
}
五、性能优化
5.1 聚合优化策略
5.2 分片级别聚合
5.3 深度分页问题
六、实战技巧
6.1 多桶聚合
// 嵌套聚合
{
"aggs": {
"by_category": {
"terms": { "field": "category" },
"aggs": {
"by_brand": {
"terms": { "field": "brand", "size": 5 },
"aggs": {
"avg_price": {
"avg": { "field": "price" }
}
}
}
}
}
}
}
6.2 动态桶大小
// 自适应桶大小
{
"aggs": {
"price_distribution": {
"histogram": {
"field": "price",
"interval": "auto"
}
}
}
}
七、面试高频问题
7.1 ES 聚合的原理?
1. 请求发送到协调节点
2. 协调节点广播到所有相关分片
3. 每个分片执行聚合,返回部分结果
4. 协调节点合并所有分片结果
5. 返回最终聚合结果
注意:
- 分片越多,协调开销越大
- 大量桶会消耗内存
7.2 如何优化聚合性能?
1. 使用 filter 而非 query
- filter 结果可缓存
- 不计算相关性
2. 限制桶数量
- 设置合理的 size
- 使用 shard_size
3. 使用 approximate 聚合
- cardinality 使用 HyperLogLog
- percentiles 使用 TDigest
4. 避免深度分页
- 使用 search_after
九、聚合执行引擎与底层原理
9.1 聚合执行两阶段模型
9.2 聚合执行上下文
// Aggregator 核心接口
public abstract class Aggregator {
// 聚合上下文
protected final AggregationContext context;
protected final String name;
// 获取聚合值
public abstract Double getValues(AggregationExecutionContext context)
throws IOException;
// 收集文档
public abstract void collect(int doc, long bucketOrd) throws IOException;
// 构建聚合结果
public abstract AggregationBuilder buildAggregation(long owningBucketOrd)
throws IOException;
// 子聚合回调
public Aggregator[] getSubAggregators() {
return subAggregators;
}
}
9.3 Terms 聚合执行细节
// OrdinalMap 用于 String 字段聚合
public class OrdinalMap extends AbstractOrdinalMap {
// global_ordinal -> bucket ord
private long[][] segmentOrds;
private long[] prefixSums;
// 映射: global_ordinal -> owning_ordinal
public void globalToOwn(globalOrd) {
int segment = binarySearch(prefixSums, globalOrd);
long localOrd = globalOrd - prefixSums[segment];
return segmentOrds[segment][localOrd];
}
}
// Terms 聚合执行
public class StringTermsAggregator extends BucketsAggregator {
@Override
public void collect(int doc, long bucketOrd) throws IOException {
// 1. 获取字段值
SortedDocValues values = context.getSortedDocValues(field);
// 2. 获取 global ordinal
long globalOrd = values.getOrd(doc);
// 3. 映射到 bucket ord
long bucketKey = mapGlobalToOwn(globalOrd);
// 4. 增加计数
incrementBucketDocCount(bucketKey, 1);
}
}
十、近似算法与基数统计原理
10.1 HyperLogLog 算法原理
HyperLogLog (HLL) 是用于基数估计的概率算法,标准误差约 0.81%:
// HyperLogLog 核心实现
public class HyperLogLogPlusPlus {
private final int p; // 分桶位数 (4-18)
private final long m; // 桶数量 2^p
private final byte[] registers; // 寄存器数组
public HyperLogLogPlusPlus(int p) {
this.p = p;
this.m = 1 << p;
this.registers = new byte[m];
}
public void add(long hash64) {
// 1. 提取分桶编号
int bucket = (int) (hash64 & (m - 1));
// 2. 计算前导零 (不含符号位)
int maxZero = 64 - Long.numberOfLeadingZeros(hash64);
// 3. 更新寄存器 (取最大值)
registers[bucket] = (byte) Math.max(
registers[bucket], maxZero
);
}
public long cardinality() {
// 调和平均数
double sum = 0.0;
for (int i = 0; i < m; i++) {
sum += 1.0 / (1 << registers[i]);
}
// 估算公式
double estimate = m * m / sum;
// 偏差修正
if (estimate <= 2.5 * m) {
// 使用线性计数
return linearCounting(m, countNonZero());
}
// HLL 估算
return Math.round(estimate * ALPHA[p]);
}
// 寄存器数组
private static final double[] ALPHA = {
0.673, 0.697, 0.709 // p=10, 11, 12...
};
}
10.2 TDigest 百分位数算法
TDigest 用于百分位数估计,对极端值有更好的精度:
// TDigest 核心实现
public class TDigest {
private final double compression;
private final List<Centroid> centroids = new ArrayList<>();
// 添加数据点
public void add(double value, double weight) {
if (weight == 0) return;
// 1. 尝试合并到现有 Centroid
Centroid nearest = findNearest(value);
if (nearest != null && canAdd(nearest, weight)) {
nearest.merge(value, weight);
} else {
// 2. 创建新 Centroid
centroids.add(new Centroid(value, weight));
}
// 3. 必要时压缩
if (centroids.size() > 2 * compression) {
compress();
}
}
// 计算百分位数
public double quantile(double q) {
// 1. 按 mean 排序
centroids.sort(Comparator.comparingDouble(c -> c.mean));
// 2. 累积权重
double totalWeight = centroids.stream()
.mapToDouble(c -> c.count)
.sum();
double targetWeight = q * totalWeight;
// 3. 插值计算
double cumWeight = 0;
for (int i = 0; i < centroids.size() - 1; i++) {
double nextCum = cumWeight + centroids.get(i + 1).count;
if (cumWeight <= targetWeight && targetWeight < nextCum) {
// 线性插值
double t = (targetWeight - cumWeight) /
(nextCum - cumWeight);
return lerp(centroids.get(i).mean,
centroids.get(i + 1).mean, t);
}
cumWeight = nextCum;
}
return centroids.get(centroids.size() - 1).mean;
}
// 压缩: 按权重归并相邻 Centroid
private void compress() {
// 使用 k-means 风格合并
// 限制: sum(count) 不超过某个阈值
}
}
10.3 HLL vs TDigest 对比
| 特性 | HyperLogLog | TDigest |
|---|---|---|
| 用途 | 基数估计 (COUNT DISTINCT) | 百分位数、中位数 |
| 精度 | ~0.81% | 尾部更精确 |
| 内存 | O(2^p) bytes | O(n) centroids |
| 内存可控 | ✅ | ✅ |
| 极端值友好 | ❌ | ✅ |
十一、Filter 缓存与 Bitset 缓存机制
11.1 Filter 查询执行流程
11.2 Lucene Bitset 实现
// Roaring Bitmap 实现
public class RoaringBitmap implements Iterable< int> {
// 按高16位分桶
private final RoaringArray ra;
// Container 类型
abstract static class Container {
static final int MAX_SIZE = 65536;
}
// 1. BitmapContainer: 密集数据 (65536 bits = 8KB)
static class BitmapContainer extends Container {
long[] bitset = new long[1024]; // 65536 bits
}
// 2. ArrayContainer: 稀疏数据
static class ArrayContainer extends Container {
short[] array = new short[0]; // 最多 4096 个元素
}
// 3. RunContainer: 连续数据
static class RunContainer extends Container {
short[] runsLength; // RLE 编码
}
// 按位与运算
public RoaringBitmap and(RoaringBitmap other) {
RoaringBitmap result = new RoaringBitmap();
for (int i = 0; i < ra.size(); i++) {
Container c1 = ra.getContainerAtIndex(i);
Container c2 = other.ra.getContainerAtIndex(i);
if (c1 != null && c2 != null) {
Container intersection = c1.and(c2);
if (intersection.getCardinality() > 0) {
result.ra.append(i, intersection);
}
}
}
return result;
}
}
11.3 Filter 缓存策略
// Filter 缓存管理
public class QueryCache {
// LRU 缓存
private final LRUCache<CacheKey, WeakReference<Bitset>> cache;
// 缓存条件判断
public boolean shouldCache(Query query) {
// 1. 必须是叶子查询
if (!isLeafQuery(query)) return false;
// 2. 查询成本超过阈值
double cost = estimateQueryCost(query);
return cost > indexWriter.config.getMaxCachedCost();
// 3. 不缓存过于频繁的查询
}
// 获取缓存的 BitSet
public Bitset getCachedBitset(CacheKey key) {
WeakReference<Bitset> ref = cache.get(key);
return ref != null ? ref.get() : null;
}
}
十二、桶聚合与 Top-Hits 嵌套实现
12.1 Top-Hits 聚合执行
// TopHitsAggregator 实现
public class TopHitsAggregator extends BucksAggregator {
private final int from;
private final int size;
private final ScoreDoc searchAfter;
private final Query query;
private final Sort sort;
private CollectorContext<?> context;
private Searcher searcher;
@Override
public void doPostCollect() {
// 为每个桶执行一次查询
for (int i = 0; i < buckets.size(); i++) {
long bucketOrd = bucketOrds[i];
String bucketKey = bucketKeys[i];
// 构建桶特定查询 (添加桶过滤)
Query bucketQuery = new ConstantScoreQuery(
new BooleanQuery.Builder()
.add(query, BooleanClause.Occur.MUST)
.add(bucketKeyFilter, BooleanClause.Occur.FILTER)
.build()
);
// 执行搜索
TopDocs topDocs = searcher.search(
bucketQuery, size, sort, searchAfter, false
);
// 保存结果
saveTopHits(bucketOrd, topDocs);
}
}
}
12.2 全局序数 (Global Ordinals)
Global Ordinals 是字符串字段聚合的关键优化:
// Global Ordinals 映射
public class OrdinalMap {
// 段 ordinal 到全局 ordinal 的映射
private final long[][] segmentOrds;
private final long[] prefixSum; // 每个段 ordinal 的起始位置
// 构建映射
public static OrdinalMap build(
List<SortedDocValues> segValues) throws IOException {
OrdinalMap map = new OrdinalMap();
map.segmentOrds = new long[segValues.size()][];
map.prefixSum = new long[segValues.size() + 1];
// 收集所有唯一值并排序
SortedBytesWriter writer = new SortedBytesWriter();
for (int i = 0; i < segValues.size(); i++) {
prefixSum[i] = writer.getPosition();
// 写入该段的所有值
SortedDocValues values = segValues.get(i);
for (int ord = 0; ord < values.getValueCount(); ord++) {
writer.writeValue(values.lookupOrd(ord));
}
}
prefixSum[segValues.size()] = writer.getPosition();
// 构建反向映射
map.segmentOrds[i] = new long[valueCount];
// ... 填充映射
return map;
}
// 查询: global -> segment + local
public long[] lookupGlobalOrd(long globalOrd) {
int segment = binarySearch(prefixSum, globalOrd);
long localOrd = globalOrd - prefixSum[segment];
return new long[]{segment, localOrd};
}
}
十三、聚合性能调优实战
13.1 聚合慢日志分析
// 开启聚合分析
{
"profile": true,
"aggregations": {
"my_agg": {
"terms": { "field": "category" }
}
}
}
// profiler 结果
{
"aggregations": [
{
"type": "terms",
"description": "my_agg",
"time_in_nanos": 1523400,
"breakdown": {
"reduce": 0,
"build_aggregation": 1450000,
"post_collection": 0,
"initialize": 5000,
"collect": 68300
},
"children": [
{
"type": "long_terms",
"time_in_nanos": 234000
}
]
}
]
}
13.2 字段类型优化
// 优化: 使用 keyword 而非 text 进行聚合
{
"mappings": {
"properties": {
"status": {
"type": "keyword" // ✅ 优化: 直接聚合
},
"status_text": {
"type": "text",
"fields": {
"keyword": { // ✅ 添加 keyword 子字段用于聚合
"type": "keyword"
}
}
}
}
}
}
13.3 doc_values 和 fielddata 配置
// 聚合字段优化
{
"mappings": {
"properties": {
"price": {
"type": "double",
"doc_values": true, // ✅ 默认开启, 用于聚合
"fielddata": false // ⚠️ 关闭, 节省内存
},
"name": {
"type": "text",
"fielddata": true, // ⚠️ 仅在确实需要时开启
"fields": {
"keyword": {
"type": "keyword",
"doc_values": true // ✅ 用于排序和聚合
}
}
}
}
}
}
13.4 资源限制配置
// 聚合内存限制
// elasticsearch.yml
indices.queries.cache.size: 10%
indices.fielddata.cache.size: 20%
// 查询级别限制
{
"indices.breaker.fielddata.limit": 40%
}
// 聚合深度限制
{
"max_buckets": 10000 // 默认值
}
八、总结
8.1 聚合类型
聚合类型:
1. 桶聚合
- 分组统计
- terms, range, histogram
2. 指标聚合
- 计算指标
- avg, sum, cardinality
3. 管道聚合
- 在聚合结果上计算
- cumulative_sum, bucket_script
8.2 优化建议
优化建议:
1. 使用 filter
2. 限制桶数量
3. 使用 approximate 聚合
4. 避免深度分页
更多推荐

所有评论(0)