Skip to content

Commit ef41a68

Browse files
committed
Event loop improvements
1 parent 248b4ef commit ef41a68

File tree

5 files changed

+104
-209
lines changed

5 files changed

+104
-209
lines changed

http_api.php

Lines changed: 7 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -11,125 +11,20 @@
1111
require_once __DIR__ . '/src/WordPress/AsyncHttp/InflateStreamWrapperData.php';
1212

1313
$requests = [
14-
// new Request("https://playground.internal"),
15-
// (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.1'),
16-
// (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.0'),
17-
(new Request("http://127.0.0.1:3000/")) //->set_http_version('1.0'),
14+
new Request("https://playground.internal"),
15+
(new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.1'),
16+
(new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.0'),
17+
(new Request("http://127.0.0.1:3000/"))->set_http_version('1.0'),
1818
];
19-
// list($streams, $headers, $errors) = streams_send_http_requests($requests);
20-
// print_r($streams);
21-
// print_r($errors);
2219

2320
// var_dump(streams_http_response_read_bytes($streams, 1024));
2421
// Enqueuing another request here is instant and won't start the download yet.
2522
$client = new Client();
26-
$queue = $client->enqueue( $requests );
27-
var_dump($client->read_bytes($requests[0], 10, Client::READ_POLL_ANY));
28-
var_dump($client->read_bytes($requests[0], 1024, Client::READ_NON_BLOCKING));
29-
var_dump($client->read_bytes($requests[0], 1024, Client::READ_POLL_ANY));
30-
// var_dump($client->read_bytes($requests[0], 1024));
31-
32-
die();
33-
34-
// @TODO: handle wait_for_all_requested_bytes for more than content-length bytes
35-
var_dump(stream_get_contents($requests[1]->get_response()->body_stream));
36-
// var_dump($client->read_bytes($requests[1], 359, [
37-
// 'mode' => 'poll_once',
38-
// ]));
39-
// var_dump($client->read_bytes($requests[1], 359, [
40-
// 'mode' => 'poll_once',
41-
// ]));
42-
// var_dump($client->read_bytes($requests[1], 359, [
43-
// 'mode' => 'poll_once',
44-
// ]));
45-
// @TODO: poll_once should eventully mark the request as finished
46-
var_dump("----");
47-
var_dump($client->read_bytes($requests[2], 1024, [
48-
'mode' => 'return',
49-
]));
50-
var_dump($client->read_bytes($requests[2], 1024, [
51-
'mode' => 'poll_once',
52-
]));
53-
// var_dump($queue);
54-
// var_dump($queue[0]);
55-
// var_dump($client->read_bytes($requests[0], 1024, [
56-
// 'mode' => 'return',
57-
// ]));
58-
// var_dump(fread($queue[0]->get_body_stream(), 1));
59-
// var_dump(fread($queue[0]->get_body_stream(), 1));
60-
// var_dump(fread($queue[0]->get_body_stream(), 1));
61-
die();
62-
// var_dump($queue[0]);
63-
var_dump($client->read_bytes($requests[0], 186, [
64-
'mode' => 'return',
65-
]));
66-
var_dump($client->read_bytes($requests[0], 186, [
67-
'mode' => 'return',
68-
]));
69-
// var_dump($queue[0]->get_status_code());
70-
// var_dump($queue[0]->get_headers());
71-
72-
// var_dump(stream_get_contents($queue[0]->response->body_stream));
73-
die();
74-
$client = new Client();
7523
$client->set_progress_callback( function ( Request $request, $downloaded, $total ) {
76-
echo "$request->url – Downloaded: $downloaded / $total\n";
24+
// echo "$request->url – Downloaded: $downloaded / $total\n";
7725
} );
7826

79-
$requests = [
80-
new Request("https://anglesharp.azurewebsites.net/Chunked")
81-
// new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
82-
// new Request( "https://downloads.wordpress.org/theme/pendant.zip" ),
83-
];
8427
$queue = $client->enqueue( $requests );
85-
var_dump($queue[0]);
86-
die();
87-
// Enqueuing another request here is instant and won't start the download yet.
88-
//$streams2 = $client->enqueue( [
89-
// new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
90-
//] );
91-
92-
try {
93-
$client->read_bytes($requests[0], 4096);
94-
// var_dump(stream_get_contents($streams1[0]));
95-
} catch (Exception $e) {
96-
echo $e->getMessage();
97-
}
98-
print_r($client);
99-
print_r(stream_context_get_options($streams1[0]));
100-
// Stream a single file, while streaming all the files
101-
// file_put_contents( 'output-round1-0.zip', stream_get_contents( $streams1[0] ) );
102-
//file_put_contents( 'output-round1-1.zip', stream_get_contents( $streams1[1] ) );
103-
die();
104-
// Initiate more HTTPS requests
105-
$streams3 = $client->enqueue( [
106-
new Request( "https://downloads.wordpress.org/plugin/akismet.4.1.12.zip" ),
107-
new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
108-
new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
109-
] );
110-
111-
// Download the rest of the files. Foreach() seems like downloading things
112-
// sequentially, but we're actually streaming all the files in parallel.
113-
$streams = array_merge( $streams2, $streams3 );
114-
foreach ( $streams as $k => $stream ) {
115-
file_put_contents( 'output-round2-' . $k . '.zip', stream_get_contents( $stream ) );
116-
}
117-
118-
echo "Done! :)";
119-
120-
// ----------------------------
121-
//
122-
// Previous explorations:
123-
124-
// Non-blocking parallel processing – the fastest method.
125-
//while ( $results = sockets_http_response_read_bytes( $streams, 8096 ) ) {
126-
// foreach ( $results as $k => $chunk ) {
127-
// file_put_contents( 'output' . $k . '.zip', $chunk, FILE_APPEND );
128-
// }
129-
//}
13028

131-
// Blocking sequential processing – the slowest method.
132-
//foreach ( $streams as $k => $stream ) {
133-
// stream_set_blocking( $stream, 1 );
134-
// file_put_contents( 'output' . $k . '.zip', stream_get_contents( $stream ) );
135-
//}
29+
$client->wait_for_headers($requests[3]);
30+
var_dump($requests[3]->get_response()->get_headers());

src/WordPress/AsyncHttp/Client.php

Lines changed: 87 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,13 @@
8787
* **Supports custom request headers and body**
8888
*/
8989
class Client {
90-
protected $concurrency = 2;
91-
protected $requests;
92-
protected $onProgress;
93-
protected $is_processing_queue = false;
90+
91+
const STREAM_SELECT_READ = 1;
92+
const STREAM_SELECT_WRITE = 2;
93+
94+
const READ_NON_BLOCKING = 'READ_NON_BLOCKING';
95+
const READ_POLL_ANY = 'READ_POLL_ANY';
96+
const READ_POLL_ALL = 'READ_POLL_ALL';
9497

9598
/**
9699
* Microsecond is 1 millionth of a second.
@@ -104,6 +107,11 @@ class Client {
104107
*/
105108
const NONBLOCKING_TIMEOUT_MICROSECONDS = 0.05 * self::MICROSECONDS_TO_SECONDS;
106109

110+
protected $concurrency = 2;
111+
protected $requests;
112+
protected $onProgress;
113+
protected $is_processing_queue = false;
114+
107115
public function __construct() {
108116
$this->requests = [];
109117
$this->onProgress = function () {
@@ -153,29 +161,6 @@ public function enqueue( $requests ) {
153161
return $enqueued_streams;
154162
}
155163

156-
/**
157-
* Returns the response stream associated with the given Request object.
158-
* Reading from that stream also runs this Client's event loop.
159-
*
160-
* @param Request $request
161-
*
162-
* @return resource
163-
*/
164-
public function get_stream( $request ) {
165-
throw new Exception('Not implemented yet');
166-
// if ( ! isset( $this->requests[ $request ] ) ) {
167-
// $this->enqueue_request( $request );
168-
// }
169-
170-
// if ( $this->queue_needs_processing ) {
171-
// $this->process_queue();
172-
// }
173-
174-
// StreamWrapper::create_resource(
175-
// new StreamData($request, $client)
176-
// )
177-
}
178-
179164
/**
180165
* @param \WordPress\AsyncHttp\Request $request
181166
*/
@@ -184,10 +169,6 @@ protected function enqueue_request( $request ) {
184169
return $request->get_response();
185170
}
186171

187-
188-
const READ_NON_BLOCKING = 'READ_NON_BLOCKING';
189-
const READ_POLL_ANY = 'READ_POLL_ANY';
190-
const READ_POLL_ALL = 'READ_POLL_ALL';
191172
/**
192173
* Reads $length bytes from the given request while also running
193174
* non-blocking event loop operations.
@@ -218,21 +199,74 @@ public function read_bytes( $request, $length, $mode = self::READ_NON_BLOCKING )
218199
) {
219200
break;
220201
}
221-
} while ($this->event_loop_pass());
202+
} while ($this->event_loop_tick());
222203

223204
return $buffered;
224205
}
225206

226-
public function event_loop_pass()
207+
public function wait_for_headers( $request )
227208
{
228-
if(count($this->get_concurrent_requests()) === 0) {
209+
if(!in_array($request, $this->requests, true)) {
210+
trigger_error('Request not found in the client', E_USER_WARNING);
229211
return false;
230212
}
231-
echo "event_loop_pass\n";
232-
foreach($this->requests as $request) {
233-
echo "request state: $request->state\n";
213+
214+
while($this->event_loop_tick() && $request->state !== Request::STATE_FAILED) {
215+
if($request->get_response()->get_headers()) {
216+
return true;
217+
}
234218
}
235-
sleep(1);
219+
220+
return false;
221+
}
222+
223+
public function wait_for_response_body_stream( $request )
224+
{
225+
if(!in_array($request, $this->requests, true)) {
226+
trigger_error('Request not found in the client', E_USER_WARNING);
227+
return false;
228+
}
229+
230+
while($this->event_loop_tick() && $request->state !== Request::STATE_FAILED) {
231+
if($request->get_response()->decoded_response_stream) {
232+
return true;
233+
}
234+
}
235+
236+
return false;
237+
}
238+
239+
/**
240+
* Returns the response stream associated with the given Request object.
241+
* Reading from that stream also runs this Client's event loop.
242+
*
243+
* @param Request $request
244+
*
245+
* @return resource|bool
246+
*/
247+
// public function get_response_stream( $request ) {
248+
// if(!in_array($request, $this->requests, true)) {
249+
// trigger_error('Request not found in the client', E_USER_WARNING);
250+
// return false;
251+
// }
252+
253+
// if(
254+
// $request->state !== Request::STATE_RECEIVING_BODY &&
255+
// $request->state !== Request::STATE_FINISHED
256+
// ) {
257+
// trigger_error('Request is not in a state where the response stream is available', E_USER_WARNING);
258+
// return false;
259+
// }
260+
261+
// return $request->get_response()->event_loop_decoded_response_stream;
262+
// }
263+
264+
public function event_loop_tick()
265+
{
266+
if(count($this->get_concurrent_requests()) === 0) {
267+
return false;
268+
}
269+
236270
static::open_nonblocking_http_sockets(
237271
$this->get_concurrent_requests( Request::STATE_ENQUEUED )
238272
);
@@ -275,16 +309,14 @@ protected function get_concurrent_requests($states=null)
275309
Request::STATE_RECEIVING_BODY,
276310
Request::STATE_RECEIVED,
277311
]);
312+
$available_slots = $this->concurrency - count($processed_requests);
278313
$enqueued_requests = $this->get_requests(Request::STATE_ENQUEUED);
279-
$backfill_enqueued_nb = min(
280-
count($enqueued_requests),
281-
$this->concurrency - count($processed_requests)
282-
);
283-
284-
for($i = 0; $i < $backfill_enqueued_nb; $i++) {
314+
for($i = 0; $i < $available_slots; $i++) {
315+
if(!isset($enqueued_requests[$i])) {
316+
break;
317+
}
285318
$processed_requests[] = $enqueued_requests[$i];
286319
}
287-
288320
if($states !== null) {
289321
$processed_requests = static::filter_requests($processed_requests, $states);
290322
}
@@ -476,17 +508,13 @@ static private function receive_response_headers( $requests ) {
476508
$response->statusMessage = $parsed['status']['message'];
477509
$response->protocol = $parsed['status']['protocol'];
478510

479-
$content_length = $response->get_header('content-length');
480-
$transfer_encoding = $response->get_header('transfer-encoding');
481-
// If we're expecting a body, let's start receiving it.
482-
if(
483-
$transfer_encoding === 'chunked' ||
484-
($content_length !== null && (int) $content_length > 0)
485-
) {
486-
$request->state = Request::STATE_RECEIVING_BODY;
487-
} else {
511+
// If we're being redirected, we don't need to wait for the body.
512+
if($response->statusCode >= 300 && $response->statusCode < 400) {
488513
$request->state = Request::STATE_RECEIVED;
514+
break;
489515
}
516+
517+
$request->state = Request::STATE_RECEIVING_BODY;
490518
break;
491519
}
492520
}
@@ -498,6 +526,9 @@ static private function receive_response_headers( $requests ) {
498526
* @param array $requests An array of requests.
499527
*/
500528
private function receive_response_body( $requests ) {
529+
// @TODO: Assume body is received when either
530+
// * Content-Length is reached
531+
// * The last chunk in Transfer-Encoding: chunked is received
501532
foreach (static::stream_select($requests, static::STREAM_SELECT_READ) as $request) {
502533
$response = $request->get_response();
503534
if (!$response->decoded_response_stream) {
@@ -700,17 +731,15 @@ static private function filter_requests( array $requests, $states ) {
700731
$states = [$states];
701732
}
702733
$results = [];
703-
foreach($requests as $k => $request) {
734+
foreach($requests as $request) {
704735
if(in_array($request->state, $states)) {
705-
$results[$k] = $request;
736+
$results[] = $request;
706737
}
707738
}
708739
return $results;
709740
}
710741

711742

712-
const STREAM_SELECT_READ = 1;
713-
const STREAM_SELECT_WRITE = 2;
714743
static private function stream_select( $requests, $mode ) {
715744
if(empty($requests)) {
716745
return [];

src/WordPress/AsyncHttp/Request.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,6 @@ public function set_error($error)
100100
$this->error = $error;
101101
$this->state = self::STATE_FAILED;
102102

103-
$this->response->error = $error;
104-
$this->response->state = self::STATE_FAILED;
105-
106103
if($this->http_socket) {
107104
fclose($this->http_socket);
108105
$this->http_socket = null;

src/WordPress/AsyncHttp/Response.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public function get_protocol()
4242
}
4343

4444
public function get_header( $name ) {
45-
if($this->headers === null) {
45+
if(false === $this->get_headers()) {
4646
return false;
4747
}
4848

@@ -51,6 +51,10 @@ public function get_header( $name ) {
5151

5252
public function get_headers()
5353
{
54+
if(!$this->headers) {
55+
return false;
56+
}
57+
5458
return $this->headers;
5559
}
5660

0 commit comments

Comments
 (0)