开篇
基于我上篇关于学习的规划,最近给自己的学习计划就是 Swoole 。然后做一些有趣的 demo。毕竟平常开发除了用用第三方队列,其他时间都是在写同步阻塞的代码。这篇文章基于 Websocket 协议,利用了 Swoole 中的 异步任务 Task 以及 毫秒精度的 Timer 定时器。这些知识点我会在后面的学习中指出。至于 Swoole 的一些介绍,例如启动时开启了哪些进程,线程,他们各自的职责,它的运行流程,我想没有比官方介绍还官方的吧。这篇文章就带大家了解一下Swoole应用场景中如何基于Websocket实现弹幕。
谈谈 Websocket
对于 Websocket 协议,这位老哥 Dennis_Ritchie 这篇文章就写的很好,老司机带你用 PHP 实现 Websocket 协议,文章不能只看,要动手实验,不然你哪知道别人是不是在坑你哈哈哈,其实学习是从模仿开始的,就像社区的课程,不得不说,质量很高,我今年入门 Laravel 的时候就是买的课程,本质上不也是一个模仿学习的过程吗,最后自己再进行一些扩展,简直完美。扯远了...... , 另外为了方便下文,对于 Websocket 我统称 ws。
ws 是 H5 规范中的一个部分,为 web 应用程序客户端和服务器端提供全双工的通信方式,它也是新的一种应用层协议。通常它的表现为: ws://echo.websocket.org/?encoding=text HTTP/1.1 可以看出除了最前面的协议名和 HTTP 不同,其他看起来就是一个 url 地址。
ws 和 http 之间的联系。
ws 的目的是取代 HTTP 在双向通信场景下的使用。首先他们都位于 OSI 模型中的最高层:应用层。ws 借助 HTTP 完成连接,客户端的握手消息就是一个「普通的,带有 Upgrade 头的,HTTP Request 消息」。对于 HTTP 来说,就是一个你问我答的模式: Request/Response, 就算实现了 HTTP 的长连接,它的底层依旧是你问我答。只是保持住了长连接的一条线,是阻塞的 I/O,但是 ws 就不一样了,握手成功之后就是全双工的 TCP 通道,服务器端可以主动的发送消息给客户端。在 ws 出来之前,很多都是使用的轮训来实现实时交互一些业务场景。
Websocket 和 HTTP 相同点和不同点
相同点
都是基于应用层的协议
都使用 Request/Response 模式建立连接
在连接建立过程中,对错误的处理方式相同
都可以在网络中传输数据
不同点
ws 使用 HTTP 建立连接,但是定义了新的一系列头域,这些域在 HTTP 中不会使用
ws 的连接不能通过中间人来进行转发,必须是一个点对点的连接
ws 连接之后,通信的双方都可以随时给另一方发送数据
ws 连接之后,数据的传输是通过桢的形式发送的,不再需要 Request
弹幕
直播不用我说了吧。都懂的吧,在看直播的时候,你可以看到屏幕前各种... 好吧,暂且用一个不堪入目来形容评论 emmm。用了 ws 那就简单了。客户端发送弹幕,转交给 ws 服务器,ws 服务器做了一些处理,然后再广播给所有在这个直播间的人。也就实现了,边看直播边看评论的这一幕。当然,市面上一些直播应用,基本上在安全方面做了很多功夫,毕竟一个用于生产上的 im,背后一点都不简单。插一句,如果像是聊天室这种对应可能对应到指定的人,可以用 redis 做一些 uid 和 fd 的双向绑定即可。
效果图
既然开始了,还是先看看效果图吧,gif 文件可能有点大,加载会慢一点。然后再具体说一下实现过程。
发送弹幕 (批次)
服务端代码
<?php class WebsocketServer { protected $server; public function __construct() { $this->server = new \Swoole\WebSocket\Server('swoolefor.test', 9508); $this->server->set([ 'worker_num' => 4, 'task_worker_num' => 3, 'max_request' => 5, ]); $this->server->on('open', [$this, 'onOpen']); $this->server->on('message', [$this, 'onMessage']); $this->server->on('task', [$this, 'onTask']); $this->server->on('finish', [$this, 'onFinish']); $this->server->on('close', [$this, 'onClose']); $this->server->start(); } public function onOpen($server, $reuqest) { $this->server->task(['message' => "路人: " . $reuqest->fd . '上线了', 'type' => 1]); } public function onMessage($server, $frame) { $info = json_decode($frame->data); if ($info->type == 3) { $task_id = \Swoole\Timer::tick(200, function () use ($info, $frame) { return $this->server->task(['message' => $frame->fd . ' 说' . $info->message]); }); } else { $task_id = $this->server->task(['message' => $frame->fd . ' 说' . $info->message]); } echo "任务id:{$task_id}投递成功!" . PHP_EOL; } public function onTask($server, $task_id, $from_id, $data) { foreach ($this->server->connections as $fd) { if ($this->server->isEstablished($fd)) { $this->server->push($fd, json_encode(['message' => $data['message']])); } } $this->server->finish($data); } public function onFinish($server, $task_id, $data) { echo '任务: ' . $task_id . ' 执行完毕' . PHP_EOL; } public function onClose($server, $fd) { $this->server->task(['message' => '路人: ' . $fd . ' 下线了' . PHP_EOL, 'type' => 4]); } } $demo = new WebsocketServer();
其实就是一个很简单的 demo。先看构造函数这一块:
$this->server = new \Swoole\WebSocket\Server('swoolefor.test', 9508); $this->server->set([ 'worker_num' => 4, 'task_worker_num' => 3, 'max_request' => 5, ]); $this->server->on('open', [$this, 'onOpen']); $this->server->on('message', [$this, 'onMessage']); $this->server->on('task', [$this, 'onTask']); $this->server->on('finish', [$this, 'onFinish']); $this->server->on('close', [$this, 'onClose']); $this->server->start();
首先实例化一个 ws 服务,Swoole 内置了对 ws 这个服务的支持。通过几行代码就能写出一个异步非阻塞的多进程 ws 服务。简单的介绍一下其他的,一开始的 open 事件名,它的回调是 onOpen,当 ws 客户端与服务器建立连接并完成握手后会回调此函数 。WebSocket\Server 继承自 Http\Server,所以 WebSocket\Server 也可以同时作为 HTTP 服务器。另外,使用了 WebSocket\Server 服务器,那么 onMessge 回调是必须的。即上面的 $this->server->on('message', [$this, 'onMessage'])。
我们先来看看握手这一块。
握手成功,最终服务端的响应吗是 101, 这里我主要说下在握手过程中起作用的几个 header 域:
Upgrade:upgrade 是 HTTP1.1 中用于定义转换协议的 header 域。它表示,如果服务器支持的话,客户端希望使用 > > 现有的「网络层」已经建立好的这个「连接(此处是 TCP 连接)」,切换到另外一个「应用层」(此处是 WebSocket)协议。
Connection:HTTP1.1 中规定 Upgrade 只能应用在「直接连接」中,所以带有 Upgrade 头的 HTTP1.1 消息必须含有 Connection 头,因为 Connection 头的意义就是,任何接收到此消息的人(往往是代理服务器)都要在转发此消息之前处理掉 Connection 中指定的域(不转发 Upgrade 域)。如果客户端和服务器之间是通过代理连接的,那么在发送这个握手消息之前首先要发送 CONNECT 消息来建立直接连接。
Sec-WebSocket-*:Sec-WebSocket-Version 告诉服务器所使用的 WebSocket Draft(版本协议) ,Sec-WebSocket-Key 用来发送给服务器使用(服务器会使用此字段组装成另一个 key 值放在握手返回信息里发送客户端。
Origin:作安全使用,防止跨站攻击,浏览器一般会使用这个来标识原始域。
至于底下的两个事件,其实就是我们的异步任务,这些以及上面的设置参数,我会在后续的文章中说明。所以这里的整个流程就是,不管是连接成功 onOpen,还是发送消息 onMessage, 再或者是关闭连接 onClose,我们的向所有连接的用户推送消息。$this->server->connections 就是遍历所有 ws 连接的用户,至于下面的 isEstablished 就是进一步判断是否是正确的 ws 连接,否则可能会推送失败。至于推送的操作,push($fd,$data) ,第一个参数就是客户端的 fd, 如果此连接并非 ws 客户端,那么推送将失败。第二个参数就是推送的内容,格式化了数据。第三个参数可以指名发送内容的格式,默认是文本,如果想发送二进制的格式,可以使用 WEBSOCKET_OPCODE_BINARY。然后你可以看到,我们的批量弹幕的实现:
public function onMessage($server, $frame) { $info = json_decode($frame->data); $task_id = $this->server->task(['message' => $frame->fd . ' 说' . $info->message, 'type' => $info->type]); echo "任务id:{$task_id}投递成功!" . PHP_EOL; } public function onTask($server, $task_id, $from_id, $data) { if ($data['type'] == 3) { \Swoole\Timer::tick(1000, function () use ($data) { $this->sendAll($data); }); } else { $this->sendAll($data); } }
首先,我是以客户端提交数据的 type 值来确定消息类型的,当 type=3 的时候,就是批量弹幕,那么这里我使用了 Swoole 中提供的牛逼的毫秒精度定时器,所以上面的意思就是当收到客户端的消息,每 0.2 秒投放一个队列任务,把消息广播给所有的人。至于 Timer,我也会在后续文章中以例题 + 思考题的形式介绍。如需了解更多相关资讯内容,请前往并持续关注编程学习网。
扫码二维码 获取免费视频学习资料
- 本文固定链接: http://phpxs.com/post/7701/
- 转载请注明:转载必须在正文中标注并保留原文链接
- 扫码: 扫上方二维码获取免费视频资料