前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。
如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:
1:多启动进程,提高并发数
2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数
3:异步编程,避免阻塞,提高并发数
这里我们重点介绍第三种方法,以访问第三方http为例。
代码如下:
- //同步读取
- function get_data_blocking(){
- $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
- fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.raventech.cn/r/nAccept: */*/r/n/r/n");
- $str = "";
- while (!feof($socket)) {
- $str .= fgets($socket, 1024);
- }
- fclose($socket);
- return $str;
- }
- //异步读取
- function get_data_unblocking(){
- $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
- stream_set_blocking($socket, 0);
- fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.raventech.cn/r/nAccept: */*/r/n/r/n");
- $write = NULL;
- $except = NULL;
- while( $socket ){
- $read = array($socket);
- $num_changed_streams = stream_select($read, $write, $except, 0);
- if ( $num_changed_streams > 0 ) {
- foreach($read as $r){
- $str = fread($r,2048);
- fclose($socket);
- $socket = false;
- return $str;
- }
- }
- usleep(100);
- }
- }
- //真正的异步读取--利用server的IO复用事件来提高并发
- class Get_data_event{
- public $onMessage = null;
- private $str='';
- function __construct(&$server){
- $socket = stream_socket_client("tcp://test.xtgxiso.cn:80", $errno, $errstr, 6);
- stream_set_blocking($socket, 0);
- fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.xtgxiso.cn/r/nAccept: */*/r/n/r/n");
- $server->add_socket($socket, array($this, 'read'));
- }
- public function read($socket){
- while (1) {
- $buffer = fread($socket, 1024);
- if ($buffer === '' || $buffer === false) {
- break;
- }
- $this->str .= $buffer;
- }
- if( $this->onMessage && $this->str ) {
- call_user_func($this->onMessage, $this->str);
- }
- $this->str = '';
- return false;
- }
- }
- /**
- * 单进程IO复用select
- */
- class Xtgxiso_server
- {
- public $socket = false;
- public $master = array();
- public $onConnect = null;
- public $onMessage = null;
- public $other_socket_callback = array();
- function __construct($host="0.0.0.0",$port=1215)
- {
- $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
- if (!$this->socket) die($errstr."--".$errno);
- stream_set_blocking($this->socket,0);
- $id = (int)$this->socket;
- $this->master[$id] = $this->socket;
- }
- public function add_socket($socket,$callback){
- $id = (int)$socket;
- $this->master[$id] = $socket;
- $this->other_socket_callback[$id] = $callback;
- }
- public function run(){
- $read = $this->master;
- $receive = array();
- echo "start run.../n";
- while ( 1 ) {
- $read = $this->master;
- //echo "waiting.../n";
- $mod_fd = @stream_select($read, $_w = NULL, $_e = NULL, 60);
- if ($mod_fd === FALSE) {
- break;
- }
- foreach ( $read as $k => $v ) {
- $id = (int)$v;
- if ( $v === $this->socket ) {
- //echo "new conn/n";
- $conn = stream_socket_accept($this->socket);
- if ($this->onConnect) {
- call_user_func($this->onConnect, $conn);
- }
- $id = (int)$conn;
- $this->master[$id] = $conn;
- } else if ( @$this->other_socket_callback[$id] ){
- call_user_func_array($this->other_socket_callback[$id], array($v));
- } else {
- //echo "read data/n";
- if ( !isset($receive[$k]) ){
- $receive[$k]="";
- }
- $buffer = fread($v, 1024);
- //echo $buffer."/n";
- if ( strlen($buffer) === 0 ) {
- if ( $this->onClose ){
- call_user_func($this->onClose,$v);
- }
- fclose($v);
- $id = (int)$v;
- unset($this->master[$id]);
- } else if ( $buffer === FALSE ) {
- if ( $this->onClose ){
- call_user_func($this->onClose, $this->master[$key_to_del]);
- }
- fclose($v);
- $id = (int)$v;
- unset($this->master[$id]);
- } else {
- $pos = strpos($buffer, "/r/n/r/n");
- if ( $pos === false) {
- $receive[$k] .= $buffer;
- //echo "received:".$buffer.";not all package,continue recdiveing/n";
- }else{
- $receive[$k] .= trim(substr ($buffer,0,$pos+4));
- $buffer = substr($buffer,$pos+4);
- if($this->onMessage) {
- call_user_func($this->onMessage,$v,$receive[$k]);
- }
- $receive[$k]='';
- }
- }
- }
- }
- usleep(10000);
- }
- }
- }
- $server = new Xtgxiso_server();
- $server->onConnect = function($conn){
- echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "/n";
- };
- $server->onMessage = function($conn,$msg) use ( $server ) {
- /*
- $respone ="";//响应内容
- $respone = "HTTP/1.1 200 OK/r/n";
- $respone .= "Server: openresty/r/n";
- $respone .= "Content-Type: text/html; charset=utf-8/r/n";
- $body = time().rand(111111,999999);
- $len = strlen($body);
- $respone .= "Content-Length:$len/r/n";
- $respone .= "Connection: close/r/n";
- $respone .= "/r/n$body/r/n/r/n";
- echo "onMessage --" . $msg . "/n";
- */
- //同步读取
- //$respone = get_data_blocking();
- //fwrite($conn,$respone);
- //异步读取
- //$respone = get_data_unblocking();
- //fwrite($conn,$respone);
- //真正异步
- $data = new Get_data_event($server);
- $data->onMessage = function($str) use($conn){
- fwrite($conn,$str);
- };
- };
- $server->onClose = function($conn){
- echo "onClose --" . "/n";
- };
- $server->run();
第三方服务sleep1.php的代码比较简单:
- sleep(1);//模拟耗时
- echo "OK";
通过以上代码示例,我们分别注释运行 同步读取,异步读取,真正异步,来观察server的并发.测试方法可以写个test.html来模拟三个并发.
- <script src="http://127.0.0.1:1215/?id=1"></script>
- <script src="http://127.0.0.1:1215/?id=2"></script>
- <script src="http://127.0.0.1:1215/?id=3"></script>
通过测试发现,真正异步的是并发的,每个请求耗时1秒,这样我们总算明白什么是真正的非阻塞异步编程了,关键就在共用IO复用.
新闻热点
疑难解答