技术开发 频道

MongoDB分布式存储的MapReduce并行查询

        【IT168 技术】之前的文章中介绍了如何基于Mongodb进行关系型数据的分布式存储,有了存储就会牵扯到查询。虽然用普通的方式也可以进行查询,但今天要介绍的是如何使用MONGODB中提供的MapReduce功能进行查询。

  今天介绍如何基于sharding机制进行mapreduce查询。在MongoDB的官方文档中,这么一句话:

Sharded Environments
      In sharded environments, data processing of map
/reduce operations runs in parallel on all shards.

  即: map/reduce操作会并行运行在所有的shards上。

  下面我们就用之前这篇文章中白搭建的环境来构造mapreduce查询:

  首先要说的是,基于sharding的mapreduce与非sharding的数据在返回结构上有一些区别,我目前注意到的主要是不支持定制式的json格式的返回数据,也就是下面方式可能会出现问题:

  return { count : total };

  注意:上面的情况目前出现在了我的测试环境下,如下图:

MongoDB分布式存储的MapReduce并行查询

  就需要改成 return count;

  下面是测试代码,首先是按帖子id来查询相应数量(基于分组查询实例方式):

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->public partial class getfile : System.Web.UI.Page
    {

        
public Mongo Mongo { get; set; }

        
public IMongoDatabase DB
        {
            get
            {
                
return this.Mongo["dnt_mongodb"];
            }
        }

        
/// <summary>
        
/// Sets up the test environment.  You can either override this OnInit to add custom initialization.
        
/// </summary>
        
public virtual void Init()
        {
            string ConnectionString
= "Server=10.0.4.85:27017;ConnectTimeout=30000;ConnectionLifetime=300000;MinimumPoolSize=512;MaximumPoolSize=51200;Pooled=true";
            
if (String.IsNullOrEmpty(ConnectionString))
                throw new ArgumentNullException("Connection string
not found.");
            this.Mongo
= new Mongo(ConnectionString);
            this.Mongo.Connect();        
        }
        string mapfunction
= "function(){\n" +
                        "  
if(this._id=='548111') { emit(this._id, 1); } \n" +  
                        "};";

        string reducefunction
= "function(key, current ){" +
                                "  
var count = 0;" +
                                "  
for(var i in current) {" +
                                "      
count+=current[i];" +
                                "   }"
+
                                "  
return count ;\n" +
                              "};";

      
        protected void Page_Load(object sender, EventArgs e)
        {
            Init();

            
var mrb = DB["posts1"].MapReduce();//attach_gfstream.files
            
int groupCount = 0;
            using (
var mr = mrb.Map(mapfunction).Reduce(reducefunction))
            {
                foreach (Document doc
in mr.Documents)
                {
                    groupCount
= int.Parse(doc["value"].ToString());
                }
            }
            this.Mongo.Disconnect();
        }    
     }
0
相关文章