技术开发 频道

利用 Remoting 实现异步队列机制



【IT168 技术文档】

    很多需要提高应用性能(提高立即响应速度(但不立即处理)、提高吞吐能力、提升用户体验)的场景,都采用异步处理的机制,.Net 中可能用 MSMQ 的实现样例不少,其实自行实现异步队列并不复杂。

最近有两个项目组的同事向我要了类似的关于实现异步队列的方法,
于是翻出了两年以前写的实现 SP 与运营商短信网关通信的上下行短信队列处理相关的代码,
尽管不难,自觉多少有点儿技术含量,拿出来晒晒,

因此去掉了所有的与业务相关的逻辑,保留通用的异步处理功能,
提炼出一套完整的、通用的、核心关键代码,编译后即可运行,方便大家。

该 Remoting 异步队列的实现,主要使用的 .Net 技术是多线程、Remoting、泛型容器类...
流程如下
1.并发若干客户端程序通过调用 RemotingQueue Server 提供的公开远程方法 Enqueue 将数据元素入队尾
2.RemotingQueue Server 发现队列不为空,则并发若干(<n)多线程陆续 Dequeue 队首数据元素并处理

注意:
1.队列的数据元素定义需自行实现
2.对出列数据元素的处理程序需自行实现

本人手懒,就写这么多,不再多写废话!
完整代码如下,注释不多,手懒,请谅解!

/* Remoting 异步队列实现,流程如下 1.并发若干客户端程序通过调用 RemotingQueue Server 提供的公开远程方法 Enqueue 将数据元素入队尾 2.RemotingQueue Server 发现队列不为空,则并发若干线程陆续 Dequeue 队首数据元素并处理 注意: 1.队列的数据元素定义需自行实现 2.对出列数据元素的处理程序需自行实现 */ // server.cs //C:\WINDOWS\Microsoft.NET\Framework\v2.0.50727\csc.exe server.cs /r:share.dll //.Net 2.0 Remoting 宿主程序 服务 + Console namespace Microshaoft.RemotingObjects.Server { using System; using System.Threading; using System.Collections; using System.Runtime.Remoting; using System.Runtime.Remoting.Channels; using System.Runtime.Remoting.Channels.Tcp; using System.Runtime.Serialization.Formatters; using System.ServiceProcess; using System.ComponentModel; using System.Configuration.Install; using Microshaoft.RemotingObjects.Share; public class RemotingQueueServiceHost : ServiceBase { /**//// <summary> /// 应用程序的主入口点。 /// </summary> //[STAThread] static void Main(string[] args) { //Microshaoft //Microshaoft TODO: 在此处添加代码以启动应用程序 //Microshaoft RemotingQueueServiceHost service = new RemotingQueueServiceHost(); if (args != null) { Console.WriteLine("Console"); service.OnStart(null); Console.ReadLine(); return; } Console.WriteLine("Service"); ServiceBase.Run(service); } protected override void OnStart(string[] args) { Console.WriteLine(Environment.Version.ToString()); BinaryServerFormatterSinkProvider provider = new BinaryServerFormatterSinkProvider(); provider.TypeFilterLevel = TypeFilterLevel.Full; IDictionary props = new Hashtable(); props["port"] = 8080; TcpChannel tc = new TcpChannel(props, null, provider); ChannelServices.RegisterChannel(tc, false); RemotingConfiguration.RegisterWellKnownServiceType ( typeof(RemotingQueue) , "queueurl" , WellKnownObjectMode.Singleton ); RemotingQueue.OnDequeue += new RemotingQueue.QueueEventHandler(DequeueProcess); Console.WriteLine("Server . , Press Enter key to exit."); } public static void DequeueProcess(Item item) { //Microshaoft TO DO //Microshaoft 队列的数据元素定义需自行实现 //Microshaoft 数据库访问 //Microshaoft 发邮件等 DateTime DequeueTime = DateTime.Now; //Microshaoft Thread.Sleep(100); //Microshaoft 模拟长时间任务 DateTime EndTime = DateTime.Now; Console.WriteLine ( "QueueRemainCount {0}, Enqueue {1}, Dequeue {2},[{3}], End {4},[{5}],[{6}]" , RemotingQueue.Count , item.EnqueueTime , DequeueTime , (DequeueTime.Ticks - item.EnqueueTime.Ticks)/10000 , EndTime , (EndTime.Ticks - DequeueTime.Ticks)/10000 , RemotingQueue.ConcurrentThreadsCount ); } } [RunInstallerAttribute(true)] public class ProjectInstaller: Installer { private ServiceInstaller serviceInstaller; private ServiceProcessInstaller processInstaller; public ProjectInstaller() { processInstaller = new ServiceProcessInstaller(); serviceInstaller = new ServiceInstaller(); //Microshaoft Service will run under system account processInstaller.Account = ServiceAccount.LocalSystem; //Microshaoft Service will have Start Type of Manual serviceInstaller.StartType = ServiceStartMode.Manual; serviceInstaller.ServiceName = "RemotingQueueService"; Installers.Add(serviceInstaller); Installers.Add(processInstaller); } } }
//Microshaoft ===================== //Microshaoft Remoting Server Object namespace Microshaoft.RemotingObjects { using System; using System.IO; using System.Net; using System.Web; using System.Text; using System.Threading; using System.Configuration; using System.Collections.Generic; using Microshaoft.RemotingObjects.Share; public class RemotingQueue : MarshalByRefObject { public delegate void QueueEventHandler(Item item); public static event QueueEventHandler OnDequeue; private static Queue<Item> _Queue = new Queue<Item>(); private static object _SyncLockObject = new object(); private static int _MaxConcurrentThreadsCount = 10; //Microshaoft 允许并发出列处理线程数为 10 private static int _ConcurrentThreadsCount = 0; //Microshaoft 用于控制并发线程数 private static bool _QueueRuning = false; //Microshaoft 服务启动后可立即开启新的线程调用此方法(死循环) private static void QueueRun() //Microshaoft ThreadStart { if (!_QueueRuning) { _QueueRuning = true; lock (_SyncLockObject) { ThreadStart ts = new ThreadStart(QueueRunThreadProcess); Thread t = new Thread(ts); t.Name = "QueueRunThreadProcess"; t.Start(); } } } public static int Count { get { return _Queue.Count; } } public static int ConcurrentThreadsCount { get { return _ConcurrentThreadsCount; } } private static void QueueRunThreadProcess() { Console.WriteLine("Queue Runing "); while (_Queue.Count > 0) //Microshaoft 死循环 { Item item = null; lock(_SyncLockObject) { if (_ConcurrentThreadsCount < _MaxConcurrentThreadsCount) { if (_Queue.Count > 0) { _ConcurrentThreadsCount ++; item = _Queue.Dequeue(); } } } if (item != null) { //Microshaoft ThreadPool.QueueUserWorkItem(new WaitCallback(OnDequeueThreadProcess), item); ThreadProcessState tps = new ThreadProcessState(); tps.Item = item; Thread t = new Thread(new ThreadStart(tps.ThreadProcess)); t.Name = string.Format("ConcurrentThread[{0}]", _ConcurrentThreadsCount); t.Start(); } } _QueueRuning = false; Console.WriteLine("Queue Running Stopped "); } public void Enqueue(Item item) { //Microshaoft 队列的数据元素定义需自行实现 Item item.EnqueueTime = DateTime.Now; _Queue.Enqueue(item); //Microshaoft Console.WriteLine("Enqueue {0}", item.Name); if (!_QueueRuning) { QueueRun(); } } private static void OnDequeueThreadProcess(Item item) { try { if (OnDequeue != null) { OnDequeue(item); } DequeueThreadProcess(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { lock (_SyncLockObject) { //Microshaoft Console.WriteLine("工作线程数: {0}", _ConcurrentThreadsCount - 1); _ConcurrentThreadsCount --; } } } static int count = 0; private static void DequeueThreadProcess() { while (_Queue.Count > 0) { Item item = null; lock (_SyncLockObject) { if (_Queue.Count > 0) { item = _Queue.Dequeue(); count ++; //Microshaoft Console.WriteLine("Queue Count: {0},count: {1}", _Queue.Count, count); } } if (item != null) { if (OnDequeue != null) { OnDequeue(item); } } } } private class ThreadProcessState { private Item _item; public Item Item { get { return _item; } set { _item = value; } } public void ThreadProcess() { //Microshaoft Console.WriteLine("{0} Thread Start:", Thread.CurrentThread.Name); RemotingQueue.OnDequeueThreadProcess(_item); //Microshaoft Console.WriteLine("{0} Thread End!", Thread.CurrentThread.Name); if (RemotingQueue._ConcurrentThreadsCount == 0) { Console.WriteLine("All Threads Finished! Queue Count {0}", RemotingQueue.Count); } } } } }
// share.cs //Server、Client 均需引用此 share.dll //C:\WINDOWS\Microsoft.NET\Framework\v2.0.50727\csc.exe /t:library share.cs //TO DO //队列的数据元素定义需自行实现,示例如下: namespace Microshaoft.RemotingObjects.Share { using System; [Serializable] public class Item { private string _Name; public string Name { get { return _Name; } set { _Name = value; } } private DateTime _EnqueueTime; public DateTime EnqueueTime { get { return _EnqueueTime; } set { _EnqueueTime = value; } } } }
// client.cs //C:\WINDOWS\Microsoft.NET\Framework\v1.1.4322\csc.exe client.cs /r:share.dll namespace Microshaoft.RemotingObjects.Client { using System; using System.Collections; using System.Runtime.Remoting; using System.Runtime.Remoting.Channels; using System.Runtime.Remoting.Channels.Tcp; using System.Runtime.Serialization.Formatters; using System.Threading; using Microshaoft.RemotingObjects; using Microshaoft.RemotingObjects.Share; public class Class1 { static RemotingQueue _queue; public static void Main() { Console.WriteLine(Environment.Version.ToString()); Class1 a = new Class1(); a.Run(); } public void Run() { _queue = (RemotingQueue) Activator.GetObject ( typeof(RemotingQueue) , "tcp://127.0.0.1:8080/queueUrl" ); //Microshaoft 以下是耗时的主程序 for (int i = 0; i < 100; i++) { Thread x = new Thread(new ThreadStart(ThreadProcess)); x.Start(); } } public void ThreadProcess() { for (int i = 0; i < 100; i++) { Item x = new Item(); DateTime EnqueueTime = DateTime.Now; x.Name = EnqueueTime.ToString(); _queue.Enqueue(x); Console.WriteLine ( "Enqueue: {0},[{1}]" , EnqueueTime , (DateTime.Now.Ticks - EnqueueTime.Ticks)/10000 ); } } } } //Microshaoft ========================================= //Microshaoft Remoting Object Client Local Proxy namespace Microshaoft.RemotingObjects { using System; using Microshaoft.RemotingObjects.Share; public interface RemotingQueue { void Enqueue(Item item); } }

0
相关文章