【IT168 技术文档】
上一篇文档:WCF中的发布-订阅服务(上)
事件发布
目前介绍的发布-订阅框架只包含处理订阅管理的一部分。该框架同样可以简化发布服务的实现。发布服务必须支持与订阅者相同的事件契约,这是获知应用程序的发布者的唯一连接点。因为发布服务在端点中暴露了事件契约,我们需要将事件契约标记为服务契约。假定我们只是通过双向回调使用临时订阅者:
[ServiceContract]发布-订阅框架包含了辅助类PublishService<T>,定义如下:
interface IMyEvents
{
[OperationContract(IsOneWay = true)]
void OnEvent1( );
[OperationContract(IsOneWay = true)]
void OnEvent2(int number);
[OperationContract(IsOneWay = true)]
void OnEvent3(int number,string text);
}
public abstract class PublishService<T> where T : classPublishService<T>定义了一个类型参数,它的类型为事件契约。若要提供开发者自己的发布服务,可以继承PublishService<T>,然后使用FireEvent()方法将事件传递给所有订阅者,不管它们是临时订阅者还是持久订阅者,如例B-5所示。
{
protected static void FireEvent(params object[] args);
}
例B-5 实现一个事件发布服务
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MyPublishService : PublishService<IMyEvents>,IMyEvents
{
public void OnEvent1( )
{
FireEvent( );
}
public void OnEvent2(int number)
{
FireEvent(number);
}
public void OnEvent3(int number,string text)
{
FireEvent(number,text);
}
}
注意,我们可以使用FireEvent()触发任意类型的事件,而不用考虑参数的个数,因为我们使用了params object数组。
最后,应用程序需要为包含了事件契约的发布服务公开一个终结点:
<services>例B-6演示了PublishService<T>的实现。
<service name = "MyPublishService">
<endpoint
address = "..."
binding = "..."
contract = "IMyEvents"
/>
</service>
</services>
例B-6 实现PublishService<T>
public abstract class PublishService<T> where T : class
{
protected static void FireEvent(params object[] args)
{
StackFrame stackFrame = new StackFrame(1);
string methodName = stackFrame.GetMethod( ).Name;
//解析显式接口实现
if(methodName.Contains("."))
{
string[] parts = methodName.Split('.');
methodName = parts[parts.Length-1];
}
FireEvent(methodName,args);
}
static void FireEvent(string methodName,params object[] args)
{
PublishPersistent(methodName,args);
PublishTransient(methodName,args);
}
static void PublishPersistent(string methodName,params object[] args)
{
T[] subscribers = SubscriptionManager<T>.GetPersistentList(methodName);
Publish(subscribers,true,methodName,args);
}
static void PublishTransient(string methodName,params object[] args)
{
T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName);
Publish(subscribers,false,methodName,args);
}
static void Publish(T[] subscribers,bool closeSubscribers,string methodName,
params object[] args)
{
WaitCallback fire = delegate(object subscriber)
{
Invoke(subscriber as T,methodName,args);
if(closeSubscribers)
{
using(subscriber as IDisposable)
{}
}
};
Action<T> queueUp = delegate(T subscriber)
{
ThreadPool.QueueUserWorkItem(fire,subscriber);
};
Array.ForEach(subscribers,queueUp);
}
static void Invoke(T subscriber,string methodName,object[] args)
{
Debug.Assert(subscriber != null);
Type type = typeof(T);
MethodInfo methodInfo = type.GetMethod(methodName);
try
{
methodInfo.Invoke(subscriber,args);
}
catch(Exception e)
{
Trace.WriteLine(e.Message);
}
}
}
若要通过发布服务简化事件的触发过程,FireEvent()方法需要接受多个参数,并将它们传递给订阅者。而且,发布服务的调用者并不提供订阅者调用的操作名。要实现这一目的,FireEvent()方法会访问它的堆栈帧,以获取它正在调用的方法名。然后使用重载版本的FireEvent(),它会接收方法名。该方法紧接着会调用辅助方法PublishPersistent(),发布给所有的持久订阅者,以及调用PublishTransient()辅助方法,发布给所有的临时订阅者。两种发布方法的实现几乎完全相同:它们通过访问SubscriptionManager<T>以获取各自的订阅者列表,然后使用Publish()方法触发事件。订阅者以订阅者代理数组的形式返回。该数据会被传递给Publish()方法。
采用这种办法,Publish()可以简单地调用订阅者。但是,我还需要支持事件的并发发布,如果能够这样,即使订阅者在处理事件时耗费时间过长,仍然不会影响其它订阅者能够及时地接收事件。注意,将事件操作标记为单向并不能保证异步调用。此外,在事件操作没有被标记为单向操作时,我还需要支持并发发布。Publish()定义了两个匿名方法。第一个匿名方法调用了Invoke()辅助方法,通过Invoke()方法触发事件,并传递到提供的单独的订阅者。如果SubscriptionManager<T>指定要求关闭代理,则匿名方法还会关闭代理。由于Invoke()方法不可能调用编译后的指定的订阅者类型,因此需要使用反射对调用进行迟绑定。同时,Invoke()方法还会禁止调用抛出的任何异常,因为这些都不是发布方所需要关注的。第二个匿名方法则对第一个匿名方法委托对象进行排队,以便于线程池中的线程执行。最后,Publish()对方法提供的subscribers数组中的每个订阅者执行第二个匿名方法的操作。
注意,PublishService<T>对待订阅者是一视同仁的,它并没有区分订阅者是临时的,还是持久的。唯一的区别是在发布到持久订阅者之后,需要关闭代理。当然不一致的地方还包括获取订阅列表的方法,分别为GetTransientList()和GetPersistentList()。两者之中,GetTransientList()更简单:
public abstract class SubscriptionManager<T> where T : class
{
internal static T[] GetTransientList(string eventOperation)
{
lock(typeof(SubscriptionManager<T>))
{
if(m_TransientStore.ContainsKey(eventOperation))
{
List<T> list = m_TransientStore[eventOperation];
return list.ToArray( );
}
return new T[]{};
}
}
//更多成员
}
GetTransientList()根据指定的操作查找临时存储的所有订阅者,然后以数组形式返回。GetPersistentList()面临更大的挑战:框架没有现成的持久订阅者的代理列表;我们所能获知的只是它们的地址。因此,GetPersistentList()需要实例化持久订阅者的代理,如例B-7所示。
例B-7 创建持久订阅者的代理列表
public abstract class SubscriptionManager<T> where T : class若要为每个订阅者创建代理,GetPersistentList()方法需要订阅者的地址、绑定和契约。当然,契约正好就是SubscriptionManager<T>的类型参数。为获取地址,GetPersistentList()方法调用了GetSubscribersToContractEventOperation()查询数据库,然后返回那些订阅了特定事件的持久订阅者的所有地址的数组。现在,GetPersistentList()方法所需要的就是每个订阅者的绑定。为此,GetPersistentList()调用了辅助方法GetBindingFromAddress(),它可以根据地址样式推断出使用的绑定。GetBindingFromAddress()将所有的HTTP地址当作WSHttpBinding。此外,GetBindingFromAddress()还为每个绑定实现了可靠性以及事务传播。如果没有使用单向操作,则允许在发布者的事务中包含事件,例如这样的事件契约:
{
internal static T[] GetPersistentList(string eventOperation)
{
string[] addresses = GetSubscribersToContractEventOperation(
typeof(T).ToString( ),eventOperation);
List<T> subscribers = new List<T>(addresses.Length);
foreach(string address in addresses)
{
Binding binding = GetBindingFromAddress(address);
T proxy = ChannelFactory<T>.CreateChannel(binding,
new EndpointAddress(address));
subscribers.Add(proxy);
}
return subscribers.ToArray( );
}
static string[] GetSubscribersToContractEventOperation(string eventsContract,
string eventOperation)
{
//使用ADO.NET在SQL Server中查询事件的订阅者
}
static Binding GetBindingFromAddress(string address)
{
if(address.StartsWith("http:") || address.StartsWith("https:"))
{
WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message,true);
binding.ReliableSession.Enabled = true;
binding.TransactionFlow = true;
return binding;
}
if(address.StartsWith("net.tcp:"))
{
NetTcpBinding binding = new NetTcpBinding(SecurityMode.Message,true);
binding.ReliableSession.Enabled = true;
binding.TransactionFlow = true;
return binding;
}
/* 命名管道绑定与MSMQ绑定与上面的代码相似*/
Debug.Assert(false,"Unsupported protocol specified");
return null;
}
//更多成员
}
[ServiceContract]
interface IMyEvents
{
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
void OnEvent1( );
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
void OnEvent2(int number);
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
void OnEvent3(int number,string text);
}
管理持久订阅者
如果我们要在运行时添加和移除持久订阅,可以使用例B-3所示的IPersistentSubscriptionService接口的方法,因为它们持久的本质,管理订阅最合适的方式就是通过某种管理工具。为了这一目的,IPersistentSubscriptionService定义了额外的操作,以支持对订阅者存储的各种查询。
例B-8 IPersistentSubscriptionService接口
[DataContract]所有的这些管理操作都使用了一个简单的数据结构PersistentSubscription,它包含了订阅者的地址、契约和事件。
public struct PersistentSubscription
{
[DataMember]
public string Address;
[DataMember]
public string EventsContract;
[DataMember]
public string EventOperation;
}
[ServiceContract]
public interface IPersistentSubscriptionService
{
//管理操作
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
PersistentSubscription[] GetAllSubscribers( );
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
PersistentSubscription[] GetSubscribersToContract(string eventsContract);
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
string[] GetSubscribersToContractEventType(string eventsContract,
string eventOperation);
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
PersistentSubscription[] GetAllSubscribersFromAddress(string address);
//更多成员
}
GetAllSubscribers()方法只是返回所有订阅者的列表。GetSubscribersToContract()方法则返回对应于特定契约的所有订阅者,而GetSubscribersToContractEventType()方法则返回对应于特定契约的特定事件操作的所有订阅者。最后,出于完整性的考虑,GetAllSubscribersFromAddress()根据提供的指定地址,返回符合条件的所有订阅者。我设计的发布-订阅框架包含了一个简单的持久订阅管理工具,名为持久订阅管理器(Persistent Subscription Manager),如图B-2所示。
图B-2 持久订阅管理器应用程序
管理工具使用IPersistentSubscriptionService添加和移除订阅。要添加新的订阅,需要为它提供事件契约定义的元数据交换地址。我们可以使用持久订阅者自己的元数据交换地址,也可以使用发布服务的元数据交换地址(例如例B-5中定义的发布服务),因为它们是多态的。在MEX地址文本框中输入元数据交换基地址,然后单击Lookup按钮。工具会以编码方式获取事件服务的元数据,生成Contract和Event组合框。我们可以使用第2章介绍的MetadataHelper获取元数据,解析它的内容。
如果要订阅,则提供持久订阅者的地址,然后单击Subscribe按钮。持久订阅管理器会调用订阅服务(在实例中为MySubscriptionService服务)添加订阅。订阅服务的地址由持久订阅管理器的配置文件维护。
发布-订阅模式同样可以解除系统安全的耦合。相对于多个订阅者以及多个潜在的安全机制,所有的发布者都需要针对一个单独的发布者认证它们自身。接着,订阅者只需要允许发布服务将它们传递给事件,而不是系统的所有发布者,因为发布者信任发布服务会对发布者进行妥善地授权与认证。对于发布服务而言,如果应用基于角色的安全机制,可以允许开发者在一个地方轻易地应用多个角色,以授权支持跨系统发布一个事件。
队列发布者与队列订阅者
如果发布事件或订阅事件时不使用同步绑定,可以使用NetMsmqBinding。一个队列发布-订阅服务组合了松散耦合系统的优势,以及离线执行(Disconnected Execution)的灵活性。当然,使用队列事件时,契约的所有事件需要被标记为单向操作。如图B-3所示,我们可以让队列的两端保持独立。
图B-3 队列发布-订阅
我们可以使用一个队列发布者和联机的同步订阅者,也可以让一个联机的发布者发布给队列订阅者,或者同时为队列发布者与队列订阅者。然而需要注意的是,我们不能使用队列的临时订阅,因为它不支持MSMQ绑定下的双向回调,从而无法使用通信的断开特性。如前所示,我们也可以使用管理工具管理订阅者,管理的操作仍然是联机的,以及同步的。
队列发布者
如果要使用一个队列发布者,那么在发布服务时,需要使用MSMQ绑定暴露一个队列终结点。当队列发布者触发事件时,发布服务可以为离线状态,或者发布的客户端自身可以是断开的。注意,当发布两个事件到队列发布服务中时,并不能保证它们传递的顺序,以及终端订阅者对这些事件的处理。只有当事件契约被配置为一个会话服务时,并且只有在处理一个单独的发布服务时,我们才能够假定发布的顺序。
队列订阅者
要部署一个队列订阅者,持久订阅服务需要暴露一个队列终结点。即使发布者为联机状态,这样做也能够使得它处于离线状态。当订阅者再次联机时,就会收到所有排队等候的事件。此外,当发布服务自身为断开状态时,队列订阅者也能够处理,因为并没有事件被丢失。在一个单独的队列订阅者中,如果触发多个事件,无法保证事件传递的顺序。如果事件契约拥有一个会话,则订阅者只能假定发布的顺序。当然,如果同时为队列发布者与队列订阅者,则允许它们在同一时间可以脱机工作。