基于Comet的技术的Tornado 长连接实现

基于Comet的技术主要分为流(streaming)方式和长轮询(long-polling)方式。

客户端发起的请求是长连的。即用户发起请求后就挂起,等待服务器返回数据,在此期间不会断开连接。流方式和长轮询方式的区别就是:对于流方式,客户端发起连接就不会断开连接,而是由服务器端进行控制。当服务器端有更新时,刷新数据,客户端进行更新;而对于长轮询,当服务器端有更新返回,客户端先断开连接,进行处理,然后重新发起连接。

为什么需要流(streaming)和长轮询(long-polling)两种方式呢?

是因为:对于流方式,有诸多限制。如果使用AJAX方式,需要判断XMLHttpRequest 的 readystate,即readystate==3时(数据仍在传输),客户端可以读取数据,而不用关闭连接。问题也在这里,IE 在 readystate 为 3 时,不能读取服务器返回的数据,所以目前 IE 不支持基于 Streaming AJAX,而长轮询由于是普通的AJAX请求,所以没有浏览器兼容问题。另外,由于使用streaming方式,控制权在服务器端,并且在长连接期间,并没有客户端到服务器端的数据,所以不能根据客户端的数据进行即时的适应(比如检查cookie等等),而对于long polling方式,在每次断开连接之后可以进行判断。所以综合来说,long polling是现在比较主流的做法(如fb,Plurk)。

流方式首先一种常用的做法是使用AJAX的流方式(如先前所说,此方法主要判断readystate==3时的情况,所以不能适用于IE)。

服务器端代码像这样:

class StreamingHandler(tornado.web.RequestHandler):
    '''使用asynchronus装饰器使得post方法变成无阻塞'''
    @tornado.web.asynchronous
    def post(self):
        self.get_data(callback=self.on_finish)
             
    def get_data(self, callback):
        if self.request.connection.stream.closed():
            return
             
        num = random.randint(1, 100) #生成随机数
        callback(num) #调用回调函数
             
    def on_finish(self, data):
        self.write("Server says: %d" % data)
        self.flush()
         
        tornado.ioloop.IOLoop.instance().add_timeout(
            time.time()+3, 
            lambda: self.get_data(callback=self.on_finish)
        )

对于服务器端,仍然是生成随机数字,由于要不断输出数据,于是在回调函数里延迟3秒,然后继续调用get_data方法。在这里要注意的是,不能使用time.sleep(),由于tornado是单线程的,使用sleep方法会block主线程。因此要调用IOLoop的add_timeout方法(参数0:执行时间戳,参数1:回调函数)。于是服务器端会生成一个随机数字,延迟3秒再生成随机数字,循环往复。

于是前端js就是:

try {
    var request = new XMLHttpRequest(); 
} catch (e) {
    alert("Browser doesn't support window.XMLHttpRequest");
}
                         
var pos = 0;
request.onreadystatechange = function () {
    if (request.readyState === 3) { //在 Interactive 模式处理
        var data = request.responseText; 
        $("p").append(data.substring(pos)+"<br>");
        pos = data.length;
    }
};
request.open("POST", "/streaming", true);
request.send(null);

对于tornado来说,调用flush方法,会将先前write的所有数据都发送客户端,也就是response的数据处于累加的状态,所以在js脚本里,我们使用了pos变量作为cursor来存放每次flush数据结束位置。

另外一种常用方法是使用IFrame的streaming方式,这也是早先的常用做法。首先我们在页面里放置一个iframe,它的src设置为一个长连接的请求地址。Server端的代码基本一致,只是输出的格式改为HTML,用来输出一行行的Inline Javascript。由于输出就得到执行,因此就少了存储游标(pos)的过程。服务器端代码像这样:

lass IframeStreamingHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.get_data(callback=self.on_finish)
             
    def get_data(self, callback):
        if self.request.connection.stream.closed():
            return
             
        num = random.randint(1, 100)
        callback(num)
             
    def on_finish(self, data):
        self.write("<script>parent.add_content('Server says: %d<br />');</script>"  % data)
        # 输出的立刻执行,调用父窗口js函数add_content
        self.flush()
         
        tornado.ioloop.IOLoop.instance().add_timeout(
            time.time()+3, 
            lambda: self.get_data(callback=self.on_finish)
        )

在客户端我们只需定义add_content函数:

var add_content = function(str){
    $("p").append(str);
};

由此可以看出,采用IFrame的streaming方式解决了浏览器兼容问题。但是由于传统的Web服务器每次连接都会占用一个连接线程,这样随着增加的客户端长连接到服务器时,线程池里的线程最终也就会用光。因此,Comet长连接只有对于非阻塞异步Web服务器才会产生作用。这也是为什么选择tornado的原因。

使用iframe方式一个问题就是浏览器会一直处于加载状态。

长轮询是现在最为常用的方式,和流方式的区别就是服务器端在接到请求后挂起,有更新时返回连接即断掉,然后客户端再发起新的连接。于是Server端代码就简单好多,和上面的任务类似:

class LongPollingHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        self.get_data(callback=self.on_finish)
             
    def get_data(self, callback):
        if self.request.connection.stream.closed():
            return
             
        num = random.randint(1, 100)
        tornado.ioloop.IOLoop.instance().add_timeout(
            time.time()+3,
            lambda: callback(num)
        ) # 间隔3秒调用回调函数
             
    def on_finish(self, data):
        self.write("Server says: %d" % data)
        self.finish() # 使用finish方法断开连接

Browser方面,我们封装成一个updater对象:

var updater = {
    poll: function(){
        $.ajax({url: "/longpolling", 
                type: "POST", 
                dataType: "text",
                success: updater.onSuccess,
                error: updater.onError});
    },
    onSuccess: function(data, dataStatus){
        try{
            $("p").append(data+"<br>");
        }
        catch(e){
            updater.onError();
            return;
        }
        interval = window.setTimeout(updater.poll, 0);
    },
    onError: function(){
        console.log("Poll error;");
    }
};

要启动长轮询只要调用

updater.poll();

长轮询与普通的轮询相比更有效率(只有数据更新时才返回数据),减少不必要的带宽的浪费;同时,长轮询又改进了streaming方式对于browser端判断并更新不足的问题。

WebSocket:未来方向

以上不管是Comet的何种方式,其实都只是单向通信,直到WebSocket的出现,才是B/S之间真正的全双工通信。不过目前WebSocket协议仍在开发中,目前Chrome和Safri浏览器默认支持WebSocket,而FF4和Opera出于安全考虑,默认关闭了WebSocket,IE则不支持(包括9),目前WebSocket协议最新的为“76号草案”。有兴趣可以关注http://dev.w3.org/html5/websockets/%E3%80%82

在每次WebSocket发起后,B/S端进行握手,然后就可以实现通信,和socket通信原理是一样的。目前,tornado2.0版本也是实现了websocket的“76号草案”。详细可以参阅文档。我们还是只是在通信打开之后发送一堆随机数字,仅演示之用。

import tornado.websocket
 
class WebSocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        for i in xrange(10):
            num = random.randint(1, 100)
            self.write_message(str(num))
         
    def on_message(self, message):
        logging.info("getting message %s", message)
        self.write_message("You say:" + message)

客户端代码也很简单和直观:

var wsUpdater = {
    socket: null,
    start: function(){
        if ("WebSocket" in window) {
            wsUpdater.socket = new WebSocket("ws://localhost:8889/websocket");
        } 
        else {
            wsUpdater.socket = new MozWebSocket("ws://localhost:8889/websocket");
        }
        wsUpdater.socket.onmessage = function(event) {
            $("p").append(event.data+"<br>");
        };
    }
};
wsUpdater.start();
Relative Articles