77 JsonRpcSuccessResponse ,
88 JsonRpcErrorResponse ,
99 JsonRpcError ,
10- JsonRpcId
10+ JsonRpcId ,
11+ MessageEntry
1112} from "./types.js" ;
1213
1314import contentType from "content-type" ;
@@ -68,6 +69,8 @@ export class HttpStreamTransport extends AbstractTransport {
6869 private _requestStreamMap = new Map < string | number , ActiveSseConnection > ( ) ;
6970 private _pendingBatches = new Map < ServerResponse , BatchResponseState > ( ) ;
7071 private _eventCounter = 0 ;
72+ private _globalMessageStore = new Map < string , Map < string , MessageEntry > > ( ) ;
73+ private _pruneInterval ?: NodeJS . Timeout ;
7174
7275 constructor ( config : HttpStreamTransportConfig = { } ) {
7376 super ( ) ;
@@ -91,9 +94,14 @@ export class HttpStreamTransport extends AbstractTransport {
9194 responseMode : this . _config . responseMode ,
9295 sessionEnabled : this . _config . session . enabled ,
9396 resumabilityEnabled : this . _config . resumability . enabled ,
97+ resumabilityStore : this . _config . resumability . messageStoreType ,
9498 authEnabled : ! ! this . _config . auth ,
9599 corsOrigin : this . _config . cors . allowOrigin
96100 } , null , 2 ) } `) ;
101+
102+ if ( this . _config . resumability . enabled && this . _config . resumability . messageStoreType === 'global' ) {
103+ this . _pruneInterval = setInterval ( ( ) => this . pruneMessageStore ( ) , this . _config . resumability . historyDuration / 3 ) ;
104+ }
97105 }
98106
99107 private getCorsHeaders ( req : IncomingMessage , includeMaxAge : boolean = false ) : Record < string , string > {
@@ -419,7 +427,7 @@ export class HttpStreamTransport extends AbstractTransport {
419427 const streamId = randomUUID ( ) ;
420428 const connection : ActiveSseConnection = {
421429 res, sessionId, streamId, lastEventIdSent : null ,
422- messageHistory : this . _config . resumability . enabled ? [ ] : undefined ,
430+ messageHistory : this . _config . resumability . enabled && this . _config . resumability . messageStoreType === 'connection' ? [ ] : undefined ,
423431 pingInterval : undefined ,
424432 isPostConnection
425433 } ;
@@ -438,7 +446,7 @@ export class HttpStreamTransport extends AbstractTransport {
438446 res . write ( ': stream opened\n\n' ) ;
439447 connection . pingInterval = setInterval ( ( ) => this . sendPing ( connection ) , 15000 ) ;
440448 if ( lastEventId && this . _config . resumability . enabled ) {
441- this . handleResumption ( connection , lastEventId ) . catch ( err => { logger . error ( `Error during stream resumption for ${ streamId } : ${ err . message } ` ) ; this . cleanupConnection ( connection , `Resumption error: ${ err . message } ` ) ; } ) ;
449+ this . handleResumption ( connection , lastEventId , sessionId ) . catch ( err => { logger . error ( `Error during stream resumption for ${ streamId } : ${ err . message } ` ) ; this . cleanupConnection ( connection , `Resumption error: ${ err . message } ` ) ; } ) ;
442450 }
443451 const cleanupHandler = ( reason : string ) => { if ( connection . pingInterval ) { clearInterval ( connection . pingInterval ) ; connection . pingInterval = undefined ; } this . cleanupConnection ( connection , reason ) ; } ;
444452 res . on ( "close" , ( ) => cleanupHandler ( "Client closed connection" ) ) ;
@@ -572,12 +580,16 @@ export class HttpStreamTransport extends AbstractTransport {
572580 if ( this . _config . resumability . enabled ) {
573581 eventId = `${ Date . now ( ) } -${ this . _eventCounter ++ } ` ;
574582 targetConnection . lastEventIdSent = eventId ;
575- if ( targetConnection . messageHistory ) {
583+
584+ this . storeMessage ( message , targetConnection . sessionId , eventId ) ;
585+
586+ if ( this . _config . resumability . messageStoreType === 'connection' && targetConnection . messageHistory ) {
576587 const timestamp = Date . now ( ) ;
577588 targetConnection . messageHistory . push ( { eventId, message, timestamp } ) ;
578589 const cutoff = timestamp - this . _config . resumability . historyDuration ;
579590 targetConnection . messageHistory = targetConnection . messageHistory . filter ( entry => entry . timestamp >= cutoff ) ;
580591 }
592+
581593 logger . debug ( `Sending SSE event ID: ${ eventId } on stream ${ targetConnection . streamId } ` ) ;
582594 targetConnection . res . write ( `id: ${ eventId } \n` ) ;
583595 }
@@ -641,24 +653,64 @@ export class HttpStreamTransport extends AbstractTransport {
641653 return session ;
642654 }
643655
644- private async handleResumption ( connection : ActiveSseConnection , lastEventId : string ) : Promise < void > {
656+ private async handleResumption ( connection : ActiveSseConnection , lastEventId : string , sessionId ?: string ) : Promise < void > {
645657 logger . info ( `Attempting resume stream ${ connection . streamId } from event ${ lastEventId } ` ) ;
646- if ( ! connection . messageHistory || ! this . _config . resumability . enabled ) { logger . warn ( `Resume requested for ${ connection . streamId } , but history unavailable/disabled. Starting fresh.` ) ; return ; }
647- const history = connection . messageHistory ;
648- const lastReceivedIndex = history . findIndex ( entry => entry . eventId === lastEventId ) ;
649- if ( lastReceivedIndex === - 1 ) { logger . warn ( `Event ${ lastEventId } not found in history for ${ connection . streamId } . Starting fresh.` ) ; return ; }
650- const messagesToReplay = history . slice ( lastReceivedIndex + 1 ) ;
651- if ( messagesToReplay . length === 0 ) { logger . info ( `Event ${ lastEventId } was last known event for ${ connection . streamId } . No replay needed.` ) ; return ; }
658+
659+ let messagesToReplay : MessageEntry [ ] = [ ] ;
660+
661+ if ( this . _config . resumability . messageStoreType === 'global' ) {
662+ if ( ! this . _config . resumability . enabled ) {
663+ logger . warn ( `Resume requested for ${ connection . streamId } , but resumability is disabled. Starting fresh.` ) ;
664+ return ;
665+ }
666+
667+ messagesToReplay = this . getMessagesAfterEvent ( sessionId , lastEventId ) ;
668+
669+ if ( messagesToReplay . length === 0 ) {
670+ logger . warn ( `Event ${ lastEventId } not found in global message store for session ${ sessionId || 'N/A' } . Starting fresh.` ) ;
671+ return ;
672+ }
673+ } else if ( this . _config . resumability . messageStoreType === 'connection' ) {
674+ if ( ! connection . messageHistory || ! this . _config . resumability . enabled ) {
675+ logger . warn ( `Resume requested for ${ connection . streamId } , but history unavailable/disabled. Starting fresh.` ) ;
676+ return ;
677+ }
678+
679+ const history = connection . messageHistory ;
680+ const lastReceivedIndex = history . findIndex ( entry => entry . eventId === lastEventId ) ;
681+
682+ if ( lastReceivedIndex === - 1 ) {
683+ logger . warn ( `Event ${ lastEventId } not found in history for ${ connection . streamId } . Starting fresh.` ) ;
684+ return ;
685+ }
686+
687+ messagesToReplay = history . slice ( lastReceivedIndex + 1 ) ;
688+ }
689+
690+ if ( messagesToReplay . length === 0 ) {
691+ logger . info ( `Event ${ lastEventId } was last known event for ${ connection . streamId } . No replay needed.` ) ;
692+ return ;
693+ }
694+
652695 logger . info ( `Replaying ${ messagesToReplay . length } messages for stream ${ connection . streamId } ` ) ;
696+
653697 for ( const entry of messagesToReplay ) {
654- if ( ! connection . res || connection . res . writableEnded ) { logger . warn ( `Stream ${ connection . streamId } closed during replay. Aborting.` ) ; return ; }
698+ if ( ! connection . res || connection . res . writableEnded ) {
699+ logger . warn ( `Stream ${ connection . streamId } closed during replay. Aborting.` ) ;
700+ return ;
701+ }
655702 try {
656703 logger . debug ( `Replaying event ${ entry . eventId } ` ) ;
657704 connection . res . write ( `id: ${ entry . eventId } \n` ) ;
658705 connection . res . write ( `data: ${ JSON . stringify ( entry . message ) } \n\n` ) ;
659706 connection . lastEventIdSent = entry . eventId ;
660- } catch ( error : any ) { logger . error ( `Error replaying message ${ entry . eventId } to ${ connection . streamId } : ${ error . message } . Aborting.` ) ; this . cleanupConnection ( connection , `Replay write error: ${ error . message } ` ) ; return ; }
707+ } catch ( error : any ) {
708+ logger . error ( `Error replaying message ${ entry . eventId } to ${ connection . streamId } : ${ error . message } . Aborting.` ) ;
709+ this . cleanupConnection ( connection , `Replay write error: ${ error . message } ` ) ;
710+ return ;
711+ }
661712 }
713+
662714 logger . info ( `Finished replaying messages for stream ${ connection . streamId } ` ) ;
663715 }
664716
@@ -699,7 +751,14 @@ export class HttpStreamTransport extends AbstractTransport {
699751
700752 async close ( ) : Promise < void > {
701753 logger . info ( "Closing HttpStreamTransport..." ) ;
754+
755+ if ( this . _pruneInterval ) {
756+ clearInterval ( this . _pruneInterval ) ;
757+ this . _pruneInterval = undefined ;
758+ }
759+
702760 this . cleanupAllConnections ( ) ;
761+
703762 return new Promise ( ( resolve , reject ) => {
704763 if ( this . _server ) {
705764 const server = this . _server ; this . _server = undefined ;
@@ -709,4 +768,61 @@ export class HttpStreamTransport extends AbstractTransport {
709768 } ) ;
710769 }
711770 isRunning ( ) : boolean { return Boolean ( this . _server ?. listening ) ; }
771+
772+ private storeMessage ( message : JsonRpcMessage , sessionId : string | undefined , eventId : string ) : void {
773+ if ( ! this . _config . resumability . enabled ) return ;
774+
775+ const timestamp = Date . now ( ) ;
776+ const messageEntry : MessageEntry = { eventId, message, timestamp } ;
777+
778+ if ( this . _config . resumability . messageStoreType === 'global' && sessionId ) {
779+ if ( ! this . _globalMessageStore . has ( sessionId ) ) {
780+ this . _globalMessageStore . set ( sessionId , new Map ( ) ) ;
781+ }
782+ this . _globalMessageStore . get ( sessionId ) ! . set ( eventId , messageEntry ) ;
783+ }
784+ }
785+
786+ private pruneMessageStore ( ) : void {
787+ if ( ! this . _config . resumability . enabled || this . _config . resumability . messageStoreType !== 'global' ) return ;
788+
789+ const cutoff = Date . now ( ) - this . _config . resumability . historyDuration ;
790+
791+ for ( const [ sessionId , messages ] of this . _globalMessageStore . entries ( ) ) {
792+ let expired = 0 ;
793+ for ( const [ eventId , entry ] of messages . entries ( ) ) {
794+ if ( entry . timestamp < cutoff ) {
795+ messages . delete ( eventId ) ;
796+ expired ++ ;
797+ }
798+ }
799+
800+ if ( messages . size === 0 ) {
801+ this . _globalMessageStore . delete ( sessionId ) ;
802+ } else if ( expired > 0 ) {
803+ logger . debug ( `Pruned ${ expired } expired messages for session ${ sessionId } ` ) ;
804+ }
805+ }
806+ }
807+
808+ private getMessagesAfterEvent ( sessionId : string | undefined , lastEventId : string ) : MessageEntry [ ] {
809+ if ( ! sessionId || ! this . _config . resumability . enabled ||
810+ this . _config . resumability . messageStoreType !== 'global' ||
811+ ! this . _globalMessageStore . has ( sessionId ) ) {
812+ return [ ] ;
813+ }
814+
815+ const messages = this . _globalMessageStore . get ( sessionId ) ! ;
816+
817+ const allEntries = Array . from ( messages . values ( ) )
818+ . sort ( ( a , b ) => a . timestamp - b . timestamp ) ;
819+
820+ const lastReceivedIndex = allEntries . findIndex ( entry => entry . eventId === lastEventId ) ;
821+
822+ if ( lastReceivedIndex === - 1 ) {
823+ return [ ] ;
824+ }
825+
826+ return allEntries . slice ( lastReceivedIndex + 1 ) ;
827+ }
712828}
0 commit comments