技术开发 频道

WCF中的发布-订阅服务


【IT168 专稿】 

    针对事件使用原来的双向回调通常会引入发布者与订阅者的高度耦合。订阅者必须知道所有发布的服务在应用程序的位置,并连接它们。订阅者不能识别的发布者无法通知事件的订阅者。如果增加新的订阅者(或者移除已经存在的订阅者)就会给已经部署了的应用程序带来困难。无论什么时候,对于应用程序的任何人发出的事件的一个特定类型,订阅者都无法要求获得通知。此外,订阅者必须为每个发布者发出多个昂贵的调用,不管是订阅还是取消订阅。不同的发布者可能会触发相同的事件,但却为订阅者和取消订阅提供了略微不同的方法,自然而然给订阅者与相关方法带来耦合。

   大致相同的是,发布者只能通知它知道的订阅者。无论是谁,如果希望接收事件,发布者都无法将消息传递给它,也没有能力广播事件。此外,所有的发布者都必须包含必要的代码,管理订阅者列表以及自身的发布行为。这些代码几乎与服务要解决的业务问题无关,如果还要提供一些高级特性,例如并发发布,就会增加相当大的复杂度。

   而且,基于双向的回调同样会引入发布者与订阅者生命周期的耦合度。为了订阅和接收事件,必须运行订阅者。

   订阅者无法询问事件是否被触发,而应用程序则需要创建一个订阅者的实例,让它处理该事件。

   安全性则代表了另外一种耦合:订阅者需要通过各种安全模式以及使用的证书,以具备验证所有发布者的能力。同时,发布者也需要具有足够的安全证书,从而允许触发事件,不同的发布者可能具有不同的角色成员机制。

    最后,必须以编程方式设置订阅信息。我们很难通过管理方式在应用程序中配置订阅信息,或者在系统运行时,改变订阅者的选项。

    这些问题实际上不是WCF双向调用所特有的,过去的技术例如COM连接点或者.NET委托同样具有这样的特性。所有这些都属于紧密耦合的事件管理机制。

发布-订阅设计模式

    若要解决以上提及的问题,可以使用已知的发布-订阅设计模式对它们进行设计。该模式所隐藏的含义很简单:通过引入一个专门的订阅服务,以及一个专门的发布服务,解除发布者与订阅者之间的耦合,如图B-1所示。

 

    图B-1 一个发布-订阅系统

    需要订阅事件的订阅者注册订阅服务,该服务负责管理订阅者列表,同时为取消订阅提供了相似的功能。同样,所有发布者均使用发布服务触发它们的事件,避免将事件直接传递给订阅者。订阅和发布服务提供了一个间接层,从而解除了与系统之间的耦合。订阅者不需要了解发布者的身份。它们能够订阅事件类型,以及接收任何发布者的事件,并且订阅机制对于所有订阅者都是统一的。事实上,发布者不需要管理任何订阅列表,也不用关心订阅者是谁。它们会将事件传递给发布服务,然后再传递给需要事件的订阅者。


订阅者类型

   我们甚至可以定义两种类型的订阅者:临时订阅者是在内存中运行的订阅者;持久订阅者则是持久化到磁盘的订阅者,它们代表了服务对发生事件的调用时间。对于临时订阅者,可以使用双向回调机制,通过它将回调引用传递到正在运行的服务。持久订阅者则需要将订阅者地址当作引用进行记录。当事件发生时,发布服务会调用持久订阅者的地址,然后将事件传递给它。两种订阅类型还有另外一个显著区别,就是我们可以将持久订阅者存储在磁盘或数据库中。这样就能够在关闭应用程序或机器崩溃或重启的时候,持久化订阅者。这一过程允许以管理方式对订阅进行配置。显然,在关闭应用程序时,我们不能存储临时订阅,而需要在每次应用程序启动时,明确地创建临时订阅。

发布-订阅框架

    本书附带的源代码中包含了完整的发布-订阅实例。我并不只是提供了发布-订阅服务以及客户端的实例,而且还提供了一个通用的框架,能够自动实现发布-订阅服务,以及增加对所有应用程序的支持。若要构建这样的框架,首先需要分解管理发布-订阅的接口,然后为临时订阅和持久订阅以及发布提供单独的契约。

管理临时订阅

   可以使用我定义的ISubscriptionService接口管理临时订阅,定义如例B-1所示。

   例B-1 管理临时订阅者的ISubscriptionService接口
[ServiceContract]
public interface ISubscriptionService
{
[OperationContract]
void Subscribe(string eventOperation);

[OperationContract]
void Unsubscribe(string eventOperation);
}
    注意,ISubscriptionService接口无法识别实现了它所期待的终结点的回调契约。作为一个通用的接口,它与特定的回调契约无关。定义这些回调契约取决于如何使用应用程序。通过继承ISubscriptionService接口,可以在应用程序中提供回调接口,并指定所需的回调契约:

interface IMyEvents 
{
[OperationContract(IsOneWay = true)]
void OnEvent1( );

[OperationContract(IsOneWay = true)]
void OnEvent2(int number);

[OperationContract(IsOneWay = true)]
void OnEvent3(int number,string text);
}

[ServiceContract(CallbackContract = typeof(IMyEvents))]
interface IMySubscriptionService : ISubscriptionService
{}

    通常,回调契约的每个操作都对应于特定的事件。ISubscriptionService的子接口(在本例中为IMySubscriptionService接口)不需要添加操作。ISubscriptionService接口提供了临时订阅的管理功能。每次调用Subscribe()或Unsubscribe()方法时,订阅者需要提供它需要订阅或取消订阅的操作名(以及事件名)。如果调用者希望订阅所有的事件,则传递一个空字符串,或者null值。

   我设计的框架提供了ISubscriptionService接口方法的实现,形式为SubscriptionManager<T>泛型抽象类:

public abstract class SubscriptionManager<T> where T : class 
{
public void Subscribe(string eventOperation);
public void Unsubscribe(string eventOperation);
//更多成员
}
   SubscriptionManager<T>的泛型类型参数为事件契约。注意,SubscriptionManager<T>并没有实现ISubscriptionService接口。

   应用程序需要以终结点形式暴露自己的临时订阅服务,该终结点需要支持ISubscriptionService接口的特定子接口。为此,应用程序需要提供派生自SubscriptionManager<T>的服务类,并将回调契约指定为类型参数,同时还要实现ISubscriptionService接口的子接口。例如,使用IMyEvent回调接口实现一个临时订阅服务:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] 
class MySubscriptionService : SubscriptionManager<IMyEvents>,IMySubscriptionService
{}
   MySubscriptionService的实现无需任何代码,因为IMySubscriptionService接口不会添加任何新的操作,而SubscriptionManager<T>已经实现了ISubscriptionService接口的方法。

   注意,仅仅继承SubscriptionManager<IMyEvents>是不够的,因为它没有派生自契约接口。我们必须添加对IMySubscriptionService的继承,才可以支持临时订阅。

   最后,应用程序需要定义IMySubscriptionService的终结点:

<services> 
<service name = "MySubscriptionService">
<endpoint
address = "..."
binding = "..."
contract = "IMySubscriptionService"
/>
</service>
</services>
   例B-2演示了SubscriptionManager<T>管理临时订阅的方式。

public abstract class SubscriptionManager<T> where T : class 
{
static Dictionary<string,List<T>> m_TransientStore;

static SubscriptionManager( )
{
m_TransientStore = new Dictionary<string,List<T>>( );
string[] methods = GetOperations( );
Action<string> insert = delegate(string methodName)
{
m_TransientStore.Add(methodName,new List<T>( ));
};
Array.ForEach(methods,insert);
}
static string[] GetOperations( )
{
MethodInfo[] methods = typeof(T).GetMethods(BindingFlags.Public|
BindingFlags.FlattenHierarchy|
BindingFlags.Instance);
List<string> operations = new List<string>(methods.Length);

Action<MethodInfo> add = delegate(MethodInfo method)
{
Debug.Assert(!operations.Contains(method.Name));
operations.Add(method.Name);
};
Array.ForEach(methods,add);
return operations.ToArray( );
}
static void AddTransient(T subscriber,string eventOperation)
{
lock(typeof(SubscriptionManager<T>))
{
List<T> list = m_TransientStore[eventOperation];
if(list.Contains(subscriber))
{
return;
}
list.Add(subscriber);
}
}
static void RemoveTransient(T subscriber,string eventOperation)
{
lock(typeof(SubscriptionManager<T>))
{
List<T> list = m_TransientStore[eventOperation];
list.Remove(subscriber);
}
}

public void Subscribe(string eventOperation)
{
lock(typeof(SubscriptionManager<T>))
{
T subscriber = OperationContext.Current.GetCallbackChannel<T>( );
if(String.IsNullOrEmpty(eventOperation) == false)
{
AddTransient(subscriber,eventOperation);
}
else
{
string[] methods = GetOperations( );
Action<string> addTransient = delegate(string methodName)
{
AddTransient(subscriber,methodName);
};
Array.ForEach(methods,addTransient);
}
}
}

public void Unsubscribe(string eventOperation)
{
lock(typeof(SubscriptionManager<T>))
{
T subscriber = OperationContext.Current.GetCallbackChannel<T>( );
if(String.IsNullOrEmpty(eventOperation) == false)
{
RemoveTransient(subscriber,eventOperation);
}
else
{
string[] methods = GetOperations( );
Action<string> removeTransient = delegate(string methodName)
{
RemoveTransient(subscriber,methodName);
};
Array.ForEach(methods,removeTransient);
}
}
}
//更多成员
}



 
   SubscriptionManager<T>在泛型的静态字典对象m_TransientStore中存储了临时订阅者:

static Dictionary<string,List<T>> m_TransientStore;
    字典中包含了事件操作名以及以链表形式组成的所有订阅者。SubscriptionManager<T>的静态构造函数使用反射获得了回调契约(SubscriptionManager<T>的类型参数)的所有操作,并初始化了字典对象,让所有的操作都包含了一个空的链表。Subscribe()方法从操作调用的上下文中抽取出了回调引用。如果调用指定了一个操作名,Subscribe()方法会调用辅助方法AddTransient()。AddTransient()从存储中获取了事件的订阅者列表。如果列表没有包含该订阅者,则添加它。

   如果调用者为操作名指定了空字符串或者null,Subsribe()方法则在回调契约中为每个操作调用AddTransient()方法。

   Unsubscribe()方法的执行方式相似。注意,调用者能够订阅所有事件,然后取消其中一个的订阅。

管理持久订阅者

   我所定义的IPersistentSubscriptionService接口可以管理持久订阅者,定义如例B-3所示。

   例B-3 管理持久订阅者的IPersistentSubscriptionService接口

[ServiceContract] 
public interface IPersistentSubscriptionService
{
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
void Subscribe(string address,string eventsContract,string eventOperation);

[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
void Unsubscribe(string address,string eventsContract,string eventOperation);
//更多成员
}
   调用者若要添加一个持久订阅者,需要调用Subscribe()方法,调用时需要提供订阅者的地址、事件的契约名以及指定的事件操作自身。若要取消订阅,则可以提供相同的信息,然后调用Unsubscribe()方法。注意,IPersistentSubscriptionService接口并没有指定订阅者持久化在服务端的哪个地方,因为这属于实现细节。

   之前介绍的类SubscriptionManager<T>同样可以实现IPersistentSubscriptionService接口的方法:

[BindingRequirement(TransactionFlowEnabled = true)] 
public abstract class SubscriptionManager<T> where T : class
{
public void Unsubscribe(string address,string eventsContract,
string eventOperation);
public void Subscribe(string address,string eventsContract,
string eventOperation);
//更多成员
}
   SubscriptionManager<T>在SQL Server中存储了持久订阅者。它的配置使用了Client/Service事务模式(参见第7章的内容),它要求该模式使用我编写的BindingRequirement特性。

   SubscriptionManager<T>的泛型类型参数为事件契约。注意,SubscriptionManager<T>并没有继承IPersistentSubscriptionService接口。应用程序需要公开它自己的持久订阅服务,但是不需要继承IPersistentSubscriptionService的新契约,因为它不需要回调引用。应用程序只需要继承SubscriptionManager<T>,并将事件契约指定为类型参数,同时继承IPersistentSubscriptionService接口。例如:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] 
class MySubscriptionService : SubscriptionManager<IMyEvents>,
IPersistentSubscriptionService
{}
   MySubscriptionService类的实现无需编写任何代码,因为SubscriptionManager<T>已经实现了IPersistentSubscriptionService接口的方法。

   注意,仅仅继承SubscriptionManager<IMyEvents>是不够的,因为它没有继承一个契约接口。我们必须添加对IPersistentSubscriptionService的实现,才可以支持持久订阅。

   最后,应用程序需要定义一个IPersistentSubscriptionService的终结点:

<services> 
<service name = "MySubscriptionService">
<endpoint
address = "..."
binding = "..."
contract = "IPersistentSubscriptionService"
/>
</service>
</services>

    IPersistentSubscriptionService接口的方法通过SubscriptionManager<T>实现,如例B-4所示。例B-4与例B-2非常相似,但订阅者是被存储到SQL Server中,而不是存储在内存的字典对象中。

   例B-4: 在SubscriptionManager<T>中管理持久订阅者
public abstract class SubscriptionManager<T> where T : class 
{
static void AddPersistent(string address,string eventsContract,
string eventOperation)
{
//使用ADO.NET在 SQL Server中存储订阅
}

static void RemovePersistent(string address,string eventsContract,
string eventOperation)
{
//使用ADO.NET将订阅从SQL Server中移除
}

[OperationBehavior(TransactionScopeRequired = true)]
public void Unsubscribe(string address,string eventsContract,
string eventOperation)
{
if(String.IsNullOrEmpty(eventOperation) == false)
{
RemovePersistent(address,eventsContract,eventOperation);
}
else
{
string[] methods = GetOperations( );
Action<string> removePersistent = delegate(string methodName)
{
RemovePersistent(address,eventsContract,methodName);
};
Array.ForEach(methods,removePersistent);
}
}
[OperationBehavior(TransactionScopeRequired = true)]
public void Subscribe(string address,string eventsContract,
string eventOperation)
{
if(String.IsNullOrEmpty(eventOperation) == false)
{
AddPersistent(address,eventsContract,eventOperation);
}
else
{
string[] methods = GetOperations( );
Action<string> addPersistent = delegate(string methodName)
{
AddPersistent(address,eventsContract,methodName);
};
Array.ForEach(methods,addPersistent);
}
}
//更多成员
}

   如果针对相同的事件契约,应用程序需要同时支持临时订阅者和持久订阅者,可以让订阅服务类直接继承ISubscriptionService接口的子接口,以及IPersistentSubscriptionService接口:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] 
class MySubscriptionService : SubscriptionManager<IMyEvents>,
IMySubscriptionService,IPersistentSubscriptionService
{}
   同时公开两个与之匹配的终结点:

<services> 
<service name = "MySubscriptionService">
<endpoint
address = "..."
binding = "..."
contract = "IMySubscriptionService"
/>
<endpoint
address = "..."
binding = "..."
contract = "IPersistentSubscriptionService"
/>
</service>
</services>
(未完待续)
0
相关文章