技术开发 频道

Active Object并发模式在Java 中的应用

  实现 Scheduler,如清单 5 所示:

  清单 5. MQ_Scheduler

class MQ_Scheduler {
public:
    
// Initialize the <Activation_List> and make <MQ_Scheduler>
    
// run in its own thread of control.
        
// we call this thread as Active Object thread.
    MQ_Scheduler ()
    : act_list_() {
        
// Spawn separate thread to dispatch method requests.
        
// The following call is leveraging the parallelism available on native OS
        
// transparently
        Thread_Manager::instance ()
->spawn (&svc_run,this);
    }
    
// ... Other constructors/destructors, etc.

    
// Put <Method_Request> into <Activation_List>. This
    
// method runs in the thread of its client,i.e.
    
// in the proxy's thread.
    void insert (Method_Request *mr) {
        act_list_.insert (mr);
    }

    
// Dispatch the method requests on their servant
    
// in its scheduler's thread of control.
    virtual void dispatch () {
        
// Iterate continuously in a separate thread(Active Object thread).
        
for (;;) {
            Activation_List::iterator request;
            
// The iterator's <begin> method blocks
            // when the <Activation_List> is empty.
            
for(request = act_list_.begin (); request != act_list_.end ();++request){
                
// Select a method request whose
                
// guard evaluates to true.
                
if ((*request).can_run ()) {
                    
// Take <request> off the list.
                    act_list_.remove (
*request);
                    (
*request).call () ;
                    delete
*request;
                }

                
// Other scheduling activities can go here,
                
// e.g., to handle when no <Method_Request>s
                
// in the <Activation_List> have <can_run>
                
// methods that evaluate to true.

            }

        }
    }

private:
    
// List of pending Method_Requests.
    Activation_List act_list_;

    
// Entry point into the new thread.
    static void
*svc_run (void *arg) {
        MQ_Scheduler
*this_obj =    static_cast<MQ_Scheduler *> (args);
        this_obj
->dispatch ();
    }
};

      实现 Future,如清单 6 所示:

  清单 6. Message_Future

class Message_Future {
public:
    
// Initializes <Message_Future> to
    
// point to <message> immediately.
    Message_Future (
const Message &message);

    
//Other implementatio……

    
// Block upto <timeout> time waiting to obtain result
    
// of an asynchronous method invocation. Throws
    
// <System_Ex> exception if <timeout> expires.
    Message result (Time_Value
*timeout = 0) const;
private:
    
//members definition here……
};

  事实上,对于调用者来说,可以通过以下的方式从 Future 对象获得真实的执行结果 Message:

  同步等待。调用者调用 Future 对象的 result() 方法同步等待,直到后端的 Servant 相应方法执行结束,并把结果存储到了 Future 对象中来,result 返回,调用者获得 Message。

  同步超时等待。调用者调用 Future 对象的 result(timeout) 方法。如果过了 timeout 时间之后,后端的 Servant 相应方法执行仍未结束,则调用失败,否则,调用者线程被唤醒,result 方法返回,调用者获得 Message。

  异步查询。调用者可以通过调用 Future 对象定义的查询方法 ( 清单 6 没有提供相应的定义 ),查看真实的结果是否准备好了,如果准备好了,调用 result 方法,直接获得 Message。

  清单 7 是使用该 Active Object 的示例。

  清单 7. Active Object 使用

MQ_Proxy message_queue;

//Optioin 1. Obtain future and block thread until message arrives.
Message_Future future
= message_queue.get();
Message msg
= future.result();
//Handle received message here
handle(msg);

//2. Obtain a future (does not block the client).
Message_Future future
= message_queue.get ();

//The current thread is not blocked, do something else here...
//Evaluate future and block if result is not available.
Message msg
= future.result ();
//Handle received message here
handle(msg);

  从清单 7 可以看到,MQ_Proxy 对于调用者而言,和一个普通的 C++ 定义的对象并没有区别,并发的实现细节已经被隐藏。

0
相关文章