在前面的章节2.3介绍了协程的原理及PHP用户空间如何实现协程,本节将重点介绍MSF框架的协程如何使用,同时会剖析PHP工程级协程调度器的实现及调度算法。
至于为什么使用协程,可能大家看法还不统一,这里举例来说明,我们实现一个接口,此接口内包含:
2次http请求其他接口A、B,
1次Redis请求C
他们之间的依赖关系为:((A && B) || C)
<?php
/**
* 协程示例控制器
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use PG\MSF\Client\Http\Client;
class CoroutineTest extends Controller
{
/**
* 异步回调的方式实现(A && B) || C
*/
public function actionCallBackMode()
{
$client = new \swoole_redis;
$client->connect('127.0.0.1', 6379, function (\swoole_redis $client, $result) {
$client->get('apiCacheForABCallBack', function (\swoole_redis $client, $result) {
if (!$result) {
swoole_async_dns_lookup("www.baidu.com", function($host, $ip) use ($client) {
$cli = new \swoole_http_client($ip, 443, true);
$cli->setHeaders([
'Host' => $host,
]);
$apiA = "";
$cli->get('/', function ($cli) use ($client, $apiA) {
$apiA = $cli->body;
swoole_async_dns_lookup("www.qiniu.com", function($host, $ip) use ($client, $apiA) {
$cli = new \swoole_http_client($ip, 443, true);
$cli->setHeaders([
'Host' => $host,
]);
$apiB = "";
$cli->get('/', function ($cli) use ($client, $apiA, $apiB) {
$apiB = $cli->body;
if ($apiA && $apiB) {
$client->set('apiCacheForABCallBack', $apiA . $apiB, function (\swoole_redis $client, $result) {});
$this->outputJson($apiA . $apiB);
} else {
$this->outputJson('', 'error');
}
});
});
});
});
} else {
$this->outputJson($result);
}
});
});
}
/**
* 协程的方式实现(A && B) || C
*/
public function actionCoroutineMode()
{
// 从Redis获取get apiCacheForABCoroutine
$response = yield $this->getRedisPool('tw')->get('apiCacheForABCoroutine');
if (!$response) {
// 从远程拉取数据
$request = [
'https://www.baidu.com/',
'https://www.qiniu.com/',
];
/**
* @var Client $client
*/
$client = $this->getObject(Client::class);
$results = yield $client->goConcurrent($request);
// 写入redis
$this->getRedisPool('tw')->set('apiCacheForABCoroutine', $results[0]['body'] . $results[1]['body'])->break();
$response = $results[0]['body'] . $results[1]['body'];
}
// 响应结果
$this->outputJson($response);
}
}
示例代码:
https://github.com/pinguo/php-msf-demo/app/Controllers/CoroutineTest.php
http://127.0.0.1:8000/CoroutineTest/CoroutineMode
http://127.0.0.1:8000/CoroutineTest/CallBackMode
- Swoole实现了异步非阻塞的IO模型它是高性能的基础,但是书写逻辑代码非常复杂,需要多层嵌套回调,阅读和维护困难
- 基于Yield的协程可以用同步的代码编写方式,达到异步IO的效果和性能,避免了传统异步回调所带来多层回调而导致代码无法维护
<?php
/**
* 协程示例控制器
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use PG\MSF\Client\Http\Client;
class CoroutineTest extends Controller
{
// 略...
// 姿势一
public function actionNested()
{
$result = [];
/**
* @var Client $client1
*/
$client1 = $this->getObject(Client::class, ['http://www.baidu.com/']);
yield $client1->goDnsLookup();
/**
* @var Client $client2
*/
$client2 = $this->getObject(Client::class, ['http://www.qq.com/']);
yield $client2->goDnsLookup();
$result[] = yield $client1->goGet('/');
$result[] = yield $client2->goGet('/');
$this->outputJson([strlen($result[0]['body']), strlen($result[1]['body'])]);
}
// 姿势二
public function actionUnNested()
{
$result = [];
/**
* @var Client $client1
*/
$client1 = $this->getObject(Client::class, ['http://www.baidu.com/']);
$dns[] = $client1->goDnsLookup();
/**
* @var Client $client2
*/
$client2 = $this->getObject(Client::class, ['http://www.qq.com/']);
$dns[] = $client2->goDnsLookup();
yield $dns[0];
yield $dns[1];
$req[] = $client1->goGet('/');
$req[] = $client2->goGet('/');
$result[] = yield $req[0];
$result[] = yield $req[1];
$this->outputJson([strlen($result[0]['body']), strlen($result[1]['body'])]);
}
}
两种姿势看上去都使用了协程的yield关键字;姿势一由于协程在调度时,第一个yield没有接收数据时,程序控制流就不会往下继续执行,从而退化为串行请求第三方接口;姿势二由于DNS查询是异步的,就同时进行多个DNS查询,通过yield关键获取协程执行的结果,再同时异步请求多个接口,最后通过yield关键字获取接口响应结果。
通过两种姿势的对比,使用php-msf协程yield关键字,很好的解决了异步IO回调的写法,让程序看上去是同步执行的,yield起来了接收数据的作用,这也是前面所说的yield具有双向通信的最要特性。
姿势一协程调度过程
send http dns1 lookup->rev http dns1 lookup->send http dns2 lookup->rev http dns2 lookup->send get1->rev get1->send get2-> rev get2
姿势二协程调度过程
send http dns1 lookup->send http dns2 lookup->rev http dns1 lookup->rev http dns2 lookup->send get1->send get2->rev get1->rev get2
需要特别注意的是: 如果某个异步IO操作不需要获取返回值,如设置缓存set key val,框架允许不加yield关键字,需要调用break()方法,这可以大大的提升接口的并发能力
通过上述的示例代码,我们不难得出MSF协程的使用方式,通常情况下,我们使用异步客户端发送请求,使用yield关键字获取协程任务的运行结果。
<?php
function Http()
{
// 获取Http客户端
$client = $this->getObject(\PG\MSF\Client\Http\Client::class);
// 单个接口GET请求(自动完成DNS查询->Http请求发送->获取结果)
$res = yield $client->goSingleGet('http://www.baidu.com/');
$client = $this->getObject(\PG\MSF\Client\Http\Client::class);
// 单个接口POST请求(自动完成DNS查询->Http请求发送->获取结果)
$res = yield $client->goSinglePost('http://www.baidu.com/');
// 多个接口同时请求(自动完成DNS查询->Http请求发送->获取结果)
$client = $this->getObject(\PG\MSF\Client\Http\Client::class);
$res = yield $client->goConcurrent(['http://www.baidu.com/', 'http://www.qq.com']);
$client = $this->getObject(\PG\MSF\Client\Http\Client::class);
// 手工DNS查询
yield $client->goDnsLookup('http://www.baidu.com/');
// 手工发送HTTP请求
$res = yield $client->goGet('/');
}
<?php
function Task() {
$idAllloc = $this->getObject(Idallloc::class);
$newId = yield $idAllloc->getNextId();
}
<?php
function RedisCoroutine() {
$sendRedisGet = $this->getRedisPool('tw')->get('apiCacheForABCoroutine');
$cache = yield $sendRedisGet;
}
MSF协程基于Generator/Yield,IO事件触发,是自主研发的调度器,它的核心思想是发生任何异步IO操作之后,程序的控制流就切换到其他请求,待异步IO可读之后,由程序自行调用调度器接口,进行一次调度,并响应请求。MSF协程完全抛弃了定时器每隔一定时间轮询任务的方案,使得调度器的性能更加接近原生的异步回调方式。
- Generator/Yield
- SplStack
- taskMap
- 协程独立堆栈
- 支持嵌套
- 全调用链路异常捕获
- 调度统计
- 多入口调度