Skip to content

Commit e9e4c76

Browse files
committed
php#53: Refactoring process_stream_array
1 parent 0622bef commit e9e4c76

File tree

2 files changed

+102
-97
lines changed

2 files changed

+102
-97
lines changed

Zend/zend_async_API.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,34 @@ typedef HANDLE zend_file_descriptor_t;
6363
typedef DWORD zend_process_id_t;
6464
typedef HANDLE zend_process_t;
6565
typedef SOCKET zend_socket_t;
66+
#define INVALID_IO_DESCRIPTOR INVALID_HANDLE_VALUE
6667
#else
6768
typedef int zend_file_descriptor_t;
6869
typedef pid_t zend_process_id_t;
6970
typedef pid_t zend_process_t;
7071
typedef int zend_socket_t;
7172
#define ZEND_FD_NULL 0
73+
#define INVALID_IO_DESCRIPTOR -1;
7274
#endif
7375

76+
typedef enum {
77+
IO_DESCRIPTOR_FD = 1,
78+
IO_DESCRIPTOR_SOCKET,
79+
IO_DESCRIPTOR_PROCESS
80+
} io_descriptor_type;
81+
82+
/**
83+
* A union that can be used as a VOID* in operations that return input/output descriptors.
84+
*/
85+
typedef struct io_descriptor_s {
86+
union {
87+
zend_file_descriptor_t fd;
88+
zend_socket_t socket;
89+
zend_process_t process;
90+
};
91+
io_descriptor_type type;
92+
} io_descriptor_t;
93+
7494
/**
7595
* php_exec
7696
* If type==0, only last line of output is returned (exec)

main/network_async.c

Lines changed: 82 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -666,17 +666,13 @@ ZEND_API int php_select_async(php_socket_t max_fd, fd_set *rfds, fd_set *wfds, f
666666

667667
typedef struct async_stream_callback_s {
668668
zend_coroutine_event_callback_t callback;
669-
zval *stream_elem;
670-
php_socket_t fd;
671-
short events;
669+
io_descriptor_t fd;
670+
zval *stream;
671+
async_poll_event events;
672672
bool ready;
673-
int stream_type; // 0=read, 1=write, 2=except
674673
struct async_stream_callback_s *next; // For linked list
675674
} async_stream_callback_t;
676675

677-
// Global list of active callbacks for current select operation
678-
static async_stream_callback_t *active_callbacks = NULL;
679-
680676
static void async_stream_callback_resolve(
681677
zend_async_event_t *event, zend_async_event_callback_t *callback, void *result, zend_object *exception
682678
)
@@ -692,9 +688,10 @@ static void async_stream_callback_resolve(
692688
if (EXPECTED(coroutine->waker != NULL)) {
693689
async_stream_callback_t *stream_callback = (async_stream_callback_t *)callback;
694690

695-
// Mark this stream as ready
696691
stream_callback->ready = true;
697-
692+
// Add the event to the waker's triggered events list
693+
zend_async_waker_add_triggered_event(coroutine, event);
694+
698695
// Increment total ready count in waker result
699696
if (Z_TYPE(coroutine->waker->result) == IS_UNDEF) {
700697
ZVAL_LONG(&coroutine->waker->result, 1);
@@ -727,71 +724,60 @@ static zend_always_inline bool process_stream_array(zval *streams, async_poll_ev
727724

728725
// Try to get async event handle from socket streams first
729726
zend_async_poll_event_t *event_handle = NULL;
730-
int stream_result = php_stream_set_option(stream, PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE, events, &event_handle);
731-
732-
if (stream_result == PHP_STREAM_OPTION_RETURN_OK && event_handle != NULL) {
733-
// Socket stream - use optimized path with event reuse
734-
php_socket_t fd;
735-
if (SUCCESS == php_stream_cast(stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd != -1) {
736-
async_stream_callback_t *callback = ecalloc(1, sizeof(async_stream_callback_t));
737-
callback->callback.coroutine = coroutine;
738-
callback->callback.base.ref_count = 1;
739-
callback->callback.base.callback = async_stream_callback_resolve;
740-
callback->stream_elem = elem;
741-
callback->fd = fd;
742-
callback->events = events;
743-
callback->ready = false;
744-
callback->stream_type = events; // Use actual event as type
745-
746-
// Add to global list
747-
callback->next = active_callbacks;
748-
active_callbacks = callback;
749-
750-
zend_async_resume_when(
751-
coroutine,
752-
&event_handle->base,
753-
true,
754-
NULL,
755-
&callback->callback
756-
);
757-
count++;
727+
const int stream_result = php_stream_set_option(stream, PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE, events, &event_handle);
728+
bool is_socket = stream_result == PHP_STREAM_OPTION_RETURN_OK && event_handle != NULL;
729+
730+
io_descriptor_t io_descriptor;
731+
732+
if (UNEXPECTED(FAILURE == php_stream_cast(stream,
733+
PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&io_descriptor, 1)
734+
|| io_descriptor.fd == INVALID_IO_DESCRIPTOR)) {
735+
zend_throw_error(NULL, "Failed to cast stream to I/O descriptor");
736+
*result = -1;
737+
return false;
738+
}
739+
740+
// If stream has no event handle, create it.
741+
if (event_handle == NULL) {
742+
if (io_descriptor.type == IO_DESCRIPTOR_SOCKET) {
743+
event_handle = ZEND_ASYNC_NEW_SOCKET_EVENT(io_descriptor.socket, events);
744+
} else {
745+
event_handle = ZEND_ASYNC_NEW_POLL_EVENT(io_descriptor.fd, 0, events);
758746
}
759-
} else {
760-
// Non-socket stream or socket stream that doesn't support event handle
761-
// Use fallback with poll event
762-
php_socket_t fd;
763-
if (SUCCESS == php_stream_cast(stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd != -1) {
764-
zend_async_poll_event_t *poll_event = ZEND_ASYNC_NEW_POLL_EVENT(fd, 0, events);
765-
if (UNEXPECTED(EG(exception) != NULL)) {
766-
errno = ENOMEM;
767-
*result = -1;
768-
return -1;
769-
}
770747

771-
async_stream_callback_t *callback = ecalloc(1, sizeof(async_stream_callback_t));
772-
callback->callback.coroutine = coroutine;
773-
callback->callback.base.ref_count = 1;
774-
callback->callback.base.callback = async_stream_callback_resolve;
775-
callback->stream_elem = elem;
776-
callback->fd = fd;
777-
callback->events = events;
778-
callback->ready = false;
779-
callback->stream_type = events; // Use actual event as type
780-
781-
// Add to global list
782-
callback->next = active_callbacks;
783-
active_callbacks = callback;
784-
785-
zend_async_resume_when(
786-
coroutine,
787-
&poll_event->base,
788-
true,
789-
NULL,
790-
&callback->callback
791-
);
792-
count++;
748+
if (UNEXPECTED(EG(exception) != NULL)) {
749+
*result = -1;
750+
return false;
751+
}
752+
753+
// And save it
754+
if (UNEXPECTED(PHP_STREAM_OPTION_RETURN_OK != php_stream_set_option(
755+
stream, PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE, events, &event_handle)
756+
)) {
757+
zend_throw_error(NULL, "Failed to set async event handle on stream");
758+
*result = -1;
759+
return false;
793760
}
794761
}
762+
763+
async_stream_callback_t *callback = ecalloc(1, sizeof(async_stream_callback_t));
764+
callback->callback.coroutine = coroutine;
765+
callback->callback.base.ref_count = 1;
766+
callback->callback.base.callback = async_stream_callback_resolve;
767+
callback->stream = elem;
768+
callback->fd = io_descriptor;
769+
callback->events = events;
770+
callback->ready = false;
771+
772+
zend_async_resume_when(
773+
coroutine,
774+
&event_handle->base,
775+
true,
776+
NULL,
777+
&callback->callback
778+
);
779+
780+
count++;
795781
} ZEND_HASH_FOREACH_END();
796782

797783
return count;
@@ -869,42 +855,46 @@ ZEND_API int async_select(zval *read_streams, zval *write_streams, zval *except_
869855
result = Z_LVAL(coroutine->waker->result);
870856

871857
// Collect ready streams and modify arrays
872-
if (result > 0) {
858+
if (result > 0 && coroutine->waker->triggered_events != NULL) {
873859
HashTable ready_read_streams, ready_write_streams, ready_except_streams;
874860
zend_hash_init(&ready_read_streams, 0, NULL, ZVAL_PTR_DTOR, 0);
875861
zend_hash_init(&ready_write_streams, 0, NULL, ZVAL_PTR_DTOR, 0);
876862
zend_hash_init(&ready_except_streams, 0, NULL, ZVAL_PTR_DTOR, 0);
877863

878-
// Collect ready streams from callbacks
879-
async_stream_callback_t *cb = active_callbacks;
880-
while (cb != NULL) {
881-
if (cb->ready) {
882-
HashTable *target = NULL;
883-
switch (cb->stream_type) {
884-
case 0: target = &ready_read_streams; break;
885-
case 1: target = &ready_write_streams; break;
886-
case 2: target = &ready_except_streams; break;
887-
}
888-
if (target != NULL) {
889-
// Find original key in source array to preserve it
890-
zval *stream_elem = cb->stream_elem;
891-
zval *dest_elem = zend_hash_next_index_insert(target, stream_elem);
892-
if (dest_elem) {
893-
zval_add_ref(dest_elem);
864+
// Collect ready streams from triggered_events
865+
zval *callback_val;
866+
ZEND_HASH_FOREACH_VAL(coroutine->waker->triggered_events, callback_val) {
867+
if (Z_TYPE_P(callback_val) == IS_PTR) {
868+
async_stream_callback_t *cb = (async_stream_callback_t *)Z_PTR_P(callback_val);
869+
if (cb->ready) {
870+
HashTable *target = NULL;
871+
if (cb->stream_type & ASYNC_READABLE) {
872+
target = &ready_read_streams;
873+
} else if (cb->stream_type & ASYNC_WRITABLE) {
874+
target = &ready_write_streams;
875+
} else if (cb->stream_type & ASYNC_PRIORITIZED) {
876+
target = &ready_except_streams;
877+
}
878+
879+
if (target != NULL) {
880+
zval *stream_elem = cb->stream;
881+
zval *dest_elem = zend_hash_next_index_insert(target, stream_elem);
882+
if (dest_elem) {
883+
zval_add_ref(dest_elem);
884+
}
894885
}
895886
}
896887
}
897-
cb = cb->next;
898-
}
888+
} ZEND_HASH_FOREACH_END();
899889

900890
// Modify original arrays
901-
if (read_streams != NULL && zend_hash_num_elements(&ready_read_streams) >= 0) {
891+
if (read_streams != NULL && zend_hash_num_elements(&ready_read_streams) > 0) {
902892
modify_stream_array(read_streams, &ready_read_streams);
903893
}
904-
if (write_streams != NULL && zend_hash_num_elements(&ready_write_streams) >= 0) {
894+
if (write_streams != NULL && zend_hash_num_elements(&ready_write_streams) > 0) {
905895
modify_stream_array(write_streams, &ready_write_streams);
906896
}
907-
if (except_streams != NULL && zend_hash_num_elements(&ready_except_streams) >= 0) {
897+
if (except_streams != NULL && zend_hash_num_elements(&ready_except_streams) > 0) {
908898
modify_stream_array(except_streams, &ready_except_streams);
909899
}
910900

@@ -921,11 +911,6 @@ ZEND_API int async_select(zval *read_streams, zval *write_streams, zval *except_
921911
handle_exception_and_errno();
922912

923913
cleanup:
924-
// Clean up active callbacks list
925-
while (active_callbacks != NULL) {
926-
async_stream_callback_t *next = active_callbacks->next;
927-
active_callbacks = next;
928-
}
929914

930915
zend_async_waker_clean(coroutine);
931916
return result;

0 commit comments

Comments
 (0)