Active Object 并发模式在 Java 中的应用

红薯 发布于 2010/08/08 22:26
阅读 309
收藏 2

本文主要从以下两个方面进行阐述:

  • 使用 C++ 语言,来描述 Active Object 设计模式。

    Java 类库对于这样一个典型的模式做了很好的类库层面的封装,因此对于 Java 的开发者来说,很多关于该设计模式本身的东西被屏蔽掉了。本文试图使用 Native C++ 语言,帮助读者从本质上对 Active Object 设计模式有一个更全面的认识。

  • 结合 C++ 版本的 Active Object 设计模式,引领读者对于 Active Object 设计模式在 Java 类库中的支持,有一个更深刻的认识,帮助读者正确并且有效地使用这些类库。

预备知识

并发对象 (Concurrent Object)

在这里,我们先定义一下,什么是并发对象。不同于一般的对象,并发对象指的是该对象方法的调用与方法的执行不在同一个线程內,也即:该对象方 法被异步执行。这其实是在多线程编程环境下典型的计算特征,线程引入了异步。从另一个角度来看,并发对象其实是面向对象的概念应用于多线程计算环境下的产 物。

Active Object 设计模式 C++ 描述

我们将从以下几个方面来讨论 Active Object 模式。

问题描述

我们都知道,基于多线程机制的并发编程模式可以极大提高应用的 QoS(Quality of Service)。典型的例子如,我们在开发服务器端的应用时,通行的做法就是通过多线程机制并发地服务客户端提交上来的请求,以期提高服务器对客户端的 反应度 (Responsiveness)。同时,相比于单线程的应用,并发的多线程应用相对要复杂得多。在多线程的计算环境里,并发对象被所有的调用者线程所共 享。一般来说,并发对象的设计实现需要考虑下面的几个重要因素:

  • 并发对象的任何一次的方法执行,不允许无限地或者长时间阻止其它方法的调用执行,从而影响应用的 QoS。
  • 由于并发对象被调用者线程所共享,其内部状态必须保证是线程安全的,必须受限于某些线程同步约束,并且这些约束对调用者来说是透明的,不可见的。从调用者的角度来看,并发对象与普通对象没有多大区别。
  • 并发对象的设计实现,应该能够透明地利用底层平台所提供的并发机制。这样做的好处是,当并发对象运行在诸如多核处理器这类底层硬件平台上时,我们的应用能够充分挖掘底层平台带来的并发优势,以获得足够好的应用性能。

我们使用 Active Object 设计模式来解决这些问题。

Active Object 设计模式的本质是解耦合方法的调用 (Method invocation) 与方法的执行 (Method execution),方法调用发生在调用者线程上下文中,而方法的执行发生在独立于调用者线程的 Active Object 线程上下文中。并且重要的一点是,该方法与其它普通的对象成员方法对于调用者来说,没有什么特别的不同。从运行时的角度来看,这里涉及到两类线程,一个是 调用者线程,另外一个是 Active Object 线程,我们会在下面更详细地谈到。

结构

在 Active Object 模式中,主要有以下几种类型的参与者:

  • 代理 (Proxy) :代理是 Active Object 所定义的对于调用者的公共接口。运行时,代理运行在调用者线程的上下文中,负责把调用者的方法调用转换成相应的方法请求 (Method Request),并将其插入相应的 Activation List,最后返回给调用者 Future 对象。
  • 方法请求:方法请求定义了方法执行所需要的上下文信息,诸如调用参数等。
  • Activation List:负责存储所有由代理创建的,等待执行的方法请求。从运行时来看,Activation List 会被包括调用者线程及其 Active Object 线程并发存取访问,所以,Activation List 实现应当是线程安全的。
  • 调度者 (Scheduler):调度者运行在 Active Object 线程中,调度者来决定下一个执行的方法请求,而调度策略可以基于很多种标准,比如根据方法请求被插入的顺序 FIFO 或者 LIFO,比如根据方法请求的优先级等等。
  • Servant: Servant 定义了 Active Object 的行为和状态,它是 Proxy 所定义的接口的事实实现。
  • Future: 调用者调用 Proxy 所定义的方法,获得 Future 对象。调用者可以从该 Future 对象获得方法执行的最终结果。在真实的实现里,Future 对象会保留一个私有的空间,用来存放 Servant 方法执行的结果。

执行序列图

在 Active Object 设计模式中,在参与者之间将发生如下的协作过程:

  1. 方法请求的构造与调度。调用者调用 Proxy 的方法 method(),Proxy 创建相应的方法请求,把它传给调度者 (Scheduler),调度者负责把该方法请求放入 Activation List 中。如果 method() 需要返回执行结果,Proxy 返回一个 Future 对象给调用者(图 1 中步骤 1 到步骤 6)。
  2. 方法请求执行。调度者负责从 Activation List 队列里按照预先定义的规则拿出下一个可执行的方法请求,并把该请求绑定到相应 Servant 所定义的方法(图 1 中步骤 7 到步骤 11)。
  3. 完成阶段。保存任何 Servant 方法执行的结果到 Future 对象中去(图 1 中步骤 12)。重复第二步,调度者继续轮询 Activation List 队列,看是否有下一个可执行的方法请求。


图 1. Active Object Sequence Diagram.
图 1. Active Object Sequence Diagram.

从图 1 我们可以看到,步骤 1 到步骤 6 运行在调用者线程中,而步骤 7 到步骤 12 运行在 Active Object 的线程中。

实现

在本节中,我们给出 Active Object 的 C++ 示例实现。

调用者调用 Proxy 的 get() 方法,从 Active Object 获得 Message。我们可以假定,在真实的应用中, get() 方法的实现受制于某些慢速的 IO 操作,比如需要通过 TCP Socket 从远端的机器获得 Message, 然后返回给调用者。所以我们使用 Active Object 来实现该应用,通过线程的并发达到提高应用的 QoS。

  1. 实现 Servant,如清单 1 所示:

    清单 1. MQ_Servant
    						
    class MQ_Servant {
    public:
    // Constructor and destructor.
    MQ_Servant (size_t mq_size);
    virtual ~MQ_Servant ();

    // Message queue implementation operations.
    void put (const Message &msg);
    Message get ();

    // Predicates.
    bool empty () const;
    bool full () const;
    private:
    // Internal queue representation, e.g., a circular
    // array or a linked list, that does not use any
    // internal synchronization mechanism.
    };

    MQ_Servant 是真正的服务提供者,实现了 Proxy 中定义的方法。put() 和 get() 方法用来操作底层的队列。另外,Servant 的实现是纯粹的应用逻辑实现,或者称为商业逻辑实现,没有混合任何的线程同步机制 , 这有利于我们进行应用逻辑的重用,而不需要考虑不同的线程同步机制。

  2. 实现 Proxy,如清单 2 所示:

    清单 2. MQ_Proxy
    						
    class MQ_Proxy {
    public:
    // Bound the message queue size.
    enum { MQ_MAX_SIZE = 100 };
    MQ_Proxy (size_t size = MQ_MAX_SIZE)
    :scheduler_ (size),
    servant_ (size) {
    }

    // Schedule <put> to execute on the active object.
    void put (const Message &msg) {
    Method_Request *mr = new Put(servant_,msg);
    scheduler_.insert (mr);
    }

    // Return a <Message_Future> as the "future" result of
    // an asynchronous <get> method on the active object.
    Message_Future get () {
    Message_Future result;
    Method_Request *mr = new Get (&servant_,result);
    scheduler_.insert (mr);
    return result;
    }

    // empty() and full() predicate implementations ...
    private:
    // The servant that implements the active object
    // methods and a scheduler for the message queue.
    MQ_Servant servant_;
    MQ_Scheduler scheduler_;
    };

    同一个进程中的多个调用者线程可以共享同一个 Proxy。

  3. 实现 Method Request,如清单 3 所示:

    清单 3. Method_Request
    						
    class Method_Request {
    public:
    // Evaluate the synchronization constraint.
    virtual bool can_run () const = 0
    // Execute the method.
    virtual void call () = 0;
    };
    // Inherites from Method_Request
    class Get : public Method_Request {
    public:
    Get (MQ_Servant *rep, const Message_Future &f)
    :servant_ (rep),
    result_ (f)
    {
    }
    virtual bool can_run () const {
    // Synchronization constraint: cannot call a
    // <get> method until queue is not empty.
    return !servant_->empty ();
    }

    virtual void call () {
    // Bind dequeued message to the future result.
    result_ = servant_->get ();
    }
    private:
    MQ_Servant *servant_;
    Message_Future result_;
    };
  4. 实现 Activation List,如清单 4 所示:

    清单 4. Activation_List
    						
    class Activation_List {
    public:
    // Block for an "infinite" amount of time waiting
    // for <insert> and <remove> methods to complete.
    enum { INFINITE = -1 };

    // Define a "trait".
    typedef Activation_List_Iterator iterator;

    Activation_List ();

    // Insert <method_request> into the list, waiting up
    // to <timeout> amount of time for space to become
    // available in the queue. Throws the <System_Ex>
    // exception if <timeout> expires.
    void insert (Method_Request *method_request,Time_Value *timeout = 0);

    // Remove <method_request> from the list, waiting up
    // to <timeout> amount of time for a <method_request>
    // to be inserted into the list. Throws the
    // <System_Ex> exception if <timeout> expires.
    void remove (Method_Request *&method_request, Time_Value *timeout = 0);

    private:
    // Synchronization mechanisms, e.g., condition
    // variables and mutexes, and the queue implementation,
    // e.g., an array or a linked list, go here.
    };

    Activation List 的实际上就是一个线程同步机制保护下的 Method Request 队列,对该队列的所有操作 (insert/remove) 都应该是线程安全的。从本质上讲,Activation List 所基于的就是典型的生产者 / 消费者并发编程模型,调用者线程作为生产者把 Method Request 放入该队列,Active Object 线程作为消费者从该队列拿出 Method Request, 并执行。

  5. 实现 Scheduler,如清单 5 所示:

    清单 5. MQ_Scheduler
    						
    class MQ_Scheduler {
    public:
    // Initialize the <Activation_List> and make <MQ_Scheduler>
    // run in its own thread of control.
    // we call this thread as Active Object thread.
    MQ_Scheduler ()
    : act_list_() {
    // Spawn separate thread to dispatch method requests.
    // The following call is leveraging the parallelism available on native OS
    // transparently
    Thread_Manager::instance ()->spawn (&svc_run,this);
    }
    // ... Other constructors/destructors, etc.

    // Put <Method_Request> into <Activation_List>. This
    // method runs in the thread of its client,i.e.
    // in the proxy's thread.
    void insert (Method_Request *mr) {
    act_list_.insert (mr);
    }

    // Dispatch the method requests on their servant
    // in its scheduler's thread of control.
    virtual void dispatch () {
    // Iterate continuously in a separate thread(Active Object thread).
    for (;;) {
    Activation_List::iterator request;
    // The iterator's <begin> method blocks
    // when the <Activation_List> is empty.
    for(request = act_list_.begin (); request != act_list_.end ();++request){
    // Select a method request whose
    // guard evaluates to true.
    if ((*request).can_run ()) {
    // Take <request> off the list.
    act_list_.remove (*request);
    (*request).call () ;
    delete *request;
    }

    // Other scheduling activities can go here,
    // e.g., to handle when no <Method_Request>s
    // in the <Activation_List> have <can_run>
    // methods that evaluate to true.

    }

    }
    }

    private:
    // List of pending Method_Requests.
    Activation_List act_list_;

    // Entry point into the new thread.
    static void *svc_run (void *arg) {
    MQ_Scheduler *this_obj = static_cast<MQ_Scheduler *> (args);
    this_obj->dispatch ();
    }
    };
  6. 实现 Future,如清单 6 所示:

    清单 6. Message_Future
    						
    class Message_Future {
    public:
    // Initializes <Message_Future> to
    // point to <message> immediately.
    Message_Future (const Message &message);

    //Other implementatio……

    // Block upto <timeout> time waiting to obtain result
    // of an asynchronous method invocation. Throws
    // <System_Ex> exception if <timeout> expires.
    Message result (Time_Value *timeout = 0) const;
    private:
    //members definition here……
    };

    事实上,对于调用者来说,可以通过以下的方式从 Future 对象获得真实的执行结果 Message:

    • 同步等待。调用者调用 Future 对象的 result() 方法同步等待,直到后端的 Servant 相应方法执行结束,并把结果存储到了 Future 对象中来,result 返回,调用者获得 Message。
    • 同步超时等待。调用者调用 Future 对象的 result(timeout) 方法。如果过了 timeout 时间之后,后端的 Servant 相应方法执行仍未结束,则调用失败,否则,调用者线程被唤醒,result 方法返回,调用者获得 Message。
    • 异步查询。调用者可以通过调用 Future 对象定义的查询方法 ( 清单 6 没有提供相应的定义 ),查看真实的结果是否准备好了,如果准备好了,调用 result 方法,直接获得 Message。

    清单 7 是使用该 Active Object 的示例。



    清单 7. Active Object 使用
    						
    MQ_Proxy message_queue;

    //Optioin 1. Obtain future and block thread until message arrives.
    Message_Future future = message_queue.get();
    Message msg = future.result();
    //Handle received message here
    handle(msg);

    //2. Obtain a future (does not block the client).
    Message_Future future = message_queue.get ();

    //The current thread is not blocked, do something else here...
    //Evaluate future and block if result is not available.
    Message msg = future.result ();
    //Handle received message here
    handle(msg);

    从清单 7 可以看到,MQ_Proxy 对于调用者而言,和一个普通的 C++ 定义的对象并没有区别,并发的实现细节已经被隐藏。

Java 对 Active Object 支持

Java JDK 1.3 引入了 java.util.Timer 和 java.util.TimerTask,提供了对 timer-based 并发任务支持,Timer 和 TimerTask 可以看作是 Active Object 设计模式在 Java 中的实现。不过,在这里我们不打算过多讨论 Timer 及其 TimerTask。由于 Timer 和 TimerTask 的缺陷性,例如 Timer 使用单线程执行 TimerTask 导致的 Task 调度时间的不精确性等问题。从 Java1.5 开始,Java 建议使用 ScheduledThreadPoolExecutor 作为 Timer 的替代。

在这里,我们讨论一下自 Java1.5 引入的 Executor Framework。Java1.5 的 Executor Framework 可以看作是 Active Object 设计模式在 Java 中的体现。不过 Java 的 Executor Framework 极大地简化了我们前面所讨论的 Active Object 所定义的模式。

Java 的 Executor Framework 是一套灵活强大的异步任务执行框架,它提供了标准的方式解耦合任务的提交与任务的执行。Java Executor 框架中的任务指的是实现了 Runnable 或者 Callable 接口的对象。Executor 的示例用法如清单 8 所示:


清单 8. Java Executor 示例代码

				
public class TaskExecutionTcpServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
public void handleRequest(Socket connection) {
// Handle the incoming socket connection from
//individual client.
}
};
exec.execute(task);
}
}
}

 

在示例 8 中,我们创建了一个基于线程池的 Java Executor, 每当新的 TCP 连接进来的时候,我们就分配一个独立的实现了 Runnable 任务来处理该连接,所有这些任务运行在我们创建的有 100 个线程的线程池上。

我们可以从 Active Object 设计模式的角度来审视一下 Java Executor 框架。Java Executor 框架以任务 (Task) 为中心,简化了 Active Object 中的角色分工。可以看到,实现 Runnable 或者 Callable 接口的 Java Executor 任务整合了 Method Request 和 Servant 的角色 , 通过实现 run() 或者 call() 方法实现应用逻辑。Java Executor 框架并没有显式地定义 Proxy 接口,而是直接调用 Executor 提交任务,这里的 Executor 相当于 Active Object 中调度者角色。从调用者的角度来看,这看起来并不像是在调用一个普通对象方法,而是向 Executor 提交了一个任务。所以,在这个层面上说,并发的底层细节已经暴露给了调用者。对于 Java 的开发者来说,如果你不担心这样的底层并发细节直接暴露给调用者,或者说你的应用并不需要像对待普通对象一样对待并发对象,Java 的 Executor 框架是一个很好的选择。相反,如果你希望隐藏这样的并发细节,希望像操纵普通对象一样操纵并发对象,那你就需要如本文上节所描述的那样,遵循 Active Object 设计原则 , 清晰地定义各个角色,实现自己的 Active Object 模式。

总而言之,Java Executor 框架简化了 Active Object 所定义的模式,模糊了 Active Object 中角色的分工,其基于生产者 / 消费者模式,生产者和消费者基于任务相互协作。

总结

最后,我们讨论一下 Active Object 设计模式的优缺点。

Active Object 给我们的应用带来的好处:

  • 极大提高了应用的并发性以及简化了线程同步带来的复杂性。并发性的提高得益于调用者线程与 Active Object 线程的并发执行。简化的线程同步复杂性主要表现在所有线程同步细节封装在调度者内 ( 也就是 Java 的 Executor 对象 ),Active Object 调用者并不需要关心。
  • 在 Active Object 中,方法的执行顺序可以不同于方法的调用顺序。用 Java 的话说,也就是任务执行的顺序可以不同于任务提交的顺序。在一定情况下,这可以帮助优化我们应用的性能,提高应用的 QoS 及其 Responsiveness。在 Java Executor 框架下,你可以根据当前的计算资源,确定优化的执行策略 (Execution Policy),该执行策略的内容包括:任务将分配在多少线程上执行,以什么顺序执行,多少任务可以同时执行等等。

当然,Active Object 也有缺点:

  • 额外的性能开销。这涉及到从调用者线程到 Active Object 线程的上下文切换,线程同步,额外的内存拷贝等。
  • 难于调试。Active Object 引入了方法的异步执行,从调试者的角度看,调试这样的方法调用不像普通方法那样直截了当,并且这其中涉及到了线程的调度,同步等。
加载中
返回顶部
顶部