实现
在本节中,我们给出 Active Object 的 C++ 示例实现。
调用者调用 Proxy 的 get() 方法,从 Active Object 获得 Message。我们可以假定,在真实的应用中, get() 方法的实现受制于某些慢速的 IO 操作,比如需要通过 TCP Socket 从远端的机器获得 Message, 然后返回给调用者。所以我们使用 Active Object 来实现该应用,通过线程的并发达到提高应用的 QoS。
1.实现 Servant,如清单 1 所示:
清单 1. MQ_Servant
class MQ_Servant {
public:
// Constructor and destructor.
MQ_Servant (size_t mq_size);
virtual ~MQ_Servant ();
// Message queue implementation operations.
void put (const Message &msg);
Message get ();
// Predicates.
bool empty () const;
bool full () const;
private:
// Internal queue representation, e.g., a circular
// array or a linked list, that does not use any
// internal synchronization mechanism.
};
public:
// Constructor and destructor.
MQ_Servant (size_t mq_size);
virtual ~MQ_Servant ();
// Message queue implementation operations.
void put (const Message &msg);
Message get ();
// Predicates.
bool empty () const;
bool full () const;
private:
// Internal queue representation, e.g., a circular
// array or a linked list, that does not use any
// internal synchronization mechanism.
};
MQ_Servant 是真正的服务提供者,实现了 Proxy 中定义的方法。put() 和 get() 方法用来操作底层的队列。另外,Servant 的实现是纯粹的应用逻辑实现,或者称为商业逻辑实现,没有混合任何的线程同步机制 , 这有利于我们进行应用逻辑的重用,而不需要考虑不同的线程同步机制。
2.实现 Proxy,如清单 2 所示:
清单 2. MQ_Proxy
class MQ_Proxy {
public:
// Bound the message queue size.
enum { MQ_MAX_SIZE = 100 };
MQ_Proxy (size_t size = MQ_MAX_SIZE)
:scheduler_ (size),
servant_ (size) {
}
// Schedule <put> to execute on the active object.
void put (const Message &msg) {
Method_Request *mr = new Put(servant_,msg);
scheduler_.insert (mr);
}
// Return a <Message_Future> as the "future" result of
// an asynchronous <get> method on the active object.
Message_Future get () {
Message_Future result;
Method_Request *mr = new Get (&servant_,result);
scheduler_.insert (mr);
return result;
}
// empty() and full() predicate implementations ...
private:
// The servant that implements the active object
// methods and a scheduler for the message queue.
MQ_Servant servant_;
MQ_Scheduler scheduler_;
};
public:
// Bound the message queue size.
enum { MQ_MAX_SIZE = 100 };
MQ_Proxy (size_t size = MQ_MAX_SIZE)
:scheduler_ (size),
servant_ (size) {
}
// Schedule <put> to execute on the active object.
void put (const Message &msg) {
Method_Request *mr = new Put(servant_,msg);
scheduler_.insert (mr);
}
// Return a <Message_Future> as the "future" result of
// an asynchronous <get> method on the active object.
Message_Future get () {
Message_Future result;
Method_Request *mr = new Get (&servant_,result);
scheduler_.insert (mr);
return result;
}
// empty() and full() predicate implementations ...
private:
// The servant that implements the active object
// methods and a scheduler for the message queue.
MQ_Servant servant_;
MQ_Scheduler scheduler_;
};
同一个进程中的多个调用者线程可以共享同一个 Proxy。