ZeroMQ 替换 HTTP 作为内部服务 已翻译 100%

oschina 投递于 2013/07/23 07:51 (共 18 段, 翻译完成于 08-06)
阅读 7990
收藏 10
0
加载中

本文介绍了如何使用ZeroMQ来进行内部服务的RPC调用。HTTP是面向公众服务的标准选择,但在由许多小部分组成的系统内调用内部RPC时,最好使用ZeroMQ代替HTTP。

总结一下,ZeroMQ较HTTP提供的好处如下:

  • 多个并发的RPC调用可以使用同一个TCP连接,而HTTP需要按顺序调用(保持连接的情况下)。这是最大的一个好处。
  • 没有手动连接管理。HTTP也是可以做到的,说到底是库的问题,但据我所知只有个别的库可以这样实现HTTP。
  • 支持多个服务器进程响应请求。不需要HTTP负载平衡器。
  • 没有手动重试处理。停止服务器,调用一个RPC请求,然后启动服务器,然后得到一个响应-消息是队列化的而不是直接传送。
让我们体验一下吧!
Garfielt
Garfielt
翻译于 2013/07/23 12:33
1

一个简单的Clojure程序教程

Clojure的Java程序可能无法一眼便了解意图,这里是一个简单的注释。

;; Quick Clojure tutorial, with Java equivalents

 ;;  Java method call on string instance. Returns "A STRING"
(.toUpperCase "a string")
"a string".toUpperCase()

 ;; Java method call with one argument. Returns byte[].
(.getBytes "a string" "UTF8")
"a string".getBytes("UTF8")

 ;; Calls the constructor and creates a new instance of Point
(Point. 15 21)
new Point(15, 21)

;; Create a new thread and start it
(.start (Thread. (fn [] ...)))
Thread t = new Thread(aClojureFunctionHere)
t.start()

ZeroMQ教程

ZeroMQ命名的确很巧妙。它不是一个消息队列,没有服务器或你需要下载并运行的第三方依赖。事实上,ZeroMQ没有集中的组件。换句话说,ZeroMQ的名字很恰当:ZeroMQ中有零个消息队列零。那它是什么?

ZeroMQ是一个网络库。你创建不同类型的socket(REQ、REP、DEALER、ROUTER、PUB、SUB……),以各种有趣的方式连接这些socket,发送信息到这些socket。socket的类型决定了消息如何路由和传递。

如何连接socket、如何使用它们调用RPC代替HTTP,就是这篇博客涵盖的内容。

不要以为ZeroMQ是ActiveMQ或RabbitMQ的一种替代方案,或者认为ZeroMQ是一个原子支持网络或socket库。

Garfielt
Garfielt
翻译于 2013/07/23 13:09
1

迭代1:请求和回复,无并发

第一个例子是你永远不会在生产代码中使用,因为它没有并发。我们对一些短小独立的代码改善迭代,让他们达到我们预期的效果,这些代码演示了一些ZeroMQ重要概念。

我们将创建两个socket。一个REQ(请求)socket。ZeroMQ的socket更具类型有不同的API。如果你打算将两个信息在一组从socket发送,你会得到一个错误。你需要先发送,在等待到答复后你才能再次发送。

另一个socket是一个REP(回复)socket。它有一个类似但相反的API。如果你对REP做的第一件事是发送数据,你会得到一个错误,你必须以等待消息开始,只有当你发送一个消息才可以做其他的。然后你必须等待另一个消息。

这个API使它容易做RPC调用,正如ZeroMQ将为我们处理所有的细节。我们需要做的是等待响应和答复答复回复,发出请求并等待响应。

Garfielt
Garfielt
翻译于 2013/07/23 14:37
1

显然,例子中的一个简单的ZeroMQ的REQ或REP的处理并不会有并发。你一次只能发送一个请求或者处理一个相应。后面我们会看到并发的例子。

;; 非常简单的REQ/REP初始化,只是用于基本的演示
;; 一次一个请求,没有并发。

(.start
 (Thread.
  (fn []
    (let [sock (.socket (ZMQ/context 1) ZMQ/REP)]
      (.bind sock "tcp://127.0.0.1:1337")
      (while true
        ;; 阻塞住,直到收到消息
        (let [req (.recv sock 0)]
          ;; req是一个byte[]类型。你可以对其做一些处理!
          ;; 将req回显给客户端。
          (.send sock (.getBytes (str (String. req) " - echoed!")) ZMQ/NOBLOCK)))))))

(.start
 (Thread.
  (fn []
    (let [sock (.socket (ZMQ/context 1) ZMQ/REQ)]
      (.connect sock "tcp://127.0.0.1:1337")
      (dotimes [n 5]
        ;; 处理一次请求
        (.send sock (.getBytes (str "Hello, " n)) 0)
        ;; 阻塞住,直到收到响应
        (let [res (.recv sock 0)]
          ;; res是byte[]类型,包含REP包的回复内容。
          (println (String. res))))))))

从这个小例子中我们可以了解很多东西。

  • 看见没,没有用到代理!再重复一次,ZeroMQ不是传统的消息队列,它是一个网络库。(但是包含了一些类似消息队列的原语,往下看!)
  • 假定消息是字符串,与 JVM 默认的编码一致。但是对 ZeroMQ 来说都是 byte 类型。
  • bind/connect的顺序并没有关系。呵呵,如果不是这样的话,我们就不得不确保在连接REQ之前必须绑定好REP,实际上我们没有这样做(可以看到,线程代理的顺序并没有指定)。对于ZeroMQ 的套接字,真实的连接对我们来说是隐藏的。我们发送一条消息时,并没有立即连接,消息被加入REQ套接字上的本地队列,由REQ去决定何时连接。试一下,在bind之前加一行代码(Thread/sleep1000),可以看到代码仍然是工作的。
  • 单个ZeroMQ套接字无法在不同的线程之间并发的使用。实际上,限制跟线程没有关系,而是跟状态有关。上面解释了,我们不可能在REQ上并排发送两次,那样会报错。所以你没法在多线程中使用同一个套接字。不过后面我们会提到,如何来共享一直变化的状态。
zicode
zicode
翻译于 2013/07/26 16:33
1

迭代2:增加并发应答

到现在为止,我们一直保持请求代码不变,不过我们将替代每次只能进行一个应答的单个线程。

ZeroMQ想到了进程内消息传递,而且这就是我们用来获得并发应答的方法。我们将创建多个线程,并且每个线程对应一个REP套接字,然后使用具有新套接字的新线程把这些线程连接为一个大对象。这时就没有跨线程的状态了,我们在线程间要传递的唯一“状态”就是ZeroMQ消息。

为了获得并发应答,我们创建一个类型为DEALER的套接字。我们把这个套接字与"inproc://“协议捆绑在一起,然后把真正的REP套接字和这个DEALER套接字连接在一起。当DEALER套接字接收到消息的时候,它就把这个消息传递给一个REP套接字,然后看看那些套接字正忙。当一个REP套接字应答的时候,它将给只把消息按原样向前传递DEALER同样的应答。这种配置考虑到对许多REP套接字前转消息的一个顶级套接字。

几点人
几点人
翻译于 2013/08/03 14:15
1

在这片文章的所有例子里,我们将在一个单独的TCP连接上使用多个REQ套接字来实现并发请求。不过,如果你需要支持到我们服务器的多个TCP连接的话,我们当然需要这样的连接,那么我们需要在DEALER前面有一个ROUTER。例如我们可能有多个系统正连接这一个服务,并要求每个服务/进程对应各自的TCP连接。或者我们可能需要连接到服务器来维护RPC调用。

ROUTER将给每个连接到它的套接字一个内部ID号,然后立刻前转附带有这个号码的元数据的消息,包含内部套接字ID。当它获得返回的消息的时候,它立刻前转消息给已经连接的并出现在元数据里的套接字ID对应的REQ套接字。

(defn my-response-handler
  "Takes a req (bytes), returns the response (also bytes)."
  [req]
  (.getBytes (str (String. req) " - echoed!")))

(let [ctx (ZMQ/context 1)
      worker-url "inproc://responders"
      router-socket (.socket ctx ZMQ/ROUTER)
      dealer-socket (.socket ctx ZMQ/DEALER)]
  (.bind router-socket "tcp://127.0.0.1:1337")
  (.bind dealer-socket worker-url)
  ;;现在我们可以并发地响应10个请求了
  (dotimes [n 10]
    (.start
     (Thread.
      (fn []
        (let [sock (.socket ctx ZMQ/REP)]
          ;; 我们应答DEALER
          (.connect sock worker-url)
          (while true
            ;; 同以前相同的API- 接收消息,然后应答。
            (let [req (.recv sock 0)]
              (.send sock (my-response-handler req) ZMQ/NOBLOCK))))))))
  (.start
   (Thread.
    ;; 前转来自ROUTER的消息给DEALER或者相反。
    (fn [] (.run (ZMQQueue. ctx router-socket dealer-socket))))))
你可以用前面例子里的REP线程替代这段代码,而且它也可以正常运行。不同之处在于现在我们已经有十个响应请求的线程了。
几点人
几点人
翻译于 2013/08/03 14:19
1
要注意的几个事项:
  • ZMQQueue是一个内置的便利函数,它可以把所有的消息传递给其他套接字或者相反。在我们的部署里,这意味着来自ROUTER的所有的消息都将发送给DEALER,并且所有来自DEALER的消息将发送给ROUTER,而且不需要我们书写这段代码完成这样的事情。
  • 在ZMQQueue上调用.run将阻塞当前线程,这就是我们让它拥有一个独立线程的原因。
  • "inproc://"协议的绑定/连接事件的顺序。对inproc来说,顺序如何都是真的;正如我们前面看到的"tcp://"首先要进行连接,然后才绑定。而对inproc来说,你不需要冒关闭或者没有起来等这些风险,而且你控制着inproc的绑定和连接的流,因此与其说顺序产生了重要问题不如说它更不方便。这也意味着连接inproc非常容易-它不需要检查是否连接已经完成,做重试等,所以inproc比TCP有效多了,不仅仅因为它不需要TCP握手,和校验等。
几点人
几点人
翻译于 2013/08/03 14:23
1

迭代三:增加并发请求

现在我没拥有了应答多个的多线程。是时候用更聪明的且让我们可获得并发的东西替代单个REQ套接字了。

你可能已经猜到,我们只要创建无数REQ套接字就可以了。虽然这个主意很糟糕,因为每个REQ套接字都需要拥有它们自己的TCP连接。我们可以拥有一个可用的REQ套接字池,这样我们每次做请求的时候就不需要花费大力气创建新的TCP连接了。然而ZeroMQ有一个更好的方案,这种情况下它可以处理完们遇到的所有令人讨厌的问题。

我们不能脱离这样一个事实:REQ用一个阻塞的API。我们假设你处在阻塞请求是可行的这种环境里。我们所使用的例子一直是Java HTTPservlet响应者,它需要通过ZeroMQ调用内部服务。我不能确定ZeroMQ是否具有异步API。
几点人
几点人
翻译于 2013/08/03 15:06
1

好,现在看看真实的代码。

(defn connect
  [server-url]
  (let [ctx (ZMQ/context 1)
        worker-url (str "inproc://" (java.util.UUID/randomUUID))
        queue-thread (Thread.
                      (fn []
                        (let [client-sock (.socket ctx ZMQ/DEALER)
                              worker-sock (.socket ctx ZMQ/ROUTER)]
                          (.connect client-sock server-url)
                          (.bind worker-sock worker-url)
                          (.run (ZMQQueue. ctx  client-sock worker-sock)))))]
    (.start queue-thread)
    {:ctx ctx
     :worker-url worker-url
     :queue-thread queue-thread}))

(defn disconnect
  "Useful for tests etc. Pass the map returned by `connect` above."
  [connection]
  (.interrupt (get connection :queue-thread))
  (.term (get connection :ctx)))

(defn with-req-sock
  "Takes the connection and a higher order function that is passed a new REQ
   socket. When this function returns, the REQ socket is destroyed."
  [connection handler]
  (let [socket (.socket (get connection :ctx) ZMQ/REQ)]
    (.connect socket (get connection :worker-url))
    (try
      (handler socket)
      (finally (.close socket)))))

;; Usage
(def connection (connect "tcp://127.0.0.1:1337"))

(dotimes [n 5]
  (.start
   (Thread.
    (fn []
      (with-req-sock connection
        (fn [sock]
          (.send sock (.getBytes (str "Hello, " n)) 0)
          (let [res (.recv sock 0)]
            (println (String. res)))))))))
(connectserver-url)创建了一些新连接。我们为我们的进程创建了一个单独的连接,让后在我们需要进行请求的时候把请求传给它。我们对每个请求创建一个新的REQ套接字,然后使用"inproc://"发送消息给ROUTER。ROUTER然后前转消息给DEALER(像以前一样通过ZMQQueue)。DEALER通过TCP连接真正的服务器。 要注意的几个事项:
  • 如果你运行这段代码,那么在你的终端将交叉的插入输出。这是因为请求和应答同时发生。
  • 另外,线程间除了inproc的url外没有其他状态共享,我们真正要做的一切是在它们之间发送ZeroMQ消息。
  • 正如上面已经提到的,我们可以直接对服务器发送REQ,不过这意味着我们需要对每个请求执行完整的TCP连接。在这样的部署里,DEALER是一个单独的TCP连接,其他的所有都是inproc。我们可以结束在同一个TCP连接上运行的并发请求。DEALER不真正关心消息类型,因此它可以发送5个请求,得到2个应答,然后发送2个或者更多请求等等。ROUTER套接字根据内部的套接字ID来关注发送消息给正确的地方。
  • 正如代码所呈现那样,现在我们创建了多个处在独立线程里的REQ套接字,并执行了请求。通常你对每个进入的请求使用(with-req-sock)创建新的REQ。
几点人
几点人
翻译于 2013/08/03 15:09
1

实现真正的 RPC

迄今为止,你已经学习到的所有东西就是如何使用ZeroMQ来来回回的发送消息。我们已经看到我们能在一个TCP连接上并发地发送任意数量的请求和应答。我们启动一个客户端,然后发送请求,接着我们启动服务器,然后即使在我们没有代理的情况下,一切运行正常。我们不需要管理连接而且也不需要手动地重复多次的处理了。

然而怎样才能真正的实现RPC呢?迄今为止,我们已经发送了字节并显示了这些字节。我们需要有一种调用多个过程的方法,而且不需要过程调用还可以发送数据。这里我们应该做什么才能发送这些有用的东西呢?

让我们效仿一下HTTP!我喜欢这种方法、路径和内容体的语义,因此让我们保留这些。类似于HTTP,我们总是应答,而且有时候是含有错误的应答-不存在“null"应答。由于ZeroMQ系统字节,所以我们使用SMILE做为数据格式,它是类JSON的格式,使用这种格式它能知道如何把映射、列表、集合和字符串编码为后来可以解码的东西 。这就是所谓的“二进制”格式,因此它可以传送原始字节,这是就不存在任何字符串编码的问题了。

几点人
几点人
翻译于 2013/08/03 15:29
1
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
加载中

评论(3)

qwfys
qwfys
~~
写下带不走的风
写下带不走的风
线程模型是什么?
七七是久
七七是久
这翻译,字都认识,合起来就是看不懂...
返回顶部
顶部