发布一个node.js编写的消息服务器,支持最新的websocket草案

mallon 发布于 2012/01/20 14:02
阅读 5K+
收藏 21

简介

Json Messaging是使用node.js技术构建的发布/订阅类型的消息服务器,具有如下特性:

1、支持TCPWebSocket协议;

2、传输帧使用Json格式;

3、可以使用正则表达式订阅消息目的地,正则表达式中可以包含“捕获”,所有目的地匹配该正则表达式的消息,连同目的地的“捕获”都将发送到订阅方;

4、一个客户端可以订阅多个消息目的地;

5、为了简化设计,服务器端不持久化消息。

项目

https://sourceforge.net/projects/jsonmessaging/

下载

https://sourceforge.net/projects/jsonmessaging/files/

致谢

Json Messaging消息服务器使用了很多第三方的框架和技术,感谢他们辛勤的工作。

Jsonhttp://www.json.org

node.jshttp://nodejs.org

node-uuidhttps://github.com/broofa/node-uuid

WebSocket-Nodehttps://github.com/Worlize/WebSocket-Node

使用方法

1、编译安装node.js

2、打开server/config.js,可以配置TCPWebSocket端口;

3、启动消息服务器:node server/server.js

例子

使用最新版本的Firefox或者Chrome打开下面的HTML文件可以发送和接收消息。

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Json Messaging Example</title>
    <style>
        div#output {
            border: 1px solid #000;
            width: 960px;
            height: 450px;
            overflow: auto;
            background-color: #333;
            color: #6cf;
        }

        strong {
            color: #f66;
        }
        
        input#input {
            border: 1px solid #000;
            width: 640px;
        }

        button {
            border: 1px solid #000;
            width: 100px;
        }
    </style>
    <script>
        // connect to the Json Messaging server and return an 'connection' object
        function connect(host, port, messageListener, errorListener) {
            window.WebSocket = window.WebSocket || window.MozWebSocket;

            if (!window.WebSocket) {
                alert('Your browser does not support WebSocket.');
                return null;
            }

            var connection = new WebSocket('ws://' + host + ':' + port);

            connection.onmessage = function(message) {
                try {
                    var parsed = JSON.parse(message.data);
                    switch (parsed.type) {
                        case 'message':
                            if (messageListener) {
                                messageListener(parsed.content, parsed.match);
                            }
                            break;
                        case 'error':
                            if (errorListener) {
                                errorListener(parsed.content);
                            }
                            break;
                        default:
                            throw new Error('Unknown message type ' + parsed.type);
                            break;
                    }
                } catch (e) {
                    console.warn(e);
                    alert(e);
                }
            };

            connection.publish = function(content, destination) {
                connection.send(JSON.stringify({
                    type: 'publish',
                    destination: destination,
                    content: content
                }));
            };

            connection.subscribe = function(destination) {
                connection.send(JSON.stringify({
                    type: 'subscribe',
                    destination: destination
                }));
            };

            connection.unsubscribe = function(destination) {
                connection.send(JSON.stringify({
                    type: 'unsubscribe',
                    destination: destination
                }));
            };

            return connection;
        }

        // the 'connection' object
        var connection = null;

        var output = null;

        var input = null;

        // initialize
        window.onload = function() {
            output = document.getElementById('output');
            input = document.getElementById('input');

            // connect to the local server
            connection = connect(
                    'localhost',
                    8155,
                    // message handler
                    function(content, match) {
                        output.innerHTML += ('<strong>Message: </strong>' + content + '<br>\n');
                    },
                    // error handler
                    function(content) {
                        output.innerHTML += ('<strong>Error: </strong>' + content + '<br>\n');
                    }
            );

            // subscribe a topic
            connection.onopen = function() {
                connection.subscribe('test');
            };
        };

        function _send() {
            connection.publish(input.value, 'test');
        }

        function _clear() {
            output.innerHTML = '';
        }
    </script>
</head>
<body>
<div id="output"></div>
<input type="text" id="input">
<button id="send" onclick="_send()">Send</button>
<button id="clear" onclick="_clear()">Clear</button>
</body>
</html>

下面的C程序发送三条HelloWorld消息,第一条是英文,第二条是中文,第三条是Unicode转义的中文。注意源代码必须以UTF-8编码保存。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

int main(int argc, char ** argv)
{
    int fd;
    struct sockaddr_in addr;
    int ret;
    const char publish_frame_1[] =
    "{\"type\":\"publish\",\"destination\":\"test\",\"content\":\"Hello World\"}";
    const char publish_frame_2[] =
    "{\"type\":\"publish\",\"destination\":\"test\",\"content\":\"你好世界\"}";
    const char publish_frame_3[] =
    "{\"type\":\"publish\",\"destination\":\"test\",\"content\":\"\\u4f60\\u597d\\u4e16\\u754c\"}";
    
    fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    addr.sin_port = htons(8153);
    
    ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
    printf("%d\n", ret);
    
    ret = write(fd, publish_frame_1, sizeof(publish_frame_1));
    printf("%d\n", ret);
    
    ret = write(fd, publish_frame_2, sizeof(publish_frame_2));
    printf("%d\n", ret);
    
    ret = write(fd, publish_frame_3, sizeof(publish_frame_3));
    printf("%d\n", ret);
}

帧格式

消息服务器的应用层数据帧采用Json格式,使用UTF-8编码的纯文本,在TCP协议中,使用'\0'作为帧间分隔,在WebSocket协议中遵循WebSocket草案标准。

帧格式有5类,其中,客户端到服务器端的3类,服务器端到客户端的2类。

客户端到服务器端

发布帧

客户端发送一条消息到服务器的目的地中,所有连接到服务器并且订阅了该目的地(正则表达式匹配)的客户端都能接收到该消息。

帧格式为:

{

"type": "publish",

"destination": <消息目的地>,

"content": <消息内容>

}

其中,消息目的地为字符串类型;消息内容同样也必须是Json格式的。

订阅帧

客户端订阅服务器的一个目的地,所有匹配该目的地的消息都会发送到该客户端。

帧格式为:

{

"type": "subscribe",

"destination": <消息目的地>

}

其中,消息目的地可以是正则表达式,且正则表达式中可以含有“捕获”,服务器会使用该正则表达式匹配发送的消息目的地,如果符合,则会把该消息连同匹配结果一并发给客户端,在下面的“消息帧”介绍中有具体的例子。

同一个客户端可以订阅多个目的地。

取消订阅帧

客户端取消订阅服务器的一个目的地。

帧格式为:

{

"type": "subscribe",

"destination": <消息目的地>

}

其中,消息目的地等于订阅帧中的消息目的地。

当客户端断开连接后,服务器端会自动取消该客户端的所有订阅。

服务器端到客户端

消息帧

一旦消息目的地匹配,服务器端会把匹配结果连同消息内容发给客户端。

帧格式为:

{

"type": "message",

"match": <匹配结果>,

"content": <消息内容>

}

匹配结果为一个数组,至少包含一个元素,即订阅的消息目的地;如果订阅的消息目的地是正则表达式且其中含有“捕获”,那么从第二往后的元素为捕获结果,参考JavaScript正则表达式规范。

举例:

假设设备的网口状态信息在消息服务中发布,规定目的地格式为:“/devices/<设备名>/<网口名>”;消息内容为:“down”表示停止、“up”表示启动。

下面两个发布帧,表示设备a的第1个网口停止了,而设备b的第个网口启动了:

{"type":"publish","destination":"/devices/a/if1","content":"down"}

{"type":"publish","destination":"/devices/b/if0","content":"up"}

如果客户端订阅目的地为“/devices/.*”,那么它将能收到所有设备的所有网口的状态消息,接收到的消息帧如下:

{"type":"message","match":["/devices/a/if1"],"content":"down"}

{"type":"message","match":["/devices/b/if0"],"content":"up"}

如果想在程序中更方便地对设备和网口做分类处理,可以把订阅目的地改为“/devices/(.*)/(.*)”,其中小括号即为“捕获”。

接收到的消息帧会变为:

{"type":"message","match":["/devices/a/if1","a","if1"],"content":"down"}

{"type":"message","match":["/devices/b/if0","b","if0"],"content":"up"}

可以看到,match中增加了捕获的结果。

错误帧

如果服务器端产生错误,例如客户端发送的帧超长、非Json格式等,将会向客户端返回错误帧。

帧格式为:

{

"type": "error",

"content": <错误内容>

}

客户端可以对错误进行相应的处理。

源代码结构

server.js

程序入口。

config.js

全局配置,其中的udpPort并没有使用,因为UDP难以知晓客户端状态,所以不打算实现UDP协议。

log.js

控制台日志,相比其它第三方的日志模块的特点是使用简单,而且能够输出日志产生的源代码的位置,便于调试。

protocol.js

协议帧的包装。

exchange.js

负责处理发布和订阅的消息,是服务器代码的核心部分。

tcp.js

TCP协议的实现。

ws.js

WebSocket协议的实现。

加载中
0
mallon
mallon

截图:

0
c
codehive
这个必须顶啊,我还在自己搞呢,等下看看如何,我了个去,必须顶下
0
c
codehive
怎么没有源码啊 @mallon 不是开源嘛
0
c
codehive
@mallon 提供源码参考下吧,最近我也在做类似的东西,如果不错,我也参与开发,不如搞到github上吧
0
mallon
mallon
sourceforge上有源代码啊
mallon
mallon
@codehive : 应该可以了,上传的文件需要设置适合的操作系统类型,NND真复杂。
c
codehive
再检查一边吧,只有提示文件大小,好像是没有可下载的,我是未登录用户
0
mallon
mallon

引用来自“codehive”的答案

@mallon 提供源码参考下吧,最近我也在做类似的东西,如果不错,我也参与开发,不如搞到github上吧
github和git不会用,哈哈
0
mallon
mallon

花了一上午学习git github markdown,终于传上去了:

https://github.com/shajunxing/Json_Messaging

0
Injection
Injection

引用来自“mallon”的答案

花了一上午学习git github markdown,终于传上去了:

https://github.com/shajunxing/Json_Messaging

多谢,正在学习html5里面的websocket,但是好多http server不支持.这下好了.

不过能提供完整的http+websocket就好了.

0
mallon
mallon

引用来自“yinhao”的答案

引用来自“mallon”的答案

花了一上午学习git github markdown,终于传上去了:

https://github.com/shajunxing/Json_Messaging

多谢,正在学习html5里面的websocket,但是好多http server不支持.这下好了.

不过能提供完整的http+websocket就好了.

websocket可以和web服务器分开独立架设的
返回顶部
顶部