7
7
use React \MySQL \Io \Connection ;
8
8
use React \MySQL \Io \Executor ;
9
9
use React \MySQL \Io \Parser ;
10
- use React \Promise \Promise ;
10
+ use React \Promise \Deferred ;
11
11
use React \Promise \PromiseInterface ;
12
12
use React \Socket \Connector ;
13
13
use React \Socket \ConnectorInterface ;
@@ -81,6 +81,19 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector =
81
81
* instance on success or will reject with an `Exception` if the URL is
82
82
* invalid or the connection or authentication fails.
83
83
*
84
+ * The returned Promise is implemented in such a way that it can be
85
+ * cancelled when it is still pending. Cancelling a pending promise will
86
+ * reject its value with an Exception and will cancel the underlying TCP/IP
87
+ * connection attempt and/or MySQL authentication.
88
+ *
89
+ * ```php
90
+ * $promise = $factory->createConnection($url);
91
+ *
92
+ * $loop->addTimer(3.0, function () use ($promise) {
93
+ * $promise->cancel();
94
+ * });
95
+ * ```
96
+ *
84
97
* The `$url` parameter must contain the database host, optional
85
98
* authentication, port and database to connect to:
86
99
*
@@ -113,8 +126,22 @@ public function createConnection($uri)
113
126
return \React \Promise \reject (new \InvalidArgumentException ('Invalid connect uri given ' ));
114
127
}
115
128
116
- $ uri = $ parts ['host ' ] . ': ' . (isset ($ parts ['port ' ]) ? $ parts ['port ' ] : 3306 );
117
- return $ this ->connector ->connect ($ uri )->then (function (ConnectionInterface $ stream ) use ($ parts ) {
129
+ $ connecting = $ this ->connector ->connect (
130
+ $ parts ['host ' ] . ': ' . (isset ($ parts ['port ' ]) ? $ parts ['port ' ] : 3306 )
131
+ );
132
+
133
+ $ deferred = new Deferred (function ($ _ , $ reject ) use ($ connecting ) {
134
+ // connection cancelled, start with rejecting attempt, then clean up
135
+ $ reject (new \RuntimeException ('Connection to database server cancelled ' ));
136
+
137
+ // either close successful connection or cancel pending connection attempt
138
+ $ connecting ->then (function (ConnectionInterface $ connection ) {
139
+ $ connection ->close ();
140
+ });
141
+ $ connecting ->cancel ();
142
+ });
143
+
144
+ $ connecting ->then (function (ConnectionInterface $ stream ) use ($ parts , $ deferred ) {
118
145
$ executor = new Executor ();
119
146
$ parser = new Parser ($ stream , $ executor );
120
147
@@ -126,17 +153,17 @@ public function createConnection($uri)
126
153
));
127
154
$ parser ->start ();
128
155
129
- return new Promise (function ($ resolve , $ reject ) use ($ command , $ connection , $ stream ) {
130
- $ command ->on ('success ' , function () use ($ resolve , $ connection ) {
131
- $ resolve ($ connection );
132
- });
133
- $ command ->on ('error ' , function ($ error ) use ($ reject , $ stream ) {
134
- $ reject ($ error );
135
- $ stream ->close ();
136
- });
156
+ $ command ->on ('success ' , function () use ($ deferred , $ connection ) {
157
+ $ deferred ->resolve ($ connection );
158
+ });
159
+ $ command ->on ('error ' , function ($ error ) use ($ deferred , $ stream ) {
160
+ $ deferred ->reject ($ error );
161
+ $ stream ->close ();
137
162
});
138
- }, function ($ error ) {
139
- throw new \RuntimeException ('Unable to connect to database server ' , 0 , $ error );
163
+ }, function ($ error ) use ( $ deferred ) {
164
+ $ deferred -> reject ( new \RuntimeException ('Unable to connect to database server ' , 0 , $ error) );
140
165
});
166
+
167
+ return $ deferred ->promise ();
141
168
}
142
169
}
0 commit comments