1
- <?php namespace Async ;
1
+ <?php namespace Async \ TCP ;
2
2
3
- class TCPServer {
4
- public $ connectCallback ;
3
+ function eventReadCallback ($ bufferEvent , $ connection ) {
4
+ $ cb = $ connection ->dataCallback ;
5
+ if (!$ cb )
6
+ return ;
7
+
8
+ $ dataArray = array ();
9
+ while ($ data = event_buffer_read ($ bufferEvent , 256 )) {
10
+ $ dataArray [] = $ data ;
11
+ }
12
+
13
+ $ cb (implode (NULL , $ dataArray ));
14
+ };
15
+
16
+ function eventWriteCallback ($ bufferEvent , $ connection ) {
17
+ $ connection ->writePending = FALSE ;
18
+
19
+ if ($ connection ->closePending ) {
20
+ $ connection ->close ();
21
+ }
22
+ };
23
+
24
+ function eventErrorCallback ($ bufferEvent , $ events , $ connection ) {
25
+ $ connection ->close ();
26
+
27
+ $ cb = $ connection ->disconnectCallback ;
28
+ if ($ cb ) {
29
+ $ cb ();
30
+ }
31
+ };
32
+
33
+
34
+ class Connection {
35
+ public $ socket ;
36
+ public $ eventBuffer ;
37
+ public $ tcpServer ;
5
38
public $ dataCallback ;
6
- public $ closeCallback ;
7
- public $ listenSocket ;
39
+ public $ disconnectCallback ;
40
+ public $ writePending = FALSE ;
41
+ public $ closePending = FALSE ;
42
+
43
+ function __construct ($ socket , $ server ) {
44
+ $ this ->socket = $ socket ;
45
+ $ this ->tcpServer = $ server ;
46
+
47
+ stream_set_blocking ($ socket , 0 );
48
+
49
+ $ this ->eventBuffer = event_buffer_new (
50
+ $ socket , // File descriptor to watch
51
+ '\Async\TCP\eventReadCallback ' , // Read event callback
52
+ '\Async\TCP\eventWriteCallback ' , // Write event callback
53
+ '\Async\TCP\eventErrorCallback ' , // Error callback
54
+ $ this // Custom data to provide to callback
55
+ );
56
+
57
+ event_buffer_base_set ($ this ->eventBuffer , $ server ->eventBase );
58
+ // event_buffer_timeout_set($this->eventBuffer, 30, 30);
59
+ event_buffer_watermark_set ($ this ->eventBuffer , EV_READ | EV_WRITE , 0 , 0xffffff );
60
+ event_buffer_priority_set ($ this ->eventBuffer , 10 );
61
+ event_buffer_enable ($ this ->eventBuffer , EV_READ | EV_WRITE | EV_PERSIST );
62
+ }
63
+
64
+ function write ($ bytes ) {
65
+ $ this ->writePending = TRUE ;
66
+ event_buffer_write ($ this ->eventBuffer , $ bytes );
67
+ }
68
+
69
+ function close () {
70
+ if (!$ this ->writePending )
71
+ $ this ->_close ();
72
+ else {
73
+ echo "WAITING FOR WRITE \n" ;
74
+ $ this ->closePending = TRUE ;
75
+ }
76
+ }
77
+
78
+ function _close () {
79
+ event_buffer_disable ($ this ->eventBuffer , EV_READ | EV_WRITE );
80
+ event_buffer_free ($ this ->eventBuffer );
81
+ fclose ($ this ->socket );
82
+ unset($ this ->eventBuffer , $ this ->socket );
83
+ echo "TCP CONNECTION CLOSED \n" ;
84
+ }
85
+
86
+ function onData ($ function ) {
87
+ $ this ->dataCallback = $ function ;
88
+ }
89
+
90
+ function onDisconnect ($ function ) {
91
+ $ this ->disconnectCallback = $ function ;
92
+ }
93
+ }
94
+
95
+ class Server {
96
+ public $ connectCallback ;
97
+ public $ socket ;
8
98
public $ address ;
9
99
public $ eventBase ;
10
- public $ connections ;
11
- public $ buffers ;
12
- public $ count ;
13
100
public $ evAccept ;
14
- public $ evRead ;
15
- public $ evError ;
16
101
17
102
function __construct ($ address ) {
18
103
$ this ->address = $ address ;
19
- $ this ->connections = array ();
20
- $ this ->buffers = array ();
21
- $ this ->count = 0 ;
104
+ $ this ->eventBase = \event_base_new ();
22
105
23
106
$ this ->evAccept = function ($ fd , $ events , $ server ) {
24
- $ conn = stream_socket_accept ($ fd );
25
- stream_set_blocking ($ conn , 0 );
26
-
27
- $ buffer = event_buffer_new (
28
- $ conn , $ server ->evRead , NULL , $ server ->evError , array ($ server , $ server ->count ));
29
- event_buffer_base_set ($ buffer , $ server ->eventBase );
30
- event_buffer_timeout_set ($ buffer , 30 , 30 );
31
- event_buffer_watermark_set ($ buffer , EV_READ , 0 , 0xffffff );
32
- event_buffer_priority_set ($ buffer , 10 );
33
- event_buffer_enable ($ buffer , EV_READ | EV_PERSIST );
107
+ $ socket = stream_socket_accept ($ fd );
108
+ $ connection = new Connection ($ socket , $ server );
34
109
35
- $ server ->connections [$ server ->count ] = $ conn ;
36
- $ server ->buffers [$ server ->count ] = $ buffer ;
37
-
38
- $ server ->count += 1 ;
39
110
$ cb = $ server ->connectCallback ;
40
111
if ($ cb ) {
41
- $ cb ();
42
- }
43
- };
44
-
45
- $ this ->evRead = function ($ buffer , $ data ) {
46
- $ server = $ data [0 ];
47
- $ cb = $ server ->dataCallback ;
48
- while ($ read = event_buffer_read ($ buffer , 256 )) {
49
- if ($ cb ) {
50
- $ cb ($ read );
51
- }
52
- }
53
- };
54
-
55
- $ this ->evError = function ($ buffer , $ error , $ data ) {
56
- $ server = $ data [0 ];
57
- $ id = $ data [1 ];
58
- event_buffer_disable ($ server ->buffers [$ id ], EV_READ | EV_WRITE );
59
- event_buffer_free ($ server ->buffers [$ id ]);
60
- fclose ($ server ->connections [$ id ]);
61
- unset($ server ->buffers [$ id ], $ server ->connections [$ id ]);
62
- $ cb = $ server ->closeCallback ;
63
- if ($ cb ) {
64
- $ cb ();
112
+ $ cb ($ connection );
65
113
}
66
114
};
67
115
}
@@ -70,23 +118,14 @@ function onConnect($function) {
70
118
$ this ->connectCallback = $ function ;
71
119
}
72
120
73
- function onData ($ function ) {
74
- $ this ->dataCallback = $ function ;
75
- }
76
-
77
- function onClose ($ function ) {
78
- $ this ->closeCallback = $ function ;
79
- }
80
-
81
121
function run () {
82
- $ this ->listenSocket = stream_socket_server ($ this ->address );
122
+ $ this ->socket = stream_socket_server ($ this ->address );
83
123
84
- $ this ->eventBase = event_base_new ();
85
124
$ event = event_new ();
86
125
87
126
event_set (
88
127
$ event , // The libevent Event object
89
- $ this ->listenSocket , // The file descriptor to watch
128
+ $ this ->socket , // The file descriptor to watch
90
129
EV_READ |EV_PERSIST , // Watch for READ events and watch the event forever, not just once
91
130
$ this ->evAccept , // Call this function when the event happens
92
131
$ this ); // Pass this as the 3rd argument to the callback
0 commit comments