技术开发 频道

使用NBear.MQ开发分布式系统

【IT168技术文档】

    NBear.MQ是NBearFramework中新增的分布式消息队列模块,作为NBear Framework的新成员,NBear.MQ秉承NBear一贯的易于使用和零配置需要的特点,大大改善开发基于消息队列的分布式系统的效率。本文通过介绍一个基于NBear.MQ的Sample - TestServiceMQ,演示基于NBear.MQ开发分布式系统的基本方法。

    解析

    1、TestRemotingServer

    首先是我们的Server,对于Server,如果您使用NBear.MQ内置的MemoryServiceMQ,则几乎不需要编码,只需要运行并发布server实例为remoting service。

class Program { static void Main(string[] args) { MemoryServiceMQ mq = new MemoryServiceMQ(); mq.OnLog = new LogHandler(Console.WriteLine); RemotingServiceHelper rh = new RemotingServiceHelper(RemotingChannelType.TCP, "127.0.0.1", 8000, new LogHandler(Console.WriteLine)); rh.PublishServiceInstance("MMQ", typeof(IServiceMQ), mq, System.Runtime.Remoting.WellKnownObjectMode.Singleton); while (Console.ReadLine() != "q") { } } }

    注意,关键是第8-9行这里,我们调用RemotingServiceHelper类发布MemoryServiceMQ的实例mq到tcp://127.0.0.1:8000/MMQ。

    如果你不希望使用MemoryServiceMQ,而希望使用基于其它MQ如MSMQ或ActiveMQ系统的MQ控制,您可以自己实现IServiceMQ接口,用来代替这里的mq。

    2、TestServiceHost

    Service host顾名思义就是服务的提供者,本示例的TestServiceHost定义了两个service,MathService和HelloWorldService,如下:
public class MathService : BaseAutoService { public MathService(IServiceMQ mq) : base("demo.math", mq) { } private int getResult(char op, int x, int y) { int rt = 0; switch (op) { case '+': rt = x + y; break; case '-': rt = x - y; break; case '*': rt = x * y; break; case '/': rt = x / y; break; } return rt; } protected override ResponseMessage Run(RequestMessage msg) { Parameter[] parms = msg.Parameters; int rt = getResult(parms[0].Value.ToString()[0], int.Parse(parms[1].Value.ToString()), int.Parse(parms[2].Value.ToString())); ResponseMessage retMsg = new ResponseMessage(); retMsg.ServiceName = msg.ServiceName; retMsg.Parameters = new Parameter[] { new Parameter("Result", rt) }; retMsg.MessageId = Guid.NewGuid(); retMsg.TransactionId = msg.TransactionId; retMsg.RequestHeader = msg.Header; retMsg.Timestamp = DateTime.Now; retMsg.Expiration = DateTime.Now.AddDays(1); return retMsg; } } public class HelloWorldService : BaseAutoService { public HelloWorldService(IServiceMQ mq) : base("demo.helloworld", mq) { } protected override ResponseMessage Run(RequestMessage msg) { Parameter[] parms = msg.Parameters; ResponseMessage retMsg = new ResponseMessage(); retMsg.ServiceName = msg.ServiceName; retMsg.MessageId = Guid.NewGuid(); retMsg.TransactionId = msg.TransactionId; retMsg.RequestHeader = msg.Header; retMsg.Timestamp = DateTime.Now; retMsg.Expiration = DateTime.Now.AddDays(1); retMsg.Text = "hello world"; retMsg.Bytes = System.Text.UTF8Encoding.UTF8.GetBytes(retMsg.Text); retMsg.Data = new System.Data.DataSet("hello world data"); return retMsg; } }
    大家可以看到,这里的service都是从BaseAutoService继承的,您也可以实现IService或继承BaseService。一个可以被NBear.MQ使用的Service基本上做的事情就是接受一个RequestMessage类型的输入参数,返回一个ResponseMessage类型的返回结果。

    RequestMessage和ResponseMessage都从Message类型继承,除了包含请求和返回相关的一组参数(.net基本数据类型),每个Message包含Text、Bytes和Data属性,分别允许存放和传递string,byte[]和Dataset类型的数据。可见Message实际是一个比较通用的DTO。

    下面,我们要将示例service注册到server,这个过程可以非常方便的通过SimpleServiceContainer来实现:
class Program { static void Main(string[] args) { IServiceMQ mq = new RemotingClientHelper(RemotingChannelType.TCP, "127.0.0.1", 8000, 0, new LogHandler(Console.WriteLine)).GetClientInstance<IServiceMQ>("MMQ"); IServiceContainer container = new SimpleServiceContainer(mq); container.RegisterComponent("math host 1", typeof(MathService)); container.RegisterComponent("math host 2", typeof(MathService)); container.RegisterComponent("helloworld 1", typeof(HelloWorldService)); while (Console.ReadLine() != "q") { } } }
    您可以看到,我们首先要通过RemotingClientHelper类获得我们的server发布出来的mq实例。然后,创建一个SimpleServiceContainer实例,并将我们的示例service添加到container,就这么简单。SimpleSeviceContainer会自动为我们注册和绑定service和mq,您也可以注册多个相同和不同类型的service到container。    3、TestClientApp

    Client是一个WinForm程序,自然就是调用Service了。代码如下:
1using System; 2using System.Collections.Generic; 3using System.ComponentModel; 4using System.Data; 5using System.Drawing; 6using System.Text; 7using System.Windows.Forms; 8 9using NBear.MQ.ServiceMQ; 10using NBear.MQ; 11using NBear.Common; 12using NBear.Common.Remoting; 13 14namespace TestClientApp 15{ 16 public partial class Form1 : Form 17 { 18 public Form1() 19 { 20 InitializeComponent(); 21 } 22 23 private void Form1_Load(object sender, EventArgs e) 24 { 25 comboOP.SelectedIndex = 0; 26 27 rh = new RemotingClientHelper(RemotingChannelType.TCP, "127.0.0.1", 8000, 0, new LogHandler(Console.WriteLine)); 28 mq = rh.GetClientInstance<IServiceMQ>("MMQ"); 29 mq.OnLog = new LogHandler(Console.WriteLine); 30 container = new SimpleServiceContainer(mq); 31 } 32 33 private void txtX_TextChanged(object sender, EventArgs e) 34 { 35 txtResult.Text = string.Empty; 36 } 37 38 private void comboOP_SelectedIndexChanged(object sender, EventArgs e) 39 { 40 txtResult.Text = string.Empty; 41 } 42 43 private void txtY_TextChanged(object sender, EventArgs e) 44 { 45 txtResult.Text = string.Empty; 46 } 47 48 RemotingClientHelper rh = null; 49 IServiceMQ mq = null; 50 IServiceContainer container = null; 51 52 private void btnCompute_Click(object sender, EventArgs e) 53 { 54 try 55 { 56 int.Parse(txtX.Text); 57 if (comboOP.SelectedIndex == 3) 58 { 59 if (int.Parse(txtY.Text) != 0) 60 { 61 } 62 } 63 else 64 { 65 int.Parse(txtY.Text); 66 } 67 } 68 catch 69 { 70 MessageBox.Show("x, y must be number, and y must not be 0 when OP is '/'!"); 71 return; 72 } 73 74 RequestMessage msg = new RequestMessage(); 75 msg.Expiration = DateTime.Now.AddDays(1); 76 msg.MessageId = Guid.NewGuid(); 77 msg.Parameters = new Parameter[] { new Parameter("op", comboOP.Text), new Parameter("x", int.Parse(txtX.Text)), new Parameter("y", int.Parse(txtY.Text)) }; 78 msg.ServiceName = "demo.math"; 79 msg.Timestamp = DateTime.Now; 80 msg.TransactionId = Guid.NewGuid(); 81 82 ResponseMessage retMsg = container.CallService(msg.ServiceName, msg); 83 84 if (retMsg != null) 85 { 86 txtResult.Text = retMsg.Parameters[0].Value.ToString(); 87 } 88 else 89 { 90 MessageBox.Show("Call service failed!"); 91 } 92 } 93 94 private void btnHelloWorld_Click(object sender, EventArgs e) 95 { 96 RequestMessage msg = new RequestMessage(); 97 msg.Expiration = DateTime.Now.AddDays(1); 98 msg.MessageId = Guid.NewGuid(); 99 msg.ServiceName = "demo.helloworld"; 100 msg.Timestamp = DateTime.Now; 101 msg.TransactionId = Guid.NewGuid(); 102 103 msg.Text = "who r u?"; 104 105 ResponseMessage retMsg = container.CallService(msg.ServiceName, msg); 106 107 MessageBox.Show(retMsg.Text); 108 } 109 } 110}

    Line 23-31 在Form Load是初始化container,这和service host是完全一样的。
    调用service只需通过container.CallService方法,如Line 82 和 105。

    问题

    1、您可能会问,client的代码除了调用service和service host的代码几乎相同,那么是否可以在client中既使用service又提供service呢 ?当然可以,你也完全可以在client这里register service,当client调用一个service时,container会首先检查是否存在本地service,存在则调用本地,不存在则向server请求远程service。

    2、本示例是在一台服务器上演示这个分布式构架,您也可以修改地址,并分别把server, service host和client部署到互相连接的多台server上。

    运行

    请按如下顺序执行bin中的代码:TestRemotingServer.exe, TestServiceHost.exe,TestClientApp.exe,ServiceHost和Client可以是多个,server将自动随机分配调用service的请求到各个service host进行处理。在运行过程中,请尝试多次点击client的按钮,并观察,server和service host的日志,看是不是负载均衡。您也可以尝试在运行过程中关闭某些service host或client,或者再新增一些,注意server会自动维护service host和client的绑定和回调。

0
相关文章