@@ -105,6 +105,13 @@ export class Http2CallStream extends Duplex implements CallStream {
105105 // Status code mapped from :status. To be used if grpc-status is not received
106106 private mappedStatusCode : Status = Status . UNKNOWN ;
107107
108+ // Promise objects that are re-assigned to resolving promises when headers
109+ // or trailers received. Processing headers/trailers is asynchronous, so we
110+ // can use these objects to await their completion. This helps us establish
111+ // order of precedence when obtaining the status of the call.
112+ private handlingHeaders = Promise . resolve ( ) ;
113+ private handlingTrailers = Promise . resolve ( ) ;
114+
108115 // This is populated (non-null) if and only if the call has ended
109116 private finalStatus : StatusObject | null = null ;
110117
@@ -116,6 +123,11 @@ export class Http2CallStream extends Duplex implements CallStream {
116123 this . filterStack = filterStackFactory . createFilter ( this ) ;
117124 }
118125
126+ /**
127+ * On first call, emits a 'status' event with the given StatusObject.
128+ * Subsequent calls are no-ops.
129+ * @param status The status of the call.
130+ */
119131 private endCall ( status : StatusObject ) : void {
120132 if ( this . finalStatus === null ) {
121133 this . finalStatus = status ;
@@ -135,12 +147,46 @@ export class Http2CallStream extends Duplex implements CallStream {
135147 return canPush ;
136148 }
137149
150+ private handleTrailers ( headers : http2 . IncomingHttpHeaders ) {
151+ let code : Status = this . mappedStatusCode ;
152+ let details = '' ;
153+ let metadata : Metadata ;
154+ try {
155+ metadata = Metadata . fromHttp2Headers ( headers ) ;
156+ } catch ( e ) {
157+ metadata = new Metadata ( ) ;
158+ }
159+ let status : StatusObject = { code, details, metadata} ;
160+ this . handlingTrailers = ( async ( ) => {
161+ let finalStatus ;
162+ try {
163+ // Attempt to assign final status.
164+ finalStatus = await this . filterStack . receiveTrailers ( Promise . resolve ( status ) ) ;
165+ } catch ( error ) {
166+ await this . handlingHeaders ;
167+ // This is a no-op if the call was already ended when handling headers.
168+ this . endCall ( {
169+ code : Status . INTERNAL ,
170+ details : 'Failed to process received status' ,
171+ metadata : new Metadata ( )
172+ } ) ;
173+ return ;
174+ }
175+ // It's possible that headers were received but not fully handled yet.
176+ // Give the headers handler an opportunity to end the call first,
177+ // if an error occurred.
178+ await this . handlingHeaders ;
179+ // This is a no-op if the call was already ended when handling headers.
180+ this . endCall ( finalStatus ) ;
181+ } ) ( ) ;
182+ }
183+
138184 attachHttp2Stream ( stream : http2 . ClientHttp2Stream ) : void {
139185 if ( this . finalStatus !== null ) {
140186 stream . rstWithCancel ( ) ;
141187 } else {
142188 this . http2Stream = stream ;
143- stream . on ( 'response' , ( headers ) => {
189+ stream . on ( 'response' , ( headers , flags ) => {
144190 switch ( headers [ HTTP2_HEADER_STATUS ] ) {
145191 // TODO(murgatroid99): handle 100 and 101
146192 case '400' :
@@ -166,57 +212,27 @@ export class Http2CallStream extends Duplex implements CallStream {
166212 }
167213 delete headers [ HTTP2_HEADER_STATUS ] ;
168214 delete headers [ HTTP2_HEADER_CONTENT_TYPE ] ;
169- let metadata : Metadata ;
170- try {
171- metadata = Metadata . fromHttp2Headers ( headers ) ;
172- } catch ( e ) {
173- this . cancelWithStatus ( Status . UNKNOWN , e . message ) ;
174- return ;
175- }
176- this . filterStack . receiveMetadata ( Promise . resolve ( metadata ) )
177- . then (
178- ( finalMetadata ) => {
179- this . emit ( 'metadata' , finalMetadata ) ;
180- } ,
181- ( error ) => {
182- this . cancelWithStatus ( Status . UNKNOWN , error . message ) ;
183- } ) ;
184- } ) ;
185- stream . on ( 'trailers' , ( headers : http2 . IncomingHttpHeaders ) => {
186- let code : Status = this . mappedStatusCode ;
187- let details = '' ;
188- if ( typeof headers [ 'grpc-status' ] === 'string' ) {
189- let receivedCode = Number ( headers [ 'grpc-status' ] ) ;
190- if ( receivedCode in Status ) {
191- code = receivedCode ;
192- } else {
193- code = Status . UNKNOWN ;
215+ if ( flags & http2 . constants . NGHTTP2_FLAG_END_STREAM ) {
216+ this . handleTrailers ( headers ) ;
217+ } else {
218+ let metadata : Metadata ;
219+ try {
220+ metadata = Metadata . fromHttp2Headers ( headers ) ;
221+ } catch ( error ) {
222+ this . endCall ( { code : Status . UNKNOWN , details : error . message , metadata : new Metadata ( ) } ) ;
223+ return ;
194224 }
195- delete headers [ 'grpc-status' ] ;
196- }
197- if ( typeof headers [ 'grpc-message' ] === 'string' ) {
198- details = decodeURI ( headers [ 'grpc-message' ] as string ) ;
225+ this . handlingHeaders =
226+ this . filterStack . receiveMetadata ( Promise . resolve ( metadata ) )
227+ . then ( ( finalMetadata ) => {
228+ this . emit ( 'metadata' , finalMetadata ) ;
229+ } ) . catch ( ( error ) => {
230+ this . destroyHttp2Stream ( ) ;
231+ this . endCall ( { code : Status . UNKNOWN , details : error . message , metadata : new Metadata ( ) } ) ;
232+ } ) ;
199233 }
200- let metadata : Metadata ;
201- try {
202- metadata = Metadata . fromHttp2Headers ( headers ) ;
203- } catch ( e ) {
204- metadata = new Metadata ( ) ;
205- }
206- let status : StatusObject = { code, details, metadata} ;
207- this . filterStack . receiveTrailers ( Promise . resolve ( status ) )
208- . then (
209- ( finalStatus ) => {
210- this . endCall ( finalStatus ) ;
211- } ,
212- ( error ) => {
213- this . endCall ( {
214- code : Status . INTERNAL ,
215- details : 'Failed to process received status' ,
216- metadata : new Metadata ( )
217- } ) ;
218- } ) ;
219234 } ) ;
235+ stream . on ( 'trailers' , this . handleTrailers . bind ( this ) ) ;
220236 stream . on ( 'data' , ( data ) => {
221237 let readHead = 0 ;
222238 let canPush = true ;
@@ -278,7 +294,7 @@ export class Http2CallStream extends Duplex implements CallStream {
278294 this . unpushedReadMessages . push ( null ) ;
279295 }
280296 } ) ;
281- stream . on ( 'streamClosed' , ( errorCode ) => {
297+ stream . on ( 'streamClosed' , async ( errorCode ) => {
282298 let code : Status ;
283299 let details = '' ;
284300 switch ( errorCode ) {
@@ -299,6 +315,13 @@ export class Http2CallStream extends Duplex implements CallStream {
299315 default :
300316 code = Status . INTERNAL ;
301317 }
318+ // This guarantees that if trailers were received, the value of the
319+ // 'grpc-status' header takes precedence for emitted status data.
320+ await this . handlingTrailers ;
321+ // This is a no-op if trailers were received at all.
322+ // This is OK, because status codes emitted here correspond to more
323+ // catastrophic issues that prevent us from receiving trailers in the
324+ // first place.
302325 this . endCall ( { code : code , details : details , metadata : new Metadata ( ) } ) ;
303326 } ) ;
304327 stream . on ( 'error' , ( err : Error ) => {
@@ -323,8 +346,7 @@ export class Http2CallStream extends Duplex implements CallStream {
323346 }
324347 }
325348
326- cancelWithStatus ( status : Status , details : string ) : void {
327- this . endCall ( { code : status , details : details , metadata : new Metadata ( ) } ) ;
349+ private destroyHttp2Stream ( ) {
328350 // The http2 stream could already have been destroyed if cancelWithStatus
329351 // is called in response to an internal http2 error.
330352 if ( this . http2Stream !== null && ! this . http2Stream . destroyed ) {
@@ -334,6 +356,16 @@ export class Http2CallStream extends Duplex implements CallStream {
334356 }
335357 }
336358
359+ cancelWithStatus ( status : Status , details : string ) : void {
360+ this . destroyHttp2Stream ( ) ;
361+ ( async ( ) => {
362+ // If trailers are currently being processed, the call should be ended
363+ // by handleTrailers instead.
364+ await this . handlingTrailers ;
365+ this . endCall ( { code : status , details : details , metadata : new Metadata ( ) } ) ;
366+ } ) ( ) ;
367+ }
368+
337369 getDeadline ( ) : Deadline {
338370 return this . options . deadline ;
339371 }
0 commit comments