加入收藏 | 设为首页 | 会员中心 | 我要投稿 PHP编程网 - 黄冈站长网 (http://www.0713zz.com/)- 数据应用、建站、人体识别、智能机器人、语音技术!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

php socket服务的模型以及完成多进程IO复用libevent

发布时间:2022-02-24 06:20:02 所属栏目:PHP教程 来源:互联网
导读:端口复用技术,这样就可以很好的解决惊群问题和stream_socket_server性能瓶颈的问题. /** * 多进程IO复用libevent * 同时处理多个连接 * 端口复用---建议php7 */ class Xtgxiso_server { public $socket = false; public $master = array(); public $onCon
  端口复用技术,这样就可以很好的解决惊群问题和stream_socket_server性能瓶颈的问题.
 
  /**
   * 多进程IO复用libevent
   * 同时处理多个连接
   * 端口复用---建议php7
   */
  class Xtgxiso_server
  {
      public $socket = false;
      public $master = array();
      public $onConnect = null;
      public $onMessage = null;
      public $onClose = null;
      public $process_num = 2;
      private $pids = array();
      public $receive = array();
      private  $host='127.0.0.1';
      private $port = 1215;
  
      function __construct($host="0.0.0.0",$port=1215){
          //产生子进程分支
          $pid = pcntl_fork();
          if ($pid == -1) {
              die("could not fork"); //pcntl_fork返回-1标明创建子进程失败
          } else if ($pid) {
              exit(); //父进程中pcntl_fork返回创建的子进程进程号
          } else {
              // 子进程pcntl_fork返回的时0
          }
          // 从当前终端分离
          if (posix_setsid() == -1) {
              die("could not detach from terminal");
          }
          umask(0);
          $this->host = $host;
          $this->port = $port;
      }
  
      private function start_worker_process(){
          $pid = pcntl_fork();
          switch ($pid) {
              case -1:
                  echo "fork error : {$i} /r/n";
                  exit;
              case 0:
                  $context_option['socket']['so_reuseport'] = 1;
                  $context = stream_context_create($context_option);
                  $this->socket = stream_socket_server("tcp://".$this->host.":".$this->port, $errno, $errstr,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context);
                  if (!$this->socket) die($errstr."--".$errno);
                  stream_set_blocking($this->socket,0);
                  $id = (int)$this->socket;
                  $this->master[$id] = $this->socket;
                  $base = event_base_new();
                  $event = event_new();
                  event_set($event, $this->socket, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
                  event_base_set($event, $base);
                  event_add($event);
                  echo   posix_getpid()." start run.../n";
                  event_base_loop($base);
              default:
                  $this->pids[$pid] = $pid;
                  break;
          }
      }
  
      public function run(){
  
          for($i = 1; $i <= $this->process_num; $i++){
              $this->start_worker_process();
          }
  
          while(1){
              foreach ($this->pids as $i => $pid) {
                  if($pid) {
                      $res = pcntl_waitpid($pid, $status,WNOHANG);
  
                      if ( $res == -1 || $res > 0 ){
                          $this->start_worker_process();
                          unset($this->pids[$pid]);
                      }
                  }
              }
              sleep(1);
          }
      }
  
      public function ev_accept($socket, $flag, $base){
          $connection = @stream_socket_accept($socket);
          echo posix_getpid()." -- accepted " . stream_socket_get_name($connection,true) . "/n";
          if ( !$connection ){
              return;
          }
          stream_set_blocking($connection, 0);
          $id = (int)$connection;
          if($this->onConnect) {
              call_user_func($this->onConnect, $connection);
          }
          $buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
          event_buffer_base_set($buffer, $base);
          event_buffer_timeout_set($buffer, 30, 30);
          event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
          event_buffer_priority_set($buffer, 10);
          event_buffer_enable($buffer, EV_READ | EV_PERSIST);
          $this->master[$id] = $connection;
          $this->buffer[$id] = $buffer;
          $this->receive[$id] = '';
      }
  
      function ev_read($buffer, $id)
      {
          while( 1 ) {
              $read = event_buffer_read($buffer, 3);
              if($read === '' || $read === false)
              {
                  break;
              }
              $pos = strpos($read, "/n");
              if($pos === false)
              {
                  $this->receive[$id] .= $read;
                  //echo "received:".$read.";not all package,continue recdiveing/n";
              }else{
                  $this->receive[$id] .= trim(substr ($read,0,$pos+1));
                  $read = substr($read,$pos+1);
                  if($this->onMessage)
                  {
                      call_user_func($this->onMessage,$this->master[$id],$this->receive[$id]);
   }
  $server =  new Xtgxiso_server();
  
  $server->onConnect = function($conn){
      echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "/n";
      fwrite($conn,"conn success/n");
  };
  
  $server->onMessage = function($conn,$msg){
      echo "onMessage --" . $msg . "/n";
      fwrite($conn,"received ".$msg."/n");
  }; //Cuoxin.com
  
  $server->onClose = function($conn){
      echo "onClose --" . stream_socket_get_name($conn,true) . "/n";
  };
  
  $server->run();
  经过多次服务模型的演变,基本我们实现了一个高性能的服务模型!

(编辑:PHP编程网 - 黄冈站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读