Skip to content

Commit 66a7bac

Browse files
committed
php#53: Optimize stream_select: eliminate temporary arrays and preserve keys
- Replace inefficient temporary array approach with direct callback-based population - Add key preservation to maintain original array indices per PHP stream_select() spec - Refactor async_stream_callback_t to store references to result arrays - Implement real-time stream insertion during event resolution - Extract duplicate hash insertion logic into inline helper function - Use zend_hash_add() instead of zend_hash_update() for cleared arrays This eliminates 3 temporary HashTables and associated copy operations while ensuring original array keys are preserved as documented.
1 parent b69b338 commit 66a7bac

File tree

1 file changed

+74
-93
lines changed

1 file changed

+74
-93
lines changed

main/network_async.c

Lines changed: 74 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -671,9 +671,25 @@ typedef struct async_stream_callback_s {
671671
zend_async_poll_event_t *event;
672672
async_poll_event events;
673673
bool ready;
674+
zval key; // Original array key (string or numeric)
675+
zval *read_streams; // Reference to read streams result array
676+
zval *write_streams; // Reference to write streams result array
677+
zval *except_streams; // Reference to except streams result array
674678
struct async_stream_callback_s *next; // For linked list
675679
} async_stream_callback_t;
676680

681+
static zend_always_inline void add_stream_to_array(zval *array, zval *key, zval *stream_zval) {
682+
if (array == NULL) return;
683+
684+
zval *dest_elem;
685+
if (Z_TYPE_P(key) == IS_STRING) {
686+
dest_elem = zend_hash_add(Z_ARR_P(array), Z_STR_P(key), stream_zval);
687+
} else {
688+
dest_elem = zend_hash_index_add(Z_ARR_P(array), Z_LVAL_P(key), stream_zval);
689+
}
690+
if (dest_elem) zval_add_ref(dest_elem);
691+
}
692+
677693
static void async_stream_callback_resolve(
678694
zend_async_event_t *event, zend_async_event_callback_t *callback, void *result, zend_object *exception
679695
)
@@ -693,6 +709,24 @@ static void async_stream_callback_resolve(
693709
// Add the event to the waker's triggered events list
694710
zend_async_waker_add_triggered_event(coroutine, event);
695711

712+
zend_async_poll_event_t *poll_event = (zend_async_poll_event_t *)event;
713+
714+
// Immediately add ready stream to appropriate result array with preserved key
715+
zval stream_zval;
716+
php_stream_to_zval(stream_callback->stream, &stream_zval);
717+
718+
if (poll_event->triggered_events & ASYNC_READABLE) {
719+
add_stream_to_array(stream_callback->read_streams, &stream_callback->key, &stream_zval);
720+
}
721+
722+
if (poll_event->triggered_events & ASYNC_WRITABLE) {
723+
add_stream_to_array(stream_callback->write_streams, &stream_callback->key, &stream_zval);
724+
}
725+
726+
if (poll_event->triggered_events & ASYNC_PRIORITIZED) {
727+
add_stream_to_array(stream_callback->except_streams, &stream_callback->key, &stream_zval);
728+
}
729+
696730
// Increment total ready count in waker result
697731
if (Z_TYPE(coroutine->waker->result) == IS_UNDEF) {
698732
ZVAL_LONG(&coroutine->waker->result, 1);
@@ -707,25 +741,28 @@ static void async_stream_callback_resolve(
707741
/**
708742
* Optimized select() for PHP stream arrays using event reuse
709743
*/
710-
static zend_always_inline bool process_stream_array(zval *streams, async_poll_event events, zend_coroutine_t *coroutine, int *result) {
744+
static zend_always_inline bool process_stream_array(zval *streams, async_poll_event events, zend_coroutine_t *coroutine,
745+
zval *read_streams_array, zval *write_streams_array, zval *except_streams_array, int *result) {
711746

712747
if (streams == NULL || Z_TYPE_P(streams) != IS_ARRAY) {
713748
return true;
714749
}
715750

716751
zval *z_stream;
717752
php_stream *stream;
753+
zend_string *key;
754+
zend_ulong num_key;
718755
int count = 0;
719756

720-
ZEND_HASH_FOREACH_VAL(Z_ARR_P(streams), z_stream) {
757+
ZEND_HASH_FOREACH_KEY_VAL(Z_ARR_P(streams), num_key, key, z_stream) {
721758
ZVAL_DEREF(z_stream);
722759
php_stream_from_zval_no_verify(stream, z_stream);
723760
if (stream == NULL) continue;
724761

725762
// Try to get async event handle from socket streams first
726763
zend_async_poll_event_t *event_handle = NULL;
727764
const int stream_result = php_stream_set_option(stream, PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE, events, &event_handle);
728-
bool is_socket = stream_result == PHP_STREAM_OPTION_RETURN_OK && event_handle != NULL;
765+
const bool is_socket = stream_result == PHP_STREAM_OPTION_RETURN_OK && event_handle != NULL;
729766

730767
io_descriptor_t io_descriptor;
731768

@@ -771,6 +808,18 @@ static zend_always_inline bool process_stream_array(zval *streams, async_poll_ev
771808
callback->fd = io_descriptor;
772809
callback->events = events;
773810
callback->ready = false;
811+
812+
// Save original array key
813+
if (key) {
814+
ZVAL_STR_COPY(&callback->key, key);
815+
} else {
816+
ZVAL_LONG(&callback->key, num_key);
817+
}
818+
819+
// Save references to result arrays
820+
callback->read_streams = read_streams_array;
821+
callback->write_streams = write_streams_array;
822+
callback->except_streams = except_streams_array;
774823

775824
zend_async_resume_when(
776825
coroutine,
@@ -786,35 +835,6 @@ static zend_always_inline bool process_stream_array(zval *streams, async_poll_ev
786835
return count;
787836
}
788837

789-
/**
790-
* Modify stream array to contain only ready streams (like stream_array_from_fd_set)
791-
*/
792-
static void modify_stream_array(zval *stream_array, HashTable *ready_streams) {
793-
if (stream_array == NULL || Z_TYPE_P(stream_array) != IS_ARRAY) {
794-
return;
795-
}
796-
797-
// Create new array with only ready streams
798-
HashTable *ht = zend_new_array(zend_hash_num_elements(ready_streams));
799-
800-
zval *elem;
801-
zend_string *key;
802-
zend_ulong num_ind;
803-
804-
ZEND_HASH_FOREACH_KEY_VAL(ready_streams, num_ind, key, elem) {
805-
zval *dest_elem;
806-
if (!key) {
807-
dest_elem = zend_hash_index_update(ht, num_ind, elem);
808-
} else {
809-
dest_elem = zend_hash_update(ht, key, elem);
810-
}
811-
zval_add_ref(dest_elem);
812-
} ZEND_HASH_FOREACH_END();
813-
814-
// Replace old array with new one
815-
zval_ptr_dtor(stream_array);
816-
ZVAL_ARR(stream_array, ht);
817-
}
818838

819839
/**
820840
* Asynchronous select() implementation for PHP stream arrays in coroutine contexts.
@@ -876,80 +896,34 @@ ZEND_API int php_stream_select_async(zval *read_streams, zval *write_streams, zv
876896
// Initialize result counter
877897
ZVAL_LONG(&coroutine->waker->result, 0);
878898

899+
// Clear result arrays first
900+
if (read_streams != NULL && Z_TYPE_P(read_streams) == IS_ARRAY) {
901+
zend_hash_clean(Z_ARR_P(read_streams));
902+
}
903+
if (write_streams != NULL && Z_TYPE_P(write_streams) == IS_ARRAY) {
904+
zend_hash_clean(Z_ARR_P(write_streams));
905+
}
906+
if (except_streams != NULL && Z_TYPE_P(except_streams) == IS_ARRAY) {
907+
zend_hash_clean(Z_ARR_P(except_streams));
908+
}
909+
879910
// Process all stream arrays using the helper function
880-
if (UNEXPECTED(!process_stream_array(read_streams, ASYNC_READABLE, coroutine, &result))) {
911+
if (UNEXPECTED(!process_stream_array(read_streams, ASYNC_READABLE, coroutine, read_streams, write_streams, except_streams, &result))) {
881912
goto cleanup;
882913
}
883-
if (UNEXPECTED(!process_stream_array(write_streams, ASYNC_WRITABLE, coroutine, &result))) {
914+
if (UNEXPECTED(!process_stream_array(write_streams, ASYNC_WRITABLE, coroutine, read_streams, write_streams, except_streams, &result))) {
884915
goto cleanup;
885916
}
886-
if (UNEXPECTED(!process_stream_array(except_streams, ASYNC_PRIORITIZED, coroutine, &result))) {
917+
if (UNEXPECTED(!process_stream_array(except_streams, ASYNC_PRIORITIZED, coroutine, read_streams, write_streams, except_streams, &result))) {
887918
goto cleanup;
888919
}
889920

890921
// Suspend until events occur or timeout
891922
ZEND_ASYNC_SUSPEND();
892923
IF_EXCEPTION_GOTO_ERROR;
893924

894-
// Get result count
925+
// Get result count - arrays are already filled by callbacks
895926
result = Z_LVAL(coroutine->waker->result);
896-
897-
// Collect ready streams and modify arrays
898-
if (result > 0 && coroutine->waker->triggered_events != NULL) {
899-
HashTable ready_read_streams, ready_write_streams, ready_except_streams;
900-
zend_hash_init(&ready_read_streams, 0, NULL, ZVAL_PTR_DTOR, 0);
901-
zend_hash_init(&ready_write_streams, 0, NULL, ZVAL_PTR_DTOR, 0);
902-
zend_hash_init(&ready_except_streams, 0, NULL, ZVAL_PTR_DTOR, 0);
903-
904-
// Collect ready streams from triggered_events
905-
async_stream_callback_t *cb;
906-
ZEND_HASH_FOREACH_PTR(coroutine->waker->triggered_events, cb) {
907-
if (cb->ready) {
908-
HashTable *target = NULL;
909-
if (cb->event->triggered_events & ASYNC_READABLE) {
910-
zval *dest_elem = zend_hash_next_index_insert_ptr(&ready_read_streams, cb->stream);
911-
if (dest_elem) {
912-
zval_add_ref(dest_elem);
913-
} else {
914-
zend_throw_exception(NULL, "Failed to insert ready read stream", 0);
915-
goto error;
916-
}
917-
} else if (cb->event->triggered_events & ASYNC_WRITABLE) {
918-
zval *dest_elem = zend_hash_next_index_insert_ptr(&ready_read_streams, cb->stream);
919-
if (dest_elem) {
920-
zval_add_ref(dest_elem);
921-
} else {
922-
zend_throw_exception(NULL, "Failed to insert ready write stream", 0);
923-
goto error;
924-
}
925-
} else if (cb->event->triggered_events & ASYNC_PRIORITIZED) {
926-
zval *dest_elem = zend_hash_next_index_insert_ptr(&ready_read_streams, cb->stream);
927-
if (dest_elem) {
928-
zval_add_ref(dest_elem);
929-
} else {
930-
zend_throw_exception(NULL, "Failed to insert ready except stream", 0);
931-
goto error;
932-
}
933-
}
934-
}
935-
} ZEND_HASH_FOREACH_END();
936-
937-
// Modify original arrays
938-
if (read_streams != NULL && zend_hash_num_elements(&ready_read_streams) > 0) {
939-
modify_stream_array(read_streams, &ready_read_streams);
940-
}
941-
if (write_streams != NULL && zend_hash_num_elements(&ready_write_streams) > 0) {
942-
modify_stream_array(write_streams, &ready_write_streams);
943-
}
944-
if (except_streams != NULL && zend_hash_num_elements(&ready_except_streams) > 0) {
945-
modify_stream_array(except_streams, &ready_except_streams);
946-
}
947-
948-
// Cleanup ready stream tables
949-
zend_hash_destroy(&ready_read_streams);
950-
zend_hash_destroy(&ready_write_streams);
951-
zend_hash_destroy(&ready_except_streams);
952-
}
953927

954928
goto cleanup;
955929

@@ -958,6 +932,13 @@ ZEND_API int php_stream_select_async(zval *read_streams, zval *write_streams, zv
958932
handle_exception_and_errno();
959933

960934
cleanup:
935+
// Clean up zval keys in callbacks before waker cleanup
936+
if (coroutine->waker != NULL && coroutine->waker->triggered_events != NULL) {
937+
async_stream_callback_t *cb;
938+
ZEND_HASH_FOREACH_PTR(coroutine->waker->triggered_events, cb) {
939+
zval_ptr_dtor(&cb->key);
940+
} ZEND_HASH_FOREACH_END();
941+
}
961942

962943
zend_async_waker_clean(coroutine);
963944
return result;

0 commit comments

Comments
 (0)