(MongoDBで)複数のコレクションのデータを1つのコレクションにまとめるにはどうしたらいいですか?
map-reduceは使えるか?使えるとしたらどのように?
私は初心者なので、いくつか例を挙げていただけるとありがたいです。
リアルタイムではできませんが、MongoDB 1.8+ の map/reduce で "reduce" out オプションを使えば、map-reduce を複数回実行してデータをマージすることができます (http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions 参照)。 両方のコレクションに、_id として使える何らかのキーが必要です。
たとえば、users
コレクションとcomments
コレクションがあって、各コメントのユーザー属性情報を持つ新しいコレクションを作りたいとします。
例えば、users
コレクションに以下のようなフィールドがあるとします。
そして、comments
コレクションには、次のようなフィールドがあります。
このようなmap/reduceを行うことになります。
var mapUsers, mapComments, reduce;
db.users_comments.remove();
// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
var users = db.users.find();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup
mapUsers = function() {
var values = {
country: this.country,
gender: this.gender,
age: this.age
};
emit(this._id, values);
};
mapComments = function() {
var values = {
commentId: this._id,
comment: this.comment,
created: this.created
};
emit(this.userId, values);
};
reduce = function(k, values) {
var result = {}, commentFields = {
"commentId": '',
"comment": '',
"created": ''
};
values.forEach(function(value) {
var field;
if ("comment" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push(value);
} else if ("comments" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push.apply(result.comments, value.comments);
}
for (field in value) {
if (value.hasOwnProperty(field) && !(field in commentFields)) {
result[field] = value[field];
}
}
});
return result;
};
db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
db.users_comments.find().pretty(); // see the resulting collection
この時点で、マージされたデータを含む users_comments
という新しいコレクションが作成され、それを使用できるようになります。 これらの縮小されたコレクションはすべて _id
を持っています。これはマップ関数で出力していたキーであり、すべての値は value
キーの中のサブオブジェクトです - 値はこれらの縮小されたドキュメントのトップレベルにはありません。
これはやや単純な例です。 縮小されたコレクションを構築し続けるために、好きなだけ多くのコレクションでこれを繰り返すことができます。 また、このプロセスでデータのサマリーやアグリゲーションを行うこともできます。 集約や既存のフィールドを保持するためのロジックがより複雑になるため、複数のreduce関数を定義することになるでしょう。
また、各ユーザーのドキュメントが1つになり、そのユーザーのすべてのコメントが配列になっていることにも気づくでしょう。 もし、一対多ではなく一対一の関係にあるデータをマージするのであれば、それはフラットであり、単純に次のようなreduce関数を使うことができます。
reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
users_comments`コレクションをフラットにして、コメントごとに1つのドキュメントにしたい場合は、さらに次のように実行します。
var map, reduce;
map = function() {
var debug = function(value) {
var field;
for (field in value) {
print(field + ": " + value[field]);
}
};
debug(this);
var that = this;
if ("comments" in this.value) {
this.value.comments.forEach(function(value) {
emit(value.commentId, {
userId: that._id,
country: that.value.country,
age: that.value.age,
comment: value.comment,
created: value.created,
});
});
}
};
reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});
このテクニックは絶対にその場で実行してはいけません。 定期的にマージされたデータを更新する cron ジョブなどに適しています。 おそらく、新しいコレクションに対して ensureIndex
を実行して、クエリが素早く実行されるようにしたいと思うでしょう (データはまだ value
キーの中にあることに注意してください。つまり、comments_with_demographics
をコメントの created
時間に基づいてインデックス化するとしたら、db.comments_with_demographics.ensureIndex({"value.created": 1});
となります。
コードスニペット。提供-この記事を含め、スタックオーバーフローに複数の記事が掲載されています。
db.cust.drop();
db.zip.drop();
db.cust.insert({cust_id:1, zip_id: 101});
db.cust.insert({cust_id:2, zip_id: 101});
db.cust.insert({cust_id:3, zip_id: 101});
db.cust.insert({cust_id:4, zip_id: 102});
db.cust.insert({cust_id:5, zip_id: 102});
db.zip.insert({zip_id:101, zip_cd:'AAA'});
db.zip.insert({zip_id:102, zip_cd:'BBB'});
db.zip.insert({zip_id:103, zip_cd:'CCC'});
mapCust = function() {
var values = {
cust_id: this.cust_id
};
emit(this.zip_id, values);
};
mapZip = function() {
var values = {
zip_cd: this.zip_cd
};
emit(this.zip_id, values);
};
reduceCustZip = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
if ("cust_id" in value) {
if (!("cust_ids" in result)) {
result.cust_ids = [];
}
result.cust_ids.push(value);
} else {
for (field in value) {
if (value.hasOwnProperty(field) ) {
result[field] = value[field];
}
};
}
});
return result;
};
db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();
mapCZ = function() {
var that = this;
if ("cust_ids" in this.value) {
this.value.cust_ids.forEach(function(value) {
emit(value.cust_id, {
zip_id: that._id,
zip_cd: that.value.zip_cd
});
});
}
};
reduceCZ = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"});
db.cust_zip_joined.find().pretty();
var flattenMRCollection=function(dbName,collectionName) {
var collection=db.getSiblingDB(dbName)[collectionName];
var i=0;
var bulk=collection.initializeUnorderedBulkOp();
collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
print((++i));
//collection.update({_id: result._id},result.value);
bulk.find({_id: result._id}).replaceOne(result.value);
if(i%1000==0)
{
print("Executing bulk...");
bulk.execute();
bulk=collection.initializeUnorderedBulkOp();
}
});
bulk.execute();
};
flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();
それはアプリケーション層で行う必要があります。もしORMを使っているのであれば、アノテーション(あるいはそれに類するもの)を使って他のコレクションに存在する参照を引き出すことができるでしょう。私はMorphiaしか使ったことがありませんが、@Reference
アノテーションはクエリをかけたときに参照されるエンティティをフェッチするので、コードの中で自分でやらなくても済むようになっています。