实现 Scheduler,如清单 5 所示:
清单 5. MQ_Scheduler
class MQ_Scheduler {
public:
// Initialize the <Activation_List> and make <MQ_Scheduler>
// run in its own thread of control.
// we call this thread as Active Object thread.
MQ_Scheduler ()
: act_list_() {
// Spawn separate thread to dispatch method requests.
// The following call is leveraging the parallelism available on native OS
// transparently
Thread_Manager::instance ()->spawn (&svc_run,this);
}
// ... Other constructors/destructors, etc.
// Put <Method_Request> into <Activation_List>. This
// method runs in the thread of its client,i.e.
// in the proxy's thread.
void insert (Method_Request *mr) {
act_list_.insert (mr);
}
// Dispatch the method requests on their servant
// in its scheduler's thread of control.
virtual void dispatch () {
// Iterate continuously in a separate thread(Active Object thread).
for (;;) {
Activation_List::iterator request;
// The iterator's <begin> method blocks
// when the <Activation_List> is empty.
for(request = act_list_.begin (); request != act_list_.end ();++request){
// Select a method request whose
// guard evaluates to true.
if ((*request).can_run ()) {
// Take <request> off the list.
act_list_.remove (*request);
(*request).call () ;
delete *request;
}
// Other scheduling activities can go here,
// e.g., to handle when no <Method_Request>s
// in the <Activation_List> have <can_run>
// methods that evaluate to true.
}
}
}
private:
// List of pending Method_Requests.
Activation_List act_list_;
// Entry point into the new thread.
static void *svc_run (void *arg) {
MQ_Scheduler *this_obj = static_cast<MQ_Scheduler *> (args);
this_obj->dispatch ();
}
};
public:
// Initialize the <Activation_List> and make <MQ_Scheduler>
// run in its own thread of control.
// we call this thread as Active Object thread.
MQ_Scheduler ()
: act_list_() {
// Spawn separate thread to dispatch method requests.
// The following call is leveraging the parallelism available on native OS
// transparently
Thread_Manager::instance ()->spawn (&svc_run,this);
}
// ... Other constructors/destructors, etc.
// Put <Method_Request> into <Activation_List>. This
// method runs in the thread of its client,i.e.
// in the proxy's thread.
void insert (Method_Request *mr) {
act_list_.insert (mr);
}
// Dispatch the method requests on their servant
// in its scheduler's thread of control.
virtual void dispatch () {
// Iterate continuously in a separate thread(Active Object thread).
for (;;) {
Activation_List::iterator request;
// The iterator's <begin> method blocks
// when the <Activation_List> is empty.
for(request = act_list_.begin (); request != act_list_.end ();++request){
// Select a method request whose
// guard evaluates to true.
if ((*request).can_run ()) {
// Take <request> off the list.
act_list_.remove (*request);
(*request).call () ;
delete *request;
}
// Other scheduling activities can go here,
// e.g., to handle when no <Method_Request>s
// in the <Activation_List> have <can_run>
// methods that evaluate to true.
}
}
}
private:
// List of pending Method_Requests.
Activation_List act_list_;
// Entry point into the new thread.
static void *svc_run (void *arg) {
MQ_Scheduler *this_obj = static_cast<MQ_Scheduler *> (args);
this_obj->dispatch ();
}
};
实现 Future,如清单 6 所示:
清单 6. Message_Future
class Message_Future {
public:
// Initializes <Message_Future> to
// point to <message> immediately.
Message_Future (const Message &message);
//Other implementatio……
// Block upto <timeout> time waiting to obtain result
// of an asynchronous method invocation. Throws
// <System_Ex> exception if <timeout> expires.
Message result (Time_Value *timeout = 0) const;
private:
//members definition here……
};
public:
// Initializes <Message_Future> to
// point to <message> immediately.
Message_Future (const Message &message);
//Other implementatio……
// Block upto <timeout> time waiting to obtain result
// of an asynchronous method invocation. Throws
// <System_Ex> exception if <timeout> expires.
Message result (Time_Value *timeout = 0) const;
private:
//members definition here……
};
事实上,对于调用者来说,可以通过以下的方式从 Future 对象获得真实的执行结果 Message:
同步等待。调用者调用 Future 对象的 result() 方法同步等待,直到后端的 Servant 相应方法执行结束,并把结果存储到了 Future 对象中来,result 返回,调用者获得 Message。
同步超时等待。调用者调用 Future 对象的 result(timeout) 方法。如果过了 timeout 时间之后,后端的 Servant 相应方法执行仍未结束,则调用失败,否则,调用者线程被唤醒,result 方法返回,调用者获得 Message。
异步查询。调用者可以通过调用 Future 对象定义的查询方法 ( 清单 6 没有提供相应的定义 ),查看真实的结果是否准备好了,如果准备好了,调用 result 方法,直接获得 Message。
清单 7 是使用该 Active Object 的示例。
清单 7. Active Object 使用
MQ_Proxy message_queue;
//Optioin 1. Obtain future and block thread until message arrives.
Message_Future future = message_queue.get();
Message msg = future.result();
//Handle received message here
handle(msg);
//2. Obtain a future (does not block the client).
Message_Future future = message_queue.get ();
//The current thread is not blocked, do something else here...
//Evaluate future and block if result is not available.
Message msg = future.result ();
//Handle received message here
handle(msg);
//Optioin 1. Obtain future and block thread until message arrives.
Message_Future future = message_queue.get();
Message msg = future.result();
//Handle received message here
handle(msg);
//2. Obtain a future (does not block the client).
Message_Future future = message_queue.get ();
//The current thread is not blocked, do something else here...
//Evaluate future and block if result is not available.
Message msg = future.result ();
//Handle received message here
handle(msg);
从清单 7 可以看到,MQ_Proxy 对于调用者而言,和一个普通的 C++ 定义的对象并没有区别,并发的实现细节已经被隐藏。