php做sockets稳定吗,求代码改良

hphper 发布于 2015/02/06 10:54
阅读 1K+
收藏 4
PHP

下面代码要实现的是,后端向前端页面推送消息,通过websocket,服务端代码 是拿网上代码改的

服务端 server.php


<?php
error_reporting ( E_ALL );
ob_implicit_flush ();

$sk = new Sock ( '127.0.0.1', 8888 );
$sk->run ();
class Sock {
	public $sockets;
	public $users;
	public $master;
	public $notice = true; //是否告知由于 不在线推送失败
	
	public function __construct($address, $port) {
		$this->master = $this->WebSocket ( $address, $port );
		$this->sockets = array ('s' => $this->master );
	}
	
	public function run() {
		while ( true ) {
			$this->reset ();
			$changes = $this->sockets;
			socket_select ( $changes, $write = NULL, $except = NULL, NULL );
			foreach ( $changes as $sock ) {
				if ($sock == $this->master) {
					$client = socket_accept ( $this->master );
					$this->sockets [] = $client;
					$this->users [] = array ('socket' => $client, 'shou' => false );
				} else {
					$len = socket_recv ( $sock, $buffer, 2048, 0 );
					$k = $this->search ( $sock ); //当前socket连接
					if ($len < 7) { //监听到页面  sockets链接关闭时
						$name = $this->users [$k] ['user'];
						$this->close ( $sock );
						$this->send2 ( $name, $k );
						continue;
					}
					if (! $this->users [$k] ['shou']) {
						$this->woshou ( $k, $buffer );
					} else {
						$buffer = $this->uncode ( $buffer );
						//链接页面在刷新时处理
						if (! $buffer) {
							$this->woshou ( $k, $buffer );
							continue;
						}
						$this->send ( $k, $buffer );
					}
				}
			}
		
		}
	
	}
	
	public function close($sock) {
		$k = array_search ( $sock, $this->sockets );
		socket_close ( $sock );
		unset ( $this->sockets [$k] );
		unset ( $this->users [$k] );
		$this->e ( "key:$k close" );
	}
	
	public function search($sock) {
		foreach ( $this->users as $k => $v ) {
			if ($sock == $v ['socket'])
				return $k;
		}
		return false;
	}
	
	public function send($k, $msg) {
		parse_str ( $msg, $g );
		$this->e ( $msg );
		$ar = array ();
		$close = false;
		if ($g ['type'] == 'add') { //展示页面的打开
			$key = 'all';
			$this->checkUser ( $g ['user'] );
			$this->users [$k] ['user'] = $g ['user'];
			$this->users = array_values ( $this->users );
			$ar ['add'] = true;
			$ar ['data'] = $g ['user'] . '   come!';
			$ar ['users'] = $this->getusers ();
		} else if ($g ['type'] == 'push') { //发送完就立刻断掉socket连接
			$touser = $g ['to'];
			$key = $this->getSockKey ( $touser ); //要推送到socket连接索引
			$to = $this->getuser ( $key );
			$ar ['data'] = $g ['data'];
			$close = true;
		}
		$msg = json_encode ( $ar );
		$this->e ( $msg );
		$msg = $this->code ( $msg );
		$this->send1 ( $k, $msg, $key, $close );
	}
	//加入同名的用户连接时把之前的连接信息删除     连接关闭
	public function checkUser($user) {
		$flag = false;
		foreach ( $this->users as $k => $info ) {
			if (array_key_exists ( 'user', $info ) && $info ['user'] == $user) {
				socket_close ( $info ['socket'] );
				unset ( $this->users [$k] );
				$flag = true;
			}
		}
		if ($flag) {
			$sockets ['s'] = $this->sockets ['s'];
			foreach ( $this->users as $uu ) {
				$sockets [] = $uu ['socket'];
			}
			$this->sockets = $sockets;
		}
	}
	
	/**
	 * 通过用户名获取 sock的数组索引
	 */
	public function getSockKey($user) {
		foreach ( $this->users as $key => $info ) {
			if (array_key_exists ( 'user', $info ) && $info ['user'] == $user) {
				return $key;
			}
		}
		return false;
	}
	
	public function send1($k, $str, $key = 'all', $close = true) {
		$keybefore = $key;
		if ($key === false) { //收信者不存在
			$this->resetSocket ( $k, false );
			return false;
		}
		
		if ($key === 'all') {
			foreach ( $this->users as $v ) {
				socket_write ( $v ['socket'], $str, strlen ( $str ) );
			}
		} else {
			if ($k != $key) {
				//socket_write($this->users[$k]['socket'],$str,strlen($str));
				socket_write ( $this->users [$key] ['socket'], $str, strlen ( $str ) );
				
				if ($close) { //关闭一次性的Push连接socket
					$this->resetSocket ( $k );
				}
			}
		}
	}
	/**
	 * 关闭某个已认证的连接资源 并重组 sockets和users连接资源
	 * @param $index int  在sockets里的索引
	 */
	public function resetSocket($index, $exist = true) {
		//由于要推送到的人 未在线 告知推送失败
		if (! $exist && $this->notice) {
			$msg = $this->code ( "push failed" );
			socket_write ( $this->sockets [$index], $msg, strlen ( $msg ) );
		}
		
		socket_close ( $this->sockets [$index] );
		unset ( $this->users [$index] );
		$this->users = array_values ( $this->users );
		$sockets = array ();
		$sockets ['s'] = $this->sockets ['s'];
		foreach ( $this->users as $info ) {
			$sockets [] = $info ['socket'];
		}
		$this->sockets = $sockets;
	}
	//重排  
	public function reset() {
		if ($this->users) {
			$this->users = array_values ( $this->users );
			$sockets ['s'] = $this->sockets ['s'];
			foreach ( $this->users as $info ) {
				$sockets [] = $info ['socket'];
			}
			$this->sockets = $sockets;
		}
	}
	
	public function send2($user, $k) {
		$ar ['remove'] = true;
		$ar ['removekey'] = $k;
		$ar ['nrong'] = $user . '退出';
		$str = $this->code ( json_encode ( $ar ) );
		$this->send1 ( false, $str, 'all' );
	}
	
	public function e($str) {
		$path = dirname ( __FILE__ ) . '/log.txt';
		$str = $str . "\n\r";
		error_log ( $str, 3, $path );
		echo iconv ( 'utf-8', 'gbk//IGNORE', $str );
	}
	public function WebSocket($address, $port) {
		$server = socket_create ( AF_INET, SOCK_STREAM, SOL_TCP );
		socket_set_option ( $server, SOL_SOCKET, SO_REUSEADDR, 1 );
		socket_bind ( $server, $address, $port );
		socket_listen ( $server );
		$this->e ( 'Server Started : ' . date ( 'Y-m-d H:i:s' ) );
		$this->e ( 'Listening on   : ' . $address . ' port ' . $port );
		return $server;
	}
	
	public function woshou($k, $buffer) {
		$buf = substr ( $buffer, strpos ( $buffer, 'Sec-WebSocket-Key:' ) + 18 );
		$key = trim ( substr ( $buf, 0, strpos ( $buf, "\r\n" ) ) );
		$new_key = base64_encode ( sha1 ( $key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true ) );
		$new_message = "HTTP/1.1 101 Switching Protocols\r\n";
		$new_message .= "Upgrade: websocket\r\n";
		$new_message .= "Sec-WebSocket-Version: 13\r\n";
		$new_message .= "Connection: Upgrade\r\n";
		$new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
		
		socket_write ( $this->users [$k] ['socket'], $new_message, strlen ( $new_message ) );
		$this->users [$k] ['shou'] = true;
		return true;
	
	}
	
	public function uncode($str) {
		$buffer = "[buffer]\r\n" . var_export ( $str, true ) . "\r\n";
		$mask = array ();
		$data = '';
		$msg = unpack ( 'H*', $str );
		$head = substr ( $msg [1], 0, 2 );
		if (hexdec ( $head {1} ) === 8) {
			$data = false;
		} else if (hexdec ( $head {1} ) === 1) {
			$mask [] = hexdec ( substr ( $msg [1], 4, 2 ) );
			$mask [] = hexdec ( substr ( $msg [1], 6, 2 ) );
			$mask [] = hexdec ( substr ( $msg [1], 8, 2 ) );
			$mask [] = hexdec ( substr ( $msg [1], 10, 2 ) );
			
			$s = 12;
			$e = strlen ( $msg [1] ) - 2;
			$n = 0;
			for($i = $s; $i <= $e; $i += 2) {
				$data .= chr ( $mask [$n % 4] ^ hexdec ( substr ( $msg [1], $i, 2 ) ) );
				$n ++;
			}
		}
		return $data;
	}
	
	public function code($msg) {
		$msg = preg_replace ( array ('/\r$/', '/\n$/', '/\r\n$/' ), '', $msg );
		$frame = array ();
		$frame [0] = '81';
		$len = strlen ( $msg );
		$frame [1] = $len < 16 ? '0' . dechex ( $len ) : dechex ( $len );
		$frame [2] = $this->ord_hex ( $msg );
		$data = implode ( '', $frame );
		return pack ( "H*", $data );
	}
	
	public function ord_hex($data) {
		$msg = '';
		$l = strlen ( $data );
		for($i = 0; $i < $l; $i ++) {
			$msg .= dechex ( ord ( $data {$i} ) );
		}
		return $msg;
	}
	/**
	 * 获取所有连接在线的用户
	 */
	public function getusers() {
		$ar = array ();
		foreach ( $this->users as $k => $v ) {
			$ar [$k] = $v ['user'];
		}
		return $ar;
	}
	public function getuser($k) {
		if (is_numeric ( $k )) {
			return $this->users [$k] ['user'];
		} else {
			return 'all';
		}
	}
}
?>



前端页面 u0.html,接受推送的页面



<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<script type="text/javascript" src='jquery.min.js'></script>
<title>mywebsockets</title>
</head>
<body>
<script>
function callback(obj){
	if(obj.smallimg){
		$('#smallimg').html(obj.smallimg);
	}
	if(obj.bigimg){
		$('#bigimg').html(obj.bigimg);
	}

}
var WebSock={
 	init:function(url,user,callback){
 		var url=url||'ws://localhost:8888';
 		var sock=new WebSocket(url);
 		sock.onopen=function(){
 			if(sock.readyState==1){
 				sock.send('type=add&user='+user);
 				console.log('socket connect ok!');
 			}else{
 				console.log("socket connect error!");
 			}
 		};
 		sock.onmessage=function(evt){
 			var data=evt.data;
 			var obj=JSON.parse(data);
 			if(obj){
 				if(obj.add){
 					console.log(obj.users.join("\r\n"));
 				}
 				if(obj.data){
 					callback(obj.data);
 				}
 			}
 		};
 		sock.onerror=function(evt){
			console.log("socker error!");
		}
 	}
 };

WebSock.init('ws://localhost:8888','u0',callback);
</script>
<div id='smallimg'>small</div>
<div id='bigimg'>big</div>
</body>
</html>

推送方pusher.php


推送完立马断掉连接

<?php
function push($event){
	$dataStr=http_build_query($event);
	$str=<<<E
	<script type="text/javascript">
	var url="ws://localhost:8888";
	var socket=new WebSocket(url);
	socket.onopen=function(){
		if(socket.readyState==1){
			socket.send("type=push&{$dataStr}");
		}else{
			alert('push failed!');
		}
	}
	socket.onerror=function(){
		alert('connect err!');
	}
	socket.onmessage=function(evt){
		var data=evt.data;
		alert(data);
	}
	
</script>
E;
echo $str;
}

$event=array(
	'to'=>'u0', //推送给某个用户
	'data'=>array('smallimg'=>'au1','bigimg'=>date("H:i:s"))
);
push($event);



像u0.html这样的客户端很多的情况下,服务端那样写稳定吗,u0.html在刷新时 关闭掉以前的链接再新建一个连接,网页很多时 连接和用户都堆积在server里,最多能堆多少啊?


以下是问题补充:

@hphper:求解 为什么我在push里传的数据多点时,server里的uncode方法就转不过来了,求大家给小弟指点一下 (2015/02/06 14:20)
加载中
1
batcom
batcom

php已经有很成熟的解决方案了,高性能的有swoole 和 workerman 都是基于pecl方式的socket实现,支持异步。php层只用关心业务逻辑就可以了

hongmajia
hongmajia
我用的workerman,稳定性没的说,文档写的也不错
hphper
hphper
回复 @eechen : 哦,能帮忙看下我问题补充里的 问题吗
eechen
eechen
回复 @hphper : Swoole不是框架,是一个类似mysqli之类的PECL扩展,安装并开启扩展后直接 $http = new swoole_http_server("0.0.0.0", 9501);就能提供HTTP服务了, 可以看一下Swoole HTTP WebSocket聊天室PHPWebIM: http://webim.swoole.com/
batcom
batcom
回复 @hphper : 个人觉得上面两个方案已经很简单了,和框架没什么关系吧,装好扩展就可以用了,扩展和框架还是有区别的。
hphper
hphper
我就只需要一个很小的功能,用框架啥的是不是大才小用了呢?
0
yunfound
yunfound

不推荐用PHP做Socket服务器,PHP不支持多线程,其他实现方式比较蛋疼。

还有PHP的内存回收机制也比较蛋疼。

eechen
eechen
PHP的pthreads扩展支持多线程编程.不过Swoole扩展的多线程不依赖pthreads,其线程和进程控制都是从头实现.PHP多线程示例代码: http://my.oschina.net/eechen/tweet/4687991
hphper
hphper
回复 @yunfound : 恩,我试试
yunfound
yunfound
回复 @hphper : 哈哈,换一个语言。要么就找一些成熟的实用于PHP的Socket框架吧。
hphper
hphper
那该怎么办呢?之前用pusher.js插件,得连接人家的服务器,还在国外,很慢
0
聽雨人
聽雨人
稳妥点,还是别用PHP做这个吧
0
netstu_
netstu_
用 php 的话,建议使用 swoole。要么就用 socket.io
0
hphper
hphper
求大家指点 我问题补充里的疑问,谢谢
hphper
hphper
回复 @酒逍遥 : 数据短的话,解包没问题,多了的话就有问题了
hphper
hphper
回复 @酒逍遥 : 应该是解包问题吧
hphper
hphper
回复 @酒逍遥 : 这里是websocket向服务端传送数据,传过去后 socket_recv ( $sock, $buffer, 204800, 0 )里读到的buffer确实是二进制的,
酒逍遥
酒逍遥
回复 @hphper : 哦..我以前做都是直接json字符串传递..没用到二进制.二进制的话 那要看看是接收的内容不全还是解包的时候出问题了.
hphper
hphper
回复 @酒逍遥 : 是用websocket推送的,直接读取的$buffer是二进制好像,所以得uncode下
下一页
0
aliang032
aliang032
workerman 和 swoole都很不错哦,不用自己写socket,关注业务就行了
hphper
hphper
现在socket不怎么懂,想了解一点[30]
返回顶部
顶部