我如何(在MongoDB中)将多个集合的数据合并为一个集合?
我可以使用map-reduce吗,如果可以的话,怎么做?
我将非常感谢一些例子,因为我是个新手。
虽然你不能实时做到这一点,但你可以通过使用MongoDB 1.8+ map/reduce中的"reduce"out选项(见http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions),多次运行map-reduce来合并数据。 你需要在两个集合中都有一些可以作为_id使用的键。
例如,假设你有一个 "用户 "集合和一个 "评论 "集合,你想有一个新的集合,为每个评论提供一些用户人口信息。
假设users
集合有以下字段。
然后comments
集合有以下字段。
你会做这个map/reduce。
var mapUsers, mapComments, reduce;
db.users_comments.remove();
// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
var users = db.users.find();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup
mapUsers = function() {
var values = {
country: this.country,
gender: this.gender,
age: this.age
};
emit(this._id, values);
};
mapComments = function() {
var values = {
commentId: this._id,
comment: this.comment,
created: this.created
};
emit(this.userId, values);
};
reduce = function(k, values) {
var result = {}, commentFields = {
"commentId": '',
"comment": '',
"created": ''
};
values.forEach(function(value) {
var field;
if ("comment" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push(value);
} else if ("comments" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push.apply(result.comments, value.comments);
}
for (field in value) {
if (value.hasOwnProperty(field) && !(field in commentFields)) {
result[field] = value[field];
}
}
});
return result;
};
db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
db.users_comments.find().pretty(); // see the resulting collection
在这一点上,你会有一个新的集合,叫做users_comments
,它包含了合并后的数据,现在你可以使用它。 这些缩小的集合都有_id
,这是你在map函数中发出的键,然后所有的值都是value
键里面的一个子对象--值不在这些缩小的文件的最高层。
这是一个有点简单的例子。 你可以用更多的集合来重复这个过程,只要你想,就可以不断地建立起缩小的集合。 你也可以在这个过程中做数据的总结和聚合。 可能你会定义一个以上的还原函数,因为聚合和保留现有字段的逻辑变得更加复杂。
你还会注意到,现在每个用户都有一个文档,其中有该用户的所有评论都在一个数组中。 如果我们合并的数据是一对一的关系,而不是一对多的关系,那么它将是扁平的,你可以简单地使用这样的减少函数。
reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
如果你想把users_comments
集合扁平化,使其成为每个评论的一个文件,另外运行这个。
var map, reduce;
map = function() {
var debug = function(value) {
var field;
for (field in value) {
print(field + ": " + value[field]);
}
};
debug(this);
var that = this;
if ("comments" in this.value) {
this.value.comments.forEach(function(value) {
emit(value.commentId, {
userId: that._id,
country: that.value.country,
age: that.value.age,
comment: value.comment,
created: value.created,
});
});
}
};
reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});
这项技术绝对不应该即时执行。 它适合于cron job或类似的定期更新合并后的数据。 你可能想在新的集合上运行ensureIndex
,以确保你对它的查询能快速运行(请记住,你的数据仍然在value
键内,所以如果你要对comments_with_demographics
的评论创建
时间进行索引,这将是db.comments_with_demographics.sureIndex({"value.created" : 1});
代码片段。礼貌-堆栈溢出上的多个帖子,包括这个。
db.cust.drop();
db.zip.drop();
db.cust.insert({cust_id:1, zip_id: 101});
db.cust.insert({cust_id:2, zip_id: 101});
db.cust.insert({cust_id:3, zip_id: 101});
db.cust.insert({cust_id:4, zip_id: 102});
db.cust.insert({cust_id:5, zip_id: 102});
db.zip.insert({zip_id:101, zip_cd:'AAA'});
db.zip.insert({zip_id:102, zip_cd:'BBB'});
db.zip.insert({zip_id:103, zip_cd:'CCC'});
mapCust = function() {
var values = {
cust_id: this.cust_id
};
emit(this.zip_id, values);
};
mapZip = function() {
var values = {
zip_cd: this.zip_cd
};
emit(this.zip_id, values);
};
reduceCustZip = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
if ("cust_id" in value) {
if (!("cust_ids" in result)) {
result.cust_ids = [];
}
result.cust_ids.push(value);
} else {
for (field in value) {
if (value.hasOwnProperty(field) ) {
result[field] = value[field];
}
};
}
});
return result;
};
db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();
mapCZ = function() {
var that = this;
if ("cust_ids" in this.value) {
this.value.cust_ids.forEach(function(value) {
emit(value.cust_id, {
zip_id: that._id,
zip_cd: that.value.zip_cd
});
});
}
};
reduceCZ = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"});
db.cust_zip_joined.find().pretty();
var flattenMRCollection=function(dbName,collectionName) {
var collection=db.getSiblingDB(dbName)[collectionName];
var i=0;
var bulk=collection.initializeUnorderedBulkOp();
collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
print((++i));
//collection.update({_id: result._id},result.value);
bulk.find({_id: result._id}).replaceOne(result.value);
if(i%1000==0)
{
print("Executing bulk...");
bulk.execute();
bulk=collection.initializeUnorderedBulkOp();
}
});
bulk.execute();
};
flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();
你必须在你的应用层中做到这一点。如果你使用的是ORM,它可以使用注解(或类似的东西)来获取其他集合中存在的引用。我只用过Morphia,@Reference
注解会在查询时获取被引用的实体,所以我能够避免在代码中自己做这些。