MongoDB7.0--SpringBoot聚合操作

摘要

聚合操作简介

  • 聚合操作允许用户处理多个文档并返回计算结果

  • 聚合操作包含三类

    • 单文档聚合:针对单个集合,如:db.collection.countDocument(), db.collection.distinct()
    • 聚合管道:它可以作用在一个或几个集合上,对集合中的数据进行的一系列运算,并将这些数据转化为用户期望的形式,本文主要介绍管道操作
    • MapReduce:从MongoDB 5.0开始,map-reduce操作已被弃用,本文不做介绍。

聚合管道

  • 聚合管道是MongoDB中非常强大的功能,它允许用户将多个操作组合在一起,以实现复杂的数据处理。

  • 从效果而言,聚合管道相当于 SQL 查询中的 GROUP BY、 LEFT OUTER JOIN 、 AS等。

  • 整个聚合运算过程称为管道(Pipeline),它是由多个阶段(Stage)组成的, 每个管道:

    • 接受一系列文档(原始数据)
    • 每个阶段对这些文档进行一系列运算
    • 结果文档输出给下一个阶段

  • 聚合管道操作语法

    • pipelines 一组数据聚合阶段。除out、Merge和$geonear阶段之外,每个阶段都可以在管道中出现多次。
    • options 可选,聚合操作的其他参数。包含:查询计划、是否使用临时文件、 游标、最大操作时间、读写策略、强制索引等等
    1
    2
    pipeline = [$stage1, $stage2, ...$stageN];
    db.collection.aggregate(pipeline, {options})

    聚合管道

  • 常用的聚合阶段运算符

阶段运算符 描述 SQL等价运算符
$match 过滤文档 WHERE
$project 投影,改变文档的形状和内容 SELECT filedName AS newName
$group 将文档分组 GROUP BY
$sort 对文档进行排序 ORDER BY
$limit 限制结果集的大小 LIMIT
$skip 跳过指定数量的文档 OFFSET
$unwind 展开数组 -
$lookup 从其他集合中获取相关文档,左外连接 LEFT OUTER JOIN
$out 将结果集输出到新的集合 -
$geoNear 按照地理位置附近的顺序返回文档 -
$graphLookup 执行递归查询 -
$addFields 添加新字段 -
$bucket 根据指定条件将文档分组成桶 -
$facet 允许在单个聚合阶段内执行多个独立的子聚合 -

  • 聚合表达式

获取字段信息

1
2
$<field>  : 用 $ 指示字段路径
$<field>.<subfield> : 使用 $ 和 . 来指示内嵌文档的路径

常量表达式

1
$literal :<value> : 指示常量 <value>

系统变量表达式

1
2
$$<variable>  使用 $$ 指示系统变量
$$CURRENT 指示管道中当前操作的文档

聚合管道示例

  • 本示例使用SpringBoot实现MongoDB7.0的聚合操作

  • 初始化数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/*
var tags = ["nosql","mongodb","document","developer","popular"];
var types = ["technology","sociality","travel","novel","literature"];
var books=[];
for(var i=0;i<50;i++){
var typeIdx = Math.floor(Math.random()*types.length);
var tagIdx = Math.floor(Math.random()*tags.length);
var tagIdx2 = Math.floor(Math.random()*tags.length);
var favCount = Math.floor(Math.random()*100);
var username = "xx00"+Math.floor(Math.random()*10);
var age = 20 + Math.floor(Math.random()*15);
var book = {
title: "book-"+i,
type: types[typeIdx],
tag: [tags[tagIdx],tags[tagIdx2]],
favCount: favCount,
author: {name:username,age:age}
};
books.push(book)
}
db.books.insertMany(books);
*/
String[] tags = {"nosql", "mongodb", "document", "developer", "popular"};
String[] types = {"technology", "sociality", "travel", "novel", "literature"};
List<Document> books = new ArrayList<>();

Random random = new Random();
for (int i = 0; i < 50; i++) {
int typeIdx = random.nextInt(types.length);
int tagIdx = random.nextInt(tags.length);
int tagIdx2 = random.nextInt(tags.length);
int favCount = random.nextInt(100);
String username = "xx00" + random.nextInt(10);
int age = 20 + random.nextInt(15);

Document book = new Document("title", "book-" + i)
.append("type", types[typeIdx])
.append("tag", List.of(tags[tagIdx], tags[tagIdx2]))
.append("favCount", favCount)
.append("author", new Document("name", username).append("age", age));
books.add(book);
}

mongoTemplate.insert(books, "books");

$project:投影操作

  • 将原始字段投影成指定名称, 如将集合中的 title 投影成 name

1
2
3
4
5
6
7
8
9
10
// db.books.aggregate([{$project:{name:"$title"}}])
ProjectionOperation projectOperation = Aggregation.project()
.andExpression("title").as("name");
Aggregation aggregation = Aggregation.newAggregation(projectOperation);
System.out.println(aggregation);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();

for (Document document : result) {
System.out.println(document.toJson());
}
  • 剔除不需要的字段

1
2
3
4
5
6
7
8
9
10
11
12
13
// db.books.aggregate([{$project:{name:"$title",_id:0,type:1,author.name:1}}])
ProjectionOperation projectOperation = Aggregation.project()
.andExclude("_id")
.andInclude("type")
.andExpression("author.name").as("author.name") // 这里必须使用as重新命名,否则只会用name做字段名称,这就和后面的title的别名冲突了
.andExpression("title").as("name");
Aggregation aggregation = Aggregation.newAggregation(projectOperation);
System.out.println(aggregation);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();

for (Document document : results) {
System.out.println(document.toJson());
}

$match:过滤操作

  • 过滤出指定条件的文档

1
2
3
4
5
6
7
8
// db.books.aggregate([{$match:{type:"technology"}}])
MatchOperation matchOperation = Aggregation.match(Criteria.where("type").is("technology"));
Aggregation aggregation = Aggregation.newAggregation(matchOperation);
System.out.println(aggregation);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();
for (Document document : results) {
System.out.println(document.toJson());
}
  • 组合其它管道

1
2
3
4
5
6
7
8
9
10
/*
db.books.aggregate([
{$match:{type:"technology"}},
{$project:{name:"$title",_id:0,type:1,author:{name:1}}}
])
*/
MatchOperation matchOperation = Aggregation.match(Criteria.where("type").is("technology"));
ProjectionOperation projectOperation = Aggregation.project("title").andExclude("_id").andInclude("type").and("author.name").as("author.name");
Aggregation aggregation = Aggregation.newAggregation(matchOperation, projectOperation);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();

$count:计数操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
db.books.aggregate([
{$match:{type:"technology"}},
{$count: "type_count"}
])
*/
// MatchOperation to filter documents by type: "technology"
MatchOperation matchAggregation = Aggregation.match(Criteria.where("type").is("technology"));
// CountOperation to count the matched documents
CountOperation countAggregation = Aggregation.count().as("type_count");
// Combine the match and count operations
Aggregation aggregation = Aggregation.newAggregation(matchAggregation, countAggregation);
System.out.println(aggregation);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();

$group:分组操作

  • 按指定的表达式对文档进行分组,并将每个不同分组的文档输出到下一个阶段。输出文档包含一个_id字段,该字段按键包含不同的组。

  • 输出文档还可以包含计算字段,该字段保存由$group的_id字段分组的一些accumulator表达式的值。 $group不会输出具体的文档而只是统计信息。

  • 语法

1
2
3
4
{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
# _id字段是必填的;但是,可以指定_id值为null来为整个输入文档计算累计值。
# 剩余的计算字段是可选的,并使用<accumulator>运算符进行计算。
# _id和<accumulator>表达式可以接受任何有效的表达式。
  • accumulator操作符

名称 描述 类比sql
avg 计算均值 avg
first 返回每组第一个文档,如果有排序,按照排序,如果没有按照默认的存储的顺序的第一个文档。 limit 0,1
last 返回每组最后一个文档,如果有排序,按照排序,如果没有按照默认的存储的顺序的最后个文档。 -
max 根据分组,获取集合中所有文档对应值得最大值。 max
min 根据分组,获取集合中所有文档对应值得最小值。 min
push 将指定的表达式的值添加到一个数组中。 -
addToSet 将表达式的值添加到一个集合中(无重复值,无序)。 -
sum 计算总和 sum
stdDevPop 返回输入值的总体标准偏差(population standard deviation) -
stdDevSamp 返回输入值的样本标准偏差(the sample standard deviation) -

示例

  • book的数量,收藏总数和平均值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
db.books.aggregate([
{$group:{_id:null,count:{$sum:1},pop:{$sum:"$favCount"},avg:{$avg:"$favCount"}}}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.group().count().as("count").sum("favCount").as("pop").avg("favCount").as("avg")
);

AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class);
List<Document> mappedResults = results.getMappedResults();

// Assuming that there will be only one result due to grouping by null (_id: null)
if (!mappedResults.isEmpty()) {
System.out.println(mappedResults.get(0).toJson());
} else {
System.out.println("{}");
}
  • 统计每个作者的book收藏总数

1
2
3
4
5
6
7
8
9
10
11
/*
db.books.aggregate([
{$group:{_id:"$author.name",pop:{$sum:"$favCount"}}}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.group("author.name").sum("favCount").as("pop")
);

AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class);
List<Document> mappedResults = results.getMappedResults();
  • 统计每个作者的每本book的收藏数

1
2
3
4
5
6
7
8
9
10
11
/*
db.books.aggregate([
{$group:{_id:{name:"$author.name",title:"$title"},pop:{$sum:"$favCount"}}}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.group("author.name", "title").sum("favCount").as("pop")
);

AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class);
List<Document> mappedResults = results.getMappedResults();
  • 每个作者的book的type合集

1
2
3
4
5
6
7
8
9
/*
db.books.aggregate([
{$group:{_id:"$author.name",types:{$addToSet:"$type"}}}
])
*/
GroupOperation groupOperation = Aggregation.group("author.name").addToSet("type").as("types");
Aggregation aggregation = Aggregation.newAggregation(groupOperation);
AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class);
List<Document> mappedResults = results.getMappedResults();

$unwind

  • 可以将数组拆分为单独的文档

  • 语法

1
2
3
4
5
6
7
8
9
10
11
{
$unwind:
{
#要指定字段路径,在字段名称前加上$符并用引号括起来。
path: <field path>,
#可选,一个新字段的名称用于存放元素的数组索引。该名称不能以$开头。
includeArrayIndex: <string>,
#可选,default :false,若为true,如果路径为空,缺少或为空数组,则$unwind输出文档
preserveNullAndEmptyArrays: <boolean>
}
}

示例

  • 姓名为xx006的作者的book的tag数组拆分为多个文档

1
2
3
4
5
6
7
8
9
10
11
/*
db.books.aggregate([
{$match:{"author.name":"xx006"}},
{$unwind:"$tag"}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("author.name").is("xx006")),
Aggregation.unwind("tag")
);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();
  • 使用includeArrayIndex选项来输出数组元素的数组索引

1
2
3
4
5
6
7
8
9
10
11
/*
db.books.aggregate([
{$match:{"author.name":"xx006"}},
{$unwind:{path:"$tag", includeArrayIndex: "arrayIndex"}}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("author.name").is("xx006")),
Aggregation.unwind("tag", "arrayIndex")
);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();
  • 每个作者的book的tag合集

1
2
3
4
5
6
7
8
9
10
11
/*
db.books.aggregate([
{$unwind:"$tag"},
{$group:{_id:"$author.name",types:{$addToSet:"$tag"}}}
])
*/
UnwindOperation unwindOperation = Aggregation.unwind("tag");
GroupOperation groupOperation = Aggregation.group("author.name").addToSet("tag").as("types");
Aggregation aggregation = Aggregation.newAggregation(unwindOperation, groupOperation);
AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class);
List<Document> mappedResults = results.getMappedResults();
  • 使用preserveNullAndEmptyArrays选项在输出中包含缺少tag字段,null或空数组的文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 初始化数据,加入一些tag为空数组或不存在tag的文档
db.books.insertMany([
{
"title" : "book-51",
"type" : "technology",
"favCount" : 11,
"tag":[],
"author" : {
"name" : "xx006",
"age" : 28
}
},
{
"title" : "book-52",
"type" : "technology",
"favCount" : 15,
"author" : {
"name" : "xx006",
"age" : 28
}
}
])
1
2
3
4
5
6
7
8
9
10
11
/*
db.books.aggregate([
{$match:{"author.name":"xx006"}},
{$unwind:{path:"$tag", preserveNullAndEmptyArrays: true}}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("author.name").is("xx006")),
Aggregation.unwind("tag", true)
);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();

limit/limit/skip/$sort

  • $limit:限制传递到管道中下一阶段的文档数

  • $skip:跳过传递到管道中下一阶段的文档数

  • $sort:对传递到管道中下一阶段的文档进行排序

示例

  • 姓名为xx006的作者的book的tag数组拆分为多个文档,按照收藏数降序排序,跳过2个文档,取5个文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
db.books.aggregate([
{$match:{"author.name":"xx006"}},
{$unwind:{path:"$tag", preserveNullAndEmptyArrays: true}},
{$sort:{"favCount":-1}},
{$limit : 5 },
{$skip : 2}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("author.name").is("xx006")),
Aggregation.unwind("tag", true),
Aggregation.sort(Sort.by(Sort.Direction.DESC, "favCount")),
Aggregation.skip(2),
Aggregation.limit(5)
);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();
``
* 统计每个分类的book文档数量
```java
/*
db.books.aggregate([
{$group:{_id:"$type",total:{$sum:1}}},
{$sort:{total:-1}}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.group("type").count().as("total"),
Aggregation.sort(Sort.by(Sort.Direction.DESC, "total"))
);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();
  • 标签的热度排行,标签的热度则按其关联book文档的收藏数(favCount)来计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
db.books.aggregate([
{$match:{favCount:{$gt:0}}},
{$unwind:"$tag"},
{$group:{_id:"$tag",total:{$sum:"$favCount"}}},
{$sort:{total:-1}}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("favCount").gt(0)),
Aggregation.unwind("tag"),
Aggregation.group("tag").sum("favCount").as("total"),
Aggregation.sort(Sort.by(Sort.Direction.DESC, "total"))
);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();

$bucket

  • $bucket:根据指定的条件和边界,将文档分组到不同的桶中

示例

  • 统计book文档收藏数[0,10),[10,60),[60,80),[80,100),[100,+∞)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
db.books.aggregate([{
$bucket:{
groupBy:"$favCount",
boundaries:[0,10,60,80,100],
default:"other",
output:{"count":{$sum:1}}
}
}])

[
{ _id: 0, count: 3 },
{ _id: 10, count: 27 },
{ _id: 60, count: 11 },
{ _id: 80, count: 11 }
]

groupBy: 指定用于分组的字段。在此示例中,根据 favCount 字段的值进行分组。

boundaries: 定义分桶的边界,即每个桶的范围。文档的值将根据这些边界被分配到不同的桶中。边界应按升序排列。在这个命令中,边界 [0, 10, 60, 80, 100] 表示将文档分成六个桶:小于等于0、大于0小于等于10、大于10小于等于60、大于60小于等于80、大于80小于等于100、大于100。

default: 这是一个可选参数,用于指定默认桶的名称。如果文档的值不在指定的边界范围内,则会被放入该桶中。在这个命令中,如果 favCount 的值不在指定的边界范围内,将会被放入名为 "other" 的桶中。比如这里设置边界 [10, 60, 80, 100],即表示从10开始分组,则0~10的数据会被分组到名为 "other" 的桶中。
[
{ _id: 10, count: 27 },
{ _id: 60, count: 11 },
{ _id: 80, count: 11 },
{ _id: 'other', count: 3 }
]

output: 定义了每个桶的输出内容。这是一个子文档,它指定了在每个桶中计算的聚合操作。在这个示例中,使用 $sum 聚合操作符来计算每个桶中的文档数量,并将结果存储在名为 "count" 的字段中。
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.bucket("$favCount")
.withBoundaries(0,10,60,80,100)
.withDefaultBucket("other")
.andOutput(context -> new Document("$sum", 1))
.as("count")
);
List<Document> results = mongoTemplate.aggregate(aggregation, "books", Document.class).getMappedResults();

$lookup

  • $lookup:将文档中的一个字段的值与另一个集合中的文档进行匹配,然后将匹配的文档添加到当前文档中

  • 语法

1
2
3
4
5
6
7
8
db.collection.aggregate([{
$lookup: {
from: "<collection to join>",
localField: "<field from the input documents>",
foreignField: "<field from the documents of the from collection>",
as: "<output array field>"
}
})
名称 描述
from 同一个数据库下等待被Join的集合。
localField 源集合中的match值,如果输入的集合中,某文档没有 localField这个Key(Field),在处理的过程中,会默认为此文档含有 localField:null的键值对。
foreignField 待Join的集合的match值,如果待Join的集合中,文档没有foreignField值,在处理的过程中,会默认为此文档含有 foreignField:null的键值对。
as 为输出文档的新增值命名。如果输入的集合中已存在该值,则会覆盖掉。
  • 其语法功能类似于下面的伪SQL语句

1
2
3
4
5
SELECT *, <output array field>
FROM collection
WHERE <output array field> IN (SELECT *
FROM <collection to join>
WHERE <foreignField> = <collection.localField>);

示例

  • 准备数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 顾客信息
db.customer.insertOne({customerCode:1,name:"customer1",phone:"13112345678",address:"test1"})
db.customer.insertOne({customerCode:2,name:"customer2",phone:"13112345679",address:"test2"})

# 订单信息
db.order.insertOne({orderId:1,orderCode:"order001",customerCode:1,price:200})
db.order.insertOne({orderId:2,orderCode:"order002",customerCode:2,price:400})

# 订单明细
db.orderItem.insertOne({itemId:1,productName:"apples",qutity:2,orderId:1})
db.orderItem.insertOne({itemId:2,productName:"oranges",qutity:2,orderId:1})
db.orderItem.insertOne({itemId:3,productName:"mangoes",qutity:2,orderId:1})
db.orderItem.insertOne({itemId:4,productName:"apples",qutity:2,orderId:2})
db.orderItem.insertOne({itemId:5,productName:"oranges",qutity:2,orderId:2})
db.orderItem.insertOne({itemId:6,productName:"mangoes",qutity:2,orderId:2})
  • 查询顾客的订单信息,关联字段为customerCode顾客号码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
db.customer.aggregate([
{$lookup: {
from: "order",
localField: "customerCode",
foreignField: "customerCode",
as: "customerOrder"
}
}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.lookup("order", "customerCode", "customerCode", "customerOrder")
);
List<Document> results = mongoTemplate.aggregate(aggregation, "customer", Document.class).getMappedResults();
  • 根据订单信息关联顾客信息和订单明细

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
db.order.aggregate([
{$lookup: {
from: "customer",
localField: "customerCode",
foreignField: "customerCode",
as: "curstomer"
}

},
{$lookup: {
from: "orderItem",
localField: "orderId",
foreignField: "orderId",
as: "orderItem"
}
}
])
*/
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.lookup("customer", "customerCode", "customerCode", "curstomer"),
Aggregation.lookup("orderItem", "orderId", "orderId", "orderItem")
);
List<Document> results = mongoTemplate.aggregate(aggregation, "order", Document.class).getMappedResults();