@@ -81,8 +81,29 @@ module.exports = (arg) => {
81
81
82
82
// Drop the request once we are actually done
83
83
if ( ps . listenerCount ( topic ) === 0 ) {
84
- subscriptions [ topic ] . abort ( )
84
+ if ( ! callback ) {
85
+ return new Promise ( ( resolve , reject ) => {
86
+ // When the response stream has ended, resolve the promise
87
+ eos ( subscriptions [ topic ] . res , ( err ) => {
88
+ // FIXME: Artificial timeout needed to ensure unsubscribed
89
+ setTimeout ( ( ) => {
90
+ if ( err ) return reject ( err )
91
+ resolve ( )
92
+ } )
93
+ } )
94
+ subscriptions [ topic ] . req . abort ( )
95
+ subscriptions [ topic ] = null
96
+ } )
97
+ }
98
+
99
+ // When the response stream has ended, call the callback
100
+ eos ( subscriptions [ topic ] . res , ( err ) => {
101
+ // FIXME: Artificial timeout needed to ensure unsubscribed
102
+ setTimeout ( ( ) => callback ( err ) )
103
+ } )
104
+ subscriptions [ topic ] . req . abort ( )
85
105
subscriptions [ topic ] = null
106
+ return
86
107
}
87
108
88
109
if ( ! callback ) {
@@ -154,13 +175,16 @@ module.exports = (arg) => {
154
175
155
176
// Start the request and transform the response
156
177
// stream to Pubsub messages stream
157
- subscriptions [ topic ] = send . andTransform ( request , PubsubMessageStream . from , ( err , stream ) => {
178
+ subscriptions [ topic ] = { }
179
+ subscriptions [ topic ] . req = send . andTransform ( request , PubsubMessageStream . from , ( err , stream ) => {
158
180
if ( err ) {
159
181
subscriptions [ topic ] = null
160
182
ps . removeListener ( topic , handler )
161
183
return callback ( err )
162
184
}
163
185
186
+ subscriptions [ topic ] . res = stream
187
+
164
188
stream . on ( 'data' , ( msg ) => {
165
189
ps . emit ( topic , msg )
166
190
} )
0 commit comments