Skip to content

Commit 121e960

Browse files
committed
php#53: + 024-stream_select_remote_disconnect.phpt
1 parent 02cecd9 commit 121e960

File tree

3 files changed

+94
-71
lines changed

3 files changed

+94
-71
lines changed

main/network_async.c

Lines changed: 74 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,12 @@ ZEND_API int network_async_await_stream_socket(php_netstream_data_t *sock, short
230230
}
231231

232232
// Create or reuse event handle
233-
if (sock->event_handle == NULL) {
234-
sock->event_handle = ZEND_ASYNC_NEW_SOCKET_EVENT(
233+
if (sock->poll_event == NULL) {
234+
sock->poll_event = ZEND_ASYNC_NEW_SOCKET_EVENT(
235235
sock->socket, poll2_events_to_async(events)
236236
);
237237

238-
if (UNEXPECTED(EG(exception) != NULL || sock->event_handle == NULL)) {
238+
if (UNEXPECTED(EG(exception) != NULL || sock->poll_event == NULL)) {
239239
errno = ENOMEM;
240240
return -1;
241241
}
@@ -257,7 +257,7 @@ ZEND_API int network_async_await_stream_socket(php_netstream_data_t *sock, short
257257
// Register the event
258258
zend_async_resume_when(
259259
coroutine,
260-
&sock->event_handle->base,
260+
&sock->poll_event->base,
261261
false,
262262
socket_await_callback_resolve,
263263
NULL
@@ -782,17 +782,41 @@ ZEND_API int php_select_async(php_socket_t max_fd, fd_set *rfds, fd_set *wfds, f
782782

783783
typedef struct async_stream_callback_s {
784784
zend_coroutine_event_callback_t callback;
785-
io_descriptor_t fd;
786785
php_stream *stream;
787786
zend_async_poll_event_t *event;
788787
async_poll_event events;
789788
zval key; // Original array key (string or numeric)
790789
zval *read_streams; // Reference to read streams result array
791790
zval *write_streams; // Reference to write streams result array
792791
zval *except_streams; // Reference to except streams result array
793-
struct async_stream_callback_s *next; // For linked list
792+
zend_async_event_callback_dispose_fn prev_dispose;
794793
} async_stream_callback_t;
795794

795+
/**
796+
* Custom dispose function to clean up stream references and keys.
797+
*/
798+
static void async_stream_callback_dispose(zend_async_event_callback_t *base, zend_async_event_t *event)
799+
{
800+
async_stream_callback_t *callback = (async_stream_callback_t *)base;
801+
802+
if (callback->prev_dispose) {
803+
zval_ptr_dtor(&callback->key);
804+
ZVAL_UNDEF(&callback->key);
805+
806+
// Release php stream reference
807+
if (callback->stream) {
808+
zval z_stream;
809+
php_stream_to_zval(callback->stream, &z_stream);
810+
callback->stream = NULL;
811+
zval_ptr_dtor(&z_stream);
812+
}
813+
814+
callback->prev_dispose(base, event);
815+
} else {
816+
return;
817+
}
818+
}
819+
796820
static zend_always_inline void add_stream_to_array(zval *array, zval *key, zval *stream_zval)
797821
{
798822
if (array == NULL) {
@@ -803,14 +827,17 @@ static zend_always_inline void add_stream_to_array(zval *array, zval *key, zval
803827
SEPARATE_ARRAY(array);
804828
}
805829

806-
zval *dest_elem;
830+
zval *destination = NULL;
831+
807832
if (Z_TYPE_P(key) == IS_STRING) {
808-
dest_elem = zend_hash_add(Z_ARR_P(array), Z_STR_P(key), stream_zval);
833+
destination = zend_hash_add(Z_ARR_P(array), Z_STR_P(key), stream_zval);
809834
} else {
810-
dest_elem = zend_hash_index_add(Z_ARR_P(array), Z_LVAL_P(key), stream_zval);
835+
destination = zend_hash_index_add(Z_ARR_P(array), Z_LVAL_P(key), stream_zval);
811836
}
812837

813-
if (dest_elem) zval_add_ref(dest_elem);
838+
if (destination) {
839+
zval_add_ref(stream_zval);
840+
}
814841
}
815842

816843
static void async_stream_callback_resolve(
@@ -834,15 +861,15 @@ static void async_stream_callback_resolve(
834861
zval stream_zval;
835862
php_stream_to_zval(stream_callback->stream, &stream_zval);
836863

837-
if (poll_event->triggered_events & ASYNC_READABLE) {
864+
if (stream_callback->read_streams != NULL && poll_event->triggered_events & ASYNC_READABLE) {
838865
add_stream_to_array(stream_callback->read_streams, &stream_callback->key, &stream_zval);
839866
}
840867

841-
if (poll_event->triggered_events & ASYNC_WRITABLE) {
868+
if (stream_callback->write_streams != NULL && poll_event->triggered_events & ASYNC_WRITABLE) {
842869
add_stream_to_array(stream_callback->write_streams, &stream_callback->key, &stream_zval);
843870
}
844871

845-
if (poll_event->triggered_events & ASYNC_PRIORITIZED) {
872+
if (stream_callback->except_streams != NULL && poll_event->triggered_events & ASYNC_PRIORITIZED) {
846873
add_stream_to_array(stream_callback->except_streams, &stream_callback->key, &stream_zval);
847874
}
848875

@@ -860,8 +887,10 @@ static void async_stream_callback_resolve(
860887
/**
861888
* Optimized select() for PHP stream arrays using event reuse
862889
*/
863-
static zend_always_inline bool process_stream_array(zval *streams, async_poll_event events, zend_coroutine_t *coroutine,
864-
zval *read_streams, zval *write_streams, zval *except_streams, int *result) {
890+
static zend_always_inline bool process_stream_array(
891+
zval *streams, async_poll_event events, zend_coroutine_t *coroutine,
892+
zval *read_streams, zval *write_streams, zval *except_streams, int *result)
893+
{
865894

866895
if (streams == NULL || Z_TYPE_P(streams) != IS_ARRAY) {
867896
return true;
@@ -873,87 +902,69 @@ static zend_always_inline bool process_stream_array(zval *streams, async_poll_ev
873902
zend_ulong num_key;
874903

875904
ZEND_HASH_FOREACH_KEY_VAL(Z_ARR_P(streams), num_key, key, z_stream) {
905+
876906
ZVAL_DEREF(z_stream);
907+
877908
php_stream_from_zval_no_verify(stream, z_stream);
878909

879-
if (stream == NULL) {
910+
if (UNEXPECTED(stream == NULL)) {
880911
return false;
881912
}
882913

883914
// Try to get async event handle from socket streams first
884-
zend_async_poll_event_t *event_handle = NULL;
885-
const int stream_result = php_stream_set_option(stream, PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE, events, &event_handle);
886-
const bool is_socket = stream_result == PHP_STREAM_OPTION_RETURN_OK && event_handle != NULL;
915+
zend_async_poll_event_t *poll_event = NULL;
887916

888-
io_descriptor_t io_descriptor;
917+
php_stream_set_option(
918+
stream, PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE, events, &poll_event
919+
);
889920

890-
if (UNEXPECTED(FAILURE == php_stream_cast(stream,
891-
PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&io_descriptor, 1)
892-
|| io_descriptor.fd == INVALID_IO_DESCRIPTOR)) {
893-
zend_throw_error(NULL, "Failed to cast stream to I/O descriptor");
921+
if (UNEXPECTED(EG(exception))) {
922+
*result = -1;
923+
return false;
924+
} else if (UNEXPECTED(poll_event == NULL)) {
925+
zend_throw_error(NULL, "Stream does not support async I/O");
894926
*result = -1;
895-
896927
return false;
897-
}
898-
899-
io_descriptor.type = is_socket ? IO_DESCRIPTOR_SOCKET : IO_DESCRIPTOR_FD;
900-
901-
// If stream has no event handle, create it.
902-
if (event_handle == NULL) {
903-
if (io_descriptor.type == IO_DESCRIPTOR_SOCKET) {
904-
event_handle = ZEND_ASYNC_NEW_SOCKET_EVENT(io_descriptor.socket, events);
905-
} else {
906-
event_handle = ZEND_ASYNC_NEW_POLL_EVENT(io_descriptor.fd, 0, events);
907-
}
908-
909-
if (UNEXPECTED(EG(exception) != NULL)) {
910-
*result = -1;
911-
return false;
912-
}
913-
914-
// We add the IO descriptor event to the EventLoop without waiting
915-
// for the Waker to initiate work, in order to save on system calls.
916-
event_handle->base.start(&event_handle->base);
917-
918-
// And save it
919-
if (UNEXPECTED(PHP_STREAM_OPTION_RETURN_OK != php_stream_set_option(
920-
stream, PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE, events, &event_handle)
921-
)) {
922-
923-
zend_throw_error(NULL, "Failed to set async event handle on stream");
924-
*result = -1;
925-
return false;
926-
}
927928
}
928929

929930
async_stream_callback_t *callback = ecalloc(1, sizeof(async_stream_callback_t));
930931
callback->callback.coroutine = coroutine;
931932
callback->callback.base.ref_count = 1;
932933
callback->callback.base.callback = async_stream_callback_resolve;
933934
callback->stream = stream;
934-
callback->event = event_handle;
935-
callback->fd = io_descriptor;
935+
callback->event = poll_event;
936936
callback->events = events;
937+
// Save references to result arrays
938+
callback->read_streams = read_streams;
939+
callback->write_streams = write_streams;
940+
callback->except_streams = except_streams;
937941

938942
// Save original array key
939943
if (key) {
940944
ZVAL_STR_COPY(&callback->key, key);
941945
} else {
942946
ZVAL_LONG(&callback->key, num_key);
943947
}
944-
945-
// Save references to result arrays
946-
callback->read_streams = read_streams;
947-
callback->write_streams = write_streams;
948-
callback->except_streams = except_streams;
949948

950949
zend_async_resume_when(
951950
coroutine,
952-
&event_handle->base,
951+
&poll_event->base,
953952
false,
954953
NULL,
955954
&callback->callback
956955
);
956+
957+
if (UNEXPECTED(EG(exception))) {
958+
callback->callback.base.dispose(&callback->callback.base, NULL);
959+
*result = -1;
960+
return false;
961+
}
962+
963+
callback->prev_dispose = callback->callback.base.dispose;
964+
callback->callback.base.dispose = async_stream_callback_dispose;
965+
966+
Z_TRY_ADDREF_P(z_stream);
967+
957968
} ZEND_HASH_FOREACH_END();
958969

959970
if (Z_REFCOUNT_P(streams) > 1) {

main/php_network.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ struct _php_netstream_data_t {
331331
struct timeval timeout;
332332
size_t ownsize;
333333
bool nonblocking_applied;
334-
zend_async_poll_event_t *event_handle;
334+
zend_async_poll_event_t *poll_event;
335335
};
336336
typedef struct _php_netstream_data_t php_netstream_data_t;
337337
PHPAPI extern const php_stream_ops php_stream_socket_ops;

main/streams/xp_socket.c

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,9 @@ static int php_sockop_close(php_stream *stream, int close_handle)
286286
}
287287

288288
/* Cleanup async event handle before freeing socket structure */
289-
if (sock->event_handle) {
290-
sock->event_handle->base.dispose(&sock->event_handle->base);
291-
sock->event_handle = NULL;
289+
if (sock->poll_event) {
290+
sock->poll_event->base.dispose(&sock->poll_event->base);
291+
sock->poll_event = NULL;
292292
}
293293

294294
pefree(sock, php_stream_is_persistent(stream));
@@ -540,14 +540,24 @@ static int php_sockop_set_option(php_stream *stream, int option, int value, void
540540
if (!sock) {
541541
return PHP_STREAM_OPTION_RETURN_NOTIMPL;
542542
}
543+
543544
zend_async_poll_event_t **handle_ptr = (zend_async_poll_event_t **)ptrparam;
544-
if (sock->event_handle == NULL) {
545-
sock->event_handle = ZEND_ASYNC_NEW_SOCKET_EVENT(sock->socket, value);
545+
if (sock->poll_event == NULL) {
546+
sock->poll_event = ZEND_ASYNC_NEW_SOCKET_EVENT(sock->socket, value);
547+
if (UNEXPECTED(EG(exception) != NULL)) {
548+
return PHP_STREAM_OPTION_RETURN_ERR;
549+
}
550+
551+
// We add the IO descriptor event to the EventLoop without waiting
552+
// for the Waker to initiate work, in order to save on system calls.
553+
sock->poll_event->base.start(&sock->poll_event->base);
554+
546555
if (UNEXPECTED(EG(exception) != NULL)) {
547556
return PHP_STREAM_OPTION_RETURN_ERR;
548557
}
549558
}
550-
*handle_ptr = sock->event_handle;
559+
560+
*handle_ptr = sock->poll_event;
551561
return PHP_STREAM_OPTION_RETURN_OK;
552562
}
553563

@@ -942,6 +952,8 @@ static inline int php_tcp_sockop_accept(php_stream *stream, php_netstream_data_t
942952

943953
memcpy(clisockdata, sock, sizeof(*clisockdata));
944954
clisockdata->socket = clisock;
955+
clisockdata->poll_event = NULL;
956+
clisockdata->nonblocking_applied = false;
945957
#ifdef __linux__
946958
/* O_NONBLOCK is not inherited on Linux */
947959
clisockdata->is_blocked = true;
@@ -1026,7 +1038,7 @@ PHPAPI php_stream *php_stream_generic_socket_factory(const char *proto, size_t p
10261038
sock->timeout.tv_sec = FG(default_socket_timeout);
10271039
sock->timeout.tv_usec = 0;
10281040
sock->nonblocking_applied = false;
1029-
sock->event_handle = NULL;
1041+
sock->poll_event = NULL;
10301042

10311043
/* we don't know the socket until we have determined if we are binding or
10321044
* connecting */

0 commit comments

Comments
 (0)