@@ -17,35 +17,39 @@ class AmqpConnectionFactory implements PsrConnectionFactory
17
17
private $ connection ;
18
18
19
19
/**
20
- * $config = [
20
+ * The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials.
21
+ *
22
+ * [
21
23
* 'host' => amqp.host The host to connect too. Note: Max 1024 characters.
22
24
* 'port' => amqp.port Port on the host.
23
25
* 'vhost' => amqp.vhost The virtual host on the host. Note: Max 128 characters.
24
- * 'login ' => amqp.login The login name to use. Note: Max 128 characters.
25
- * 'password ' => amqp.password Password. Note: Max 128 characters.
26
+ * 'user ' => amqp.user The user name to use. Note: Max 128 characters.
27
+ * 'pass ' => amqp.password Password. Note: Max 128 characters.
26
28
* 'read_timeout' => Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
27
29
* 'write_timeout' => Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
28
30
* 'connect_timeout' => Connection timeout. Note: 0 or greater seconds. May be fractional.
29
31
* 'persisted' => bool, Whether it use single persisted connection or open a new one for every context
30
32
* 'lazy' => the connection will be performed as later as possible, if the option set to true
31
- * ].
33
+ * ]
34
+ *
35
+ * or
36
+ *
37
+ * amqp://user:pass@host:10000/vhost?lazy=true&persisted=false&read_timeout=2
32
38
*
33
- * @param $config
39
+ * @param array|string $config
34
40
*/
35
- public function __construct (array $ config )
41
+ public function __construct ($ config = ' amqp:// ' )
36
42
{
37
- $ this ->config = array_replace ([
38
- 'host ' => null ,
39
- 'port ' => null ,
40
- 'vhost ' => null ,
41
- 'login ' => null ,
42
- 'password ' => null ,
43
- 'read_timeout ' => null ,
44
- 'write_timeout ' => null ,
45
- 'connect_timeout ' => null ,
46
- 'persisted ' => false ,
47
- 'lazy ' => true ,
48
- ], $ config );
43
+ if (empty ($ config )) {
44
+ $ config = [];
45
+ } elseif (is_string ($ config )) {
46
+ $ config = $ this ->parseDsn ($ config );
47
+ } elseif (is_array ($ config )) {
48
+ } else {
49
+ throw new \LogicException ('The config must be eaither an array of options, a DSN string or null ' );
50
+ }
51
+
52
+ $ this ->config = array_replace ($ this ->defaultConfig (), $ config );
49
53
}
50
54
51
55
/**
@@ -64,18 +68,89 @@ public function createContext()
64
68
return new AmqpContext (new \AMQPChannel ($ this ->establishConnection ()));
65
69
}
66
70
71
+ /**
72
+ * @return \AMQPConnection
73
+ */
67
74
private function establishConnection ()
68
75
{
69
76
if (false == $ this ->connection ) {
70
- $ this ->connection = new \AMQPConnection ($ this ->config );
71
-
77
+ $ config = $ this ->config ;
78
+ $ config ['login ' ] = $ this ->config ['user ' ];
79
+ $ config ['password ' ] = $ this ->config ['pass ' ];
80
+ $ this ->connection = new \AMQPConnection ($ config );
72
81
$ this ->config ['persisted ' ] ? $ this ->connection ->pconnect () : $ this ->connection ->connect ();
73
82
}
74
-
75
83
if (false == $ this ->connection ->isConnected ()) {
76
84
$ this ->config ['persisted ' ] ? $ this ->connection ->preconnect () : $ this ->connection ->reconnect ();
77
85
}
78
86
79
87
return $ this ->connection ;
80
88
}
89
+
90
+ /**
91
+ * @param string $dsn
92
+ *
93
+ * @return array
94
+ */
95
+ private function parseDsn ($ dsn )
96
+ {
97
+ if ('amqp:// ' == $ dsn ) {
98
+ return [];
99
+ }
100
+
101
+ $ dsnConfig = parse_url ($ dsn );
102
+ if (false === $ dsnConfig ) {
103
+ throw new \LogicException (sprintf ('Failed to parse DSN "%s" ' , $ dsn ));
104
+ }
105
+
106
+ $ dsnConfig = array_replace ([
107
+ 'scheme ' => null ,
108
+ 'host ' => null ,
109
+ 'port ' => null ,
110
+ 'user ' => null ,
111
+ 'pass ' => null ,
112
+ 'path ' => null ,
113
+ 'query ' => null ,
114
+ ], $ dsnConfig );
115
+
116
+ if ('amqp ' !== $ dsnConfig ['scheme ' ]) {
117
+ throw new \LogicException ('The given DSN scheme "%s" is not supported. Could be "amqp" only. ' );
118
+ }
119
+
120
+ if ($ dsnConfig ['query ' ]) {
121
+ $ query = [];
122
+ parse_str ($ dsnConfig ['query ' ], $ query );
123
+ $ dsnConfig = array_replace ($ query , $ dsnConfig );
124
+ }
125
+
126
+ $ dsnConfig ['vhost ' ] = ltrim ($ dsnConfig ['path ' ], '/ ' );
127
+
128
+ unset($ dsnConfig ['scheme ' ], $ dsnConfig ['query ' ], $ dsnConfig ['fragment ' ], $ dsnConfig ['path ' ]);
129
+
130
+ $ config = array_replace ($ this ->defaultConfig (), $ dsnConfig );
131
+ $ config = array_map (function ($ value ) {
132
+ return urldecode ($ value );
133
+ }, $ config );
134
+
135
+ return $ config ;
136
+ }
137
+
138
+ /**
139
+ * @return array
140
+ */
141
+ private function defaultConfig ()
142
+ {
143
+ return [
144
+ 'host ' => 'localhost ' ,
145
+ 'port ' => 5672 ,
146
+ 'vhost ' => '/ ' ,
147
+ 'user ' => 'guest ' ,
148
+ 'pass ' => 'guest ' ,
149
+ 'read_timeout ' => null ,
150
+ 'write_timeout ' => null ,
151
+ 'connect_timeout ' => null ,
152
+ 'persisted ' => false ,
153
+ 'lazy ' => true ,
154
+ ];
155
+ }
81
156
}
0 commit comments