技术开发 频道

浅析Web Service模式中的Service Observer

【IT168 管理】随着SOA化,服务对象间经常出现一对多的依赖关系,当某个Source服务发现出现新的情况下,常常需要同时通知到多个既定目标服务(Target服务)。另外,如果必要的话,Source服务还需要通知Target服务进行某些更新。

问题 

    Web Service作为一个彻底的分布式系统,按照软件职能的需要常常被分割为多个服务,但其中经常存在一些对象,它们的内容对其他服务而言是参考信息(比如新的汇率、不良的信用记录等),其他服务如果总是通过分布式调用轮循访问这些信息,从效率上、服务设计上都不恰当,这时候,Source服务最好可以在相关事件发生的时候根据需要通知到其他Target服务,告知相关的参考信息,保持多个Target服务中相关信息的一致性。 

    下面按照设计模式中Observer模式的称呼,我们把Source服务称为Subject,将Target服务称为Observer。 

    上面仅仅介绍了一种单向的模式,也就是Subject自身发生变化后,把信息“推”到每个Observer的情况,但在实际企业环境中很少有这种“无中生有”的系统,实际情况往往是这个Subject本身还会偶尔接收某个Observer的输入,更新某些信息后,再告诉其他Observer。 

    还有一个问题:Subject之前是怎么知道它要通知的各个Observer呢?注册就好了么?对于一个SOA环境而言,服务的注册一般由Service Registry/Repository完成,这样重复建设等于“坏了规矩”。那么Subject应该怎么做呢?在Subject一方定义一个对Observer服务依赖反转的Observer Service Registry Stub,与集中的Service Registry/Repository不同的是,这个Registry Stub仅仅定义可以预订自己内容的Observer,虽然增加了Stub但从定义上讲Subject他还是自治的。

资源争用

    从资源使用的角度看,一般Subject对修改是限制的(每个资源一次仅允许一个Observer修改),对查询是开放的,所以在可写Subject模式下,随着Observer的增加,Subject中具体资源的“读/写”比为N:1(N为Observer数量)。为了有效控制这个并发过程,需要增加一个适合的锁——多读单写锁(ReaderWriterLock)。同时从Subject向Observer发送通知的角度看,由于这种Web Service调用效率上相对于本地内存调用存在很大的差别,如果不加控制很容易在接手端造成通知到达次序颠倒的情况。如果应用逻辑中对这个颠倒不敏感,那么Subject就可以借助Stub中登记的Observer,尽最大可能发送通知。但如果应用上下文,严重依赖这个次序,那么需要在Subject发送通知前增加一个Gateway,确保发送端、接收端次序保持一致。

数据内容的筛选 

    上文的讨论中回避了一个问题,就是信息内容本身,一旦出现更新的时候是Subject把某块信息全部推到Observer,还是仅仅把更新后的内容推下去。这里有两个因素需要考虑:

1、 相对于进程内的内存引用交换,Web Service算是非常昂贵的调用,不仅仅有数据准备,还需要进行XML数据的序列化、反序列化,传输过程中还会被HTTP和TCP进行反复包装,因此如果更新内容仅仅占整个数据内容很小一部分的时候,应该对发送的消息进行筛选。

2、 Web Service中交互的数据本身是XML数据,它具有自身描述性,因此可以很容易通过XPath或者XQuery在Subject一端进行抽取,并在Observer进行更新(一般这个过程可以称之为两个XML数据的Merge)。 

    由于实现相对简单,但对于减少Web Service数据量比较有效,因此考虑增加一个数据Assembly,用于数据实际更新前选择有必要更新的数据。

耦合关系分析 

    虽然Subject需要同时和多个Observer交互,但因为是每个Observer自己向Subject注册,因此在Subject一端看到的都是抽象的IObserver接口,而不包括实体Observer类型,因此Subject与每个Observer之间没有明显的耦合关系。在Observer一方,实际上很多时候并不知道具体的Subject是什么,如果让每个Observer与实体Subject耦合,设计上也相对比较“密”,不妨将相关工作交给一个统一的根据配置文件动态组装的Assembly,专门负责将某个Observer 绑顶到具体的Subject上。

组合关系

    使用上Subject往往仅针对某个具体内容,而Observer往往关心的不只一个Subject所能提供的内容,一种可以通过改造上面说的Observer Assembly,同时把Observer装配到不同的Subject上,另一种可以在保持每个Observer仅仅接收单一Subject通知的前提下,为Observer增加组合关系。


最基本的Subject-Observer关系

说明:

•与经典的Subject不同,为了简化具体每个ConcreateSubject、ConcreateObserver的实现工作量,增加了SubjectBase和ObserverBase两个抽象基类。

•在Subject实际发送通知前,需要每个Observer把自己通过Attach方法注册到Subject中,Subject根据State的变化,对Observer作组播。

•但这个结构仅仅适合进程内调用,在跨进程、跨网络、甚至是Web Service调用中需要增加一些角色,完成分布式的调用。

示例

///C# Observer namespace VisionTask.Training.ServicePattern.ServiceObserver { /// <summary> /// 抽象Observer接口 /// </summary> public interface IObserver<T> { T Data { get;} void Update(T data); } public abstract class ObserverBase<T> : IObserver<T> { protected T data; public virtual void Update(T data) { this.data = data; } public virtual T Data { get { return data; } } } } ///C# Subject using System; using System.Collections.Generic; namespace VisionTask.Training.ServicePattern.ServiceObserver { /// <summary> /// 抽象Subject接口 /// </summary> public interface ISubject<T> { IList<IObserver<T>> Observers { get;} void Attach(IObserver<T> observer); void Detach(IObserver<T> observer); T State { get;set;} void Notify(); } /// <summary> /// 为了简化实体Subject开发量增加的抽象基类 /// </summary> public abstract class SubjectBase<T> : ISubject<T> { protected IList<IObserver<T>> observers = new List<IObserver<T>>(); protected T state; /// <summary> /// 当前Subject的数据内容 /// </summary> public virtual T State { get { return state; } set { state = value; Notify(); } } /// <summary> /// 所有已经注册到该Subject的Observer列表 /// </summary> public virtual IList<IObserver<T>> Observers { get { return observers; } } /// <summary> /// 注册一个Observer实体 /// </summary> /// <param name="observer"></param> public virtual void Attach(IObserver<T> observer) { if (observers == null) throw new ArgumentNullException("observer"); if (observers.Contains(observer)) throw new DuplicateWaitObjectException("observer"); observers.Add(observer); } /// <summary> /// 反注册一个Observer实体 /// </summary> /// <param name="observer"></param> public virtual void Detach(IObserver<T> observer) { if((observer == null) || (!observers.Contains(observer))) return; observers.Remove(observer); } /// <summary> /// 通知每个Observer最新的State信息 /// </summary> public virtual void Notify() { if (observers.Count == 0) return; foreach (IObserver<T> observer in observers) observer.Update(State); } } } ///C# 测试用的实体Observer(s)和Subject class DoubleDataObserver : ObserverBase<int> { public override int Data{get{return data * 2;}} } class DataPlusOneObserver : ObserverBase<int> { public override int Data{get{return data + 1;}} } class DataSubject : SubjectBase<int> { } ///C# UnitTest using System; using System.Collections.Generic; using Microsoft.VisualStudio.TestTools.UnitTesting; using VisionTask.Training.ServicePattern.ServiceObserver; namespace VisionTask.Training.ServicePattern.UtilityService.UnitTest.InProcessObserver { [TestClass] public class TestServiceObserver { [TestMethod] public void Test() { ISubject<int> subject = new DataSubject(); IObserver<int> observer1 = new DoubleDataObserver(); IObserver<int> observer2 = new DataPlusOneObserver(); subject.Attach(observer1); subject.Attach(observer2); subject.State = 20; Assert.AreEqual<int>(20 * 2, observer1.Data); Assert.AreEqual<int>(20 + 1, observer2.Data); } } }

用于Web Service调用的Subject-Observer关系

设计

1、 限制Subject.State:由于Web Service调用中,消息需要被XML序列化,然后保存在SOAP消息中,因此不是什么类型都可以用于这个调用的,必须为他“贴”上一个SerializableAttribute的标签或者验证它的类型是否Type.IsSerializable?;

2、 将Subject的Attach、Detach暴露为WSDL接口,同时Observer的Update也暴露为WSDL接口;

3、 在Subject服务端增加一个指向各个Observer的Stub类,用于调用他们的Update方法,同时在涉及到会更新Subject内容的Observer端增加一个透明的Proxy类,用户修改Subject.State;(一般情况下,这些工作可以通过开发工具自动完成);

修改后的结构如下(为了简化,同时考虑到System.Web.Services.WebService已经占用了基类的位置,因此把两个抽象类去掉):

1、定义Web Service版本Subject和Observer接口

说明:

•与经典方式下进程内的Subject-Observer不同,由于调用会通过无状态的Web Service完成,因此只能使用方法而不能使用属性Property。

•由于抽象的IWebServiceObserver接口为引用对象,无法通过调用常规的Attach方法和Detach方法进行注册,需要借助Stub间接完成,为了保证所有的参数都可以被序列化,这里为每个Observer对象增加了逻辑名称,在Web Service接口层面Attach和Detach仅针对逻辑名称操作,而并非直接使用具体Observer的引用实例。

•所有的泛型参数<T>必须为可以序列化的类型。

///C# IWebServiceSubject<T> using System.Collections.Generic; namespace VisionTask.Training.ServicePattern.UtilityService { /// <summary> /// 抽象Web Service Subject接口 /// </summary> /// <remarks> /// 为了让IWebServiceSubject仅仅依赖于抽象的IWebServiceObserver, /// 但又确保实体WebServiceObserver可以被注册,采用变通的方法, /// WebServiceSubject中仅仅登记每个WebServiceObserver的名称,具体调度 /// 哪个WebServiceObserver实体则是通过访问Stub获得。 /// </remarks> public interface IWebServiceSubject<T> { string[] GetObserverNames(); void Attach(string observerName); void Detach(string observerName); void SetState(T state); T GetState(); void Notify(); } } ///C# IWebServiceObserver<T>  using System; using System.Runtime.Serialization; namespace VisionTask.Training.ServicePattern.UtilityService { /// <summary> /// 抽象Web Service Observer接口 /// </summary> public interface IWebServiceObserver<T>  { /// <summary> /// 每个Observer对象的逻辑名称 /// </summary> string Name { get;} T GetData(); void Update(T data); } }


2、为所有Observer Web Service准备抽象基类

///C# ObserverServiceBase<T> using System; using System.Collections.Generic; using System.Web.Services; using System.Runtime.Serialization; namespace VisionTask.Training.ServicePattern.UtilityService { public abstract class ObserverServiceBase<T> : WebService, IWebServiceObserver<T> { static ObserverServiceBase() { // 判断提供的泛型参数是否可以序列化 if (!typeof(T).IsSerializable) throw new InvalidOperationException(); } public abstract void Update(T data); public abstract T GetData(); public abstract string Name { get;} } }


3、实现两个Observer Web Service

///C# DoubleDataObserverService using System; using System.Web.Services; namespace VisionTask.Training.ServicePattern.UtilityService { [Serializable] [WebService] public class DoubleDataObserverService : ObserverServiceBase<int> { private static int current = 0; [WebMethod] public override void Update(int data) { current = data; } [WebMethod] public override int GetData() { return current * 2; } public override string Name { get { return "DoubleData"; } } } } ///C# PlusOneObserverService using System; using System.Web.Services; namespace VisionTask.Training.ServicePattern.UtilityService { [Serializable] [WebService] public class PlusOneObserverService : ObserverServiceBase<int> { private static int current; [WebMethod] public override void Update(int data) { current = data; } [WebMethod] public override int GetData() { return current + 1; } public override string Name { get { return "PlusOne"; } } } }

 

4、定义Subject Web Service 的Stub类型

说明:

•通过在Subject Web Service项目中引用每个Observer Web Service,由开发工具自动生成对应的Observer Web Service Proxy类;(实际项目中也可以手工生成Proxy类)

•Stub实际登记各个Observer Web Service的Proxy类型,允许Subject Web Service按照Observer的名称访问具体Proxy类的实例;

•为了便于Subject在通知过程中遍历每个Observer Web Service的Proxy实例,为Stub增加了迭代器;

///C# Stub #region Using using System; using System.Collections.Generic; using VisionTask.Training.ServicePattern.UtilityService.DoubleDataObserver; using VisionTask.Training.ServicePattern.UtilityService.PlusOneObserver; #endregion namespace VisionTask.Training.ServicePattern.UtilityService { /// <summary> /// Subject Web Service 对象用于访问其他Observer Web Service的Stub类 /// </summary> internal class SubjectStub { private static IDictionary<string, IWebServiceObserver<int>> observers; /// <summary> /// 初始化过程加载每个Observer的Proxy信息 /// </summary> static SubjectStub() { observers = new Dictionary<string, IWebServiceObserver<int>>(); IWebServiceObserver<int> observer = new DoubleDataObserverService(); observers.Add(observer.Name, observer); observer = new PlusOneObserverService(); observers.Add(observer.Name, observer); } /// <summary> /// 根据Observer的名称,提出相关Observer Web Service的Proxy实例; /// </summary> /// <param name="name">Observer的逻辑名称</param> public IWebServiceObserver<int> this[string name] { get { if(string.IsNullOrEmpty(name)) throw new ArgumentNullException("name"); IWebServiceObserver<int> observer; if(observers.TryGetValue(name, out observer)) return observer; else return null; } } /// <summary> /// 为了便于Subject Web Service,Stub采用Iterator模式对外提供每个注册 /// 好的Observer Web Server Proxy的引用。 /// </summary> /// <returns></returns> public IEnumerable<IWebServiceObserver<int>> GetObservers() { foreach (IWebServiceObserver<int> observer in observers.Values) yield return observer; } } }

 

5、定义Subject Web Service

///C# using System; using System.Collections.Generic; using System.Web.Services; namespace VisionTask.Training.ServicePattern.UtilityService { [WebService] public class SubjectService : WebService, IWebServiceSubject<int> { /// <summary> /// 登记所有已经被Stub注册的Observer Web Service逻辑名称. /// </summary> private static IList<string> observerNames = new List<string>(); /// <summary> /// Subject Web Service保存的当前信息 /// </summary> private static int current = 0; /// <summary> /// 获取所有已经登记的Observer Web Service逻辑名称 /// </summary> [WebMethod] public string[] GetObserverNames() { if(observerNames.Count == 0) return null; string[] result = new string[observerNames.Count]; observerNames.CopyTo(result, 0); return result; } [WebMethod] public void Attach(string observerName) { if (string.IsNullOrEmpty(observerName) || (observerNames.Contains(observerName))) return; observerNames.Add(observerName); } [WebMethod] public void Detach(string observerName) { if (string.IsNullOrEmpty(observerName) || (!observerNames.Contains(observerName))) return; observerNames.Remove(observerName); } [WebMethod] public void SetState(int state) { current = state; Notify(); } [WebMethod] public int GetState() { return current; } /// <summary> /// 该方法在Web Service端自动执行,不需要Web Service Client显式调用, /// 所以不用声明为[WebMethod] /// </summary> public void Notify() { foreach (IWebServiceObserver<int> observer in (new SubjectStub()).GetObservers()) observer.Update(current); } } }

 

6、单元测试验证

///C# UnitTest ///首先,将上述3个Web Service在单元测试项目中引用,分别命名为: DoubleDataObserverService PlusOneObserverService SubjectService using System; using Microsoft.VisualStudio.TestTools.UnitTesting; using VisionTask.Training.ServicePattern.UtilityService; namespace VisionTask.Training.ServicePattern.UtilityService.UnitTest { [TestClass] public class ServiceObserverTest { [TestMethod] public void Test() { // 创建对个Web Service的本地实例 // 1个Subject Web service 和2个Observer Web service。 SubjectService.SubjectService subject = new SubjectService.SubjectService(); PlusOneObserverService.PlusOneObserverService plusOneObserver = new PlusOneObserverService.PlusOneObserverService(); DoubleDataObserverService.DoubleDataObserverService doubleDataObserver = new DoubleDataObserverService.DoubleDataObserverService(); // 注册各Observer subject.Attach("PlusOne"); subject.Attach("DoubleData"); // 更新Subject信息 subject.SetState(10); Assert.AreEqual<int>(10, subject.GetState()); // 检查Subject对各个Observer的通知情况 Assert.AreEqual<int>(11, plusOneObserver.GetData()); Assert.AreEqual<int>(20, doubleDataObserver.GetData()); } } }

控制资源争用

    按照上面的讨论,为了控制Observer对Subject内容的更新和读取,需要在Subject的SetState部分增加并发控制,示例如下:

///C# ///修改前: [WebMethod] public void SetState(int state) { current = state; Notify(); } ///修改后: private static ReaderWriterLock readerWriterLock = new ReaderWriterLock(); private const int LockTimeout = 2000; private object lockRoot = new object(); [WebMethod] public void SetState(int state) { readerWriterLock.AcquireWriterLock(LockTimeout); try { current = state; Notify(); } finally { readerWriterLock.ReleaseWriterLock(); } }


控制耦合关系

    上面的分析中虽然把Subject与具体Observer Web Service Proxy的耦合关系解开,但等于把问题转嫁到Subject Web Service Stub上,为了进一步解耦,考虑通过配置机制,动态装载具体Observer Web Service Proxy的类型信息,示例如下:

说明:

    通过增加一个新的NamedObserverCollection完成配置信息的对象化,为了便于Stub的使用,它需要实现IEnumerable、ICollection两个接口。实际使用中可以考虑通过继承System.Collections.Specialized. NameValueCollection实现。

    (本文的附件部分有一个如何通过访问配置文件,根据抽象类型动态加载实体类型实例的示范。)

///App.Config <?xml version="1.0" encoding="utf-8" ?> <configuration> <configuration> <configSections> <section name ="ServiceObserver" type="System.Configuration.NameValueSectionHandler, System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"/> </configSections> <ServiceObserver> <add key="PlusOne" value="VisionTask.Training.ServicePattern.UtilityService.DoubleDataObserver. DoubleDataObserverService, UtilityService"/> <add key="DoubleData" value="VisionTask.Training.ServicePattern.UtilityService.PlusOneObserver. PlusOneObserverService, UtilityService"/> </ServiceObserver> </configuration> </configuration>
小结

    通过上面的示例不难看出Web Service中的Service Observer模式可以比较好的解决1:N的信息更新和数据一致性问题,但由于Web Service的分布式特性,实现上需要在经典的Observer模式上增加专门的机制来间接完成SOAP消息的传递和转发。同时,部署上必须充分考虑Subject与各个Observer的物理位置、进程关系,否则会出现较大的性能问题。


相关文章:

《浅析Web Service数据转换器对象》,
http://tech.it168.com/m/2007-08-15/200708150951837.shtml


《浅析Web Service适配器》
http://tech.it168.com/m/2007-08-07/200708070946609.shtml


《如何实现Web Service设计与整合模式?》
http://tech.it168.com/m/2007-07-23/200707231447484.shtml

附件

    通过访问配置文件,根据抽象类型动态加载实体类型实例 :

///C# 测试用的抽象类型 interface IPerson { } ///C# 测试用的实体类型 class Student : IPerson { } class Employee : IPerson { } ///App.Config ///在配置文件中增加独立的配置节,并登记相关的实体类型信息 QualifiedTypeName方式: Quanlified Class Name, Assembly Name <configuration> <configSections> <section name ="DemoSection" type="System.Configuration.NameValueSectionHandler,
System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089
"/> </configSections> <DemoSection> <add key="student" value="VisionTask.Training.ServicePattern.ConfigurationDynamics.
UnitTest.Student, ConfigurationDynamics.UnitTest
"/> <add key="employee" value="VisionTask.Training.ServicePattern.ConfigurationDynamics.
UnitTest.Employee, ConfigurationDynamics.UnitTest
"/> </DemoSection> </configuration> ///C# UnitTest using System; using System.Collections.Generic; using System.Collections.Specialized; using System.Configuration; using System.Diagnostics; using Microsoft.VisualStudio.TestTools.UnitTesting; namespace VisionTask.Training.ServicePattern.ConfigurationDynamics.UnitTest { [TestClass] public class TestLoadConfiguration { [TestMethod] public void Test() { NameValueCollection items = (NameValueCollection)(ConfigurationManager.GetSection("DemoSection")); Assert.IsNotNull(items); IPerson person; foreach (string key in items.Keys) { string typeName = items[key]; person = (IPerson)(Activator.CreateInstance(Type.GetType(typeName))); } } } }

 

0
相关文章