@@ -14,6 +14,12 @@ export type RealtimeStreamsOptions = {
1414// Legacy constant for backward compatibility (no longer written, but still recognized when reading)
1515const END_SENTINEL = "<<CLOSE_STREAM>>" ;
1616
17+ // Internal types for stream pipeline
18+ type StreamChunk =
19+ | { type : "ping" }
20+ | { type : "data" ; redisId : string ; data : string }
21+ | { type : "legacy-data" ; redisId : string ; data : string } ;
22+
1723// Class implementing both interfaces
1824export class RedisRealtimeStreams implements StreamIngestor , StreamResponder {
1925 private logger : Logger ;
@@ -28,22 +34,40 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
2834 request : Request ,
2935 runId : string ,
3036 streamId : string ,
31- signal : AbortSignal
37+ signal : AbortSignal ,
38+ lastEventId ?: string
3239 ) : Promise < Response > {
3340 const redis = new Redis ( this . options . redis ?? { } ) ;
3441 const streamKey = `stream:${ runId } :${ streamId } ` ;
3542 let isCleanedUp = false ;
3643
37- const stream = new ReadableStream ( {
44+ const stream = new ReadableStream < StreamChunk > ( {
3845 start : async ( controller ) => {
39- let lastId = "0" ;
46+ // Start from lastEventId if provided, otherwise from beginning
47+ let lastId = lastEventId || "0" ;
4048 let retryCount = 0 ;
4149 const maxRetries = 3 ;
4250 let lastDataTime = Date . now ( ) ;
51+ let lastEnqueueTime = Date . now ( ) ;
4352 const blockTimeMs = 5000 ;
53+ const pingIntervalMs = 10000 ; // 10 seconds
54+
55+ if ( lastEventId ) {
56+ this . logger . debug ( "[RealtimeStreams][streamResponse] Resuming from lastEventId" , {
57+ streamKey,
58+ lastEventId,
59+ } ) ;
60+ }
4461
4562 try {
4663 while ( ! signal . aborted ) {
64+ // Check if we need to send a ping
65+ const timeSinceLastEnqueue = Date . now ( ) - lastEnqueueTime ;
66+ if ( timeSinceLastEnqueue >= pingIntervalMs ) {
67+ controller . enqueue ( { type : "ping" } ) ;
68+ lastEnqueueTime = Date . now ( ) ;
69+ }
70+
4771 try {
4872 const messages = await redis . xread (
4973 "COUNT" ,
@@ -88,9 +112,16 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
88112 continue ;
89113 }
90114
91- controller . enqueue ( data ) ;
115+ // Enqueue structured chunk with Redis stream ID
116+ controller . enqueue ( {
117+ type : "data" ,
118+ redisId : id ,
119+ data,
120+ } ) ;
121+
92122 foundData = true ;
93123 lastDataTime = Date . now ( ) ;
124+ lastEnqueueTime = Date . now ( ) ;
94125
95126 if ( signal . aborted ) {
96127 controller . close ( ) ;
@@ -161,12 +192,31 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
161192 await cleanup ( ) ;
162193 } ,
163194 } )
164- . pipeThrough ( new LineTransformStream ( ) )
165195 . pipeThrough (
166- new TransformStream ( {
196+ // Transform 1: Split data content by newlines, preserving metadata
197+ new TransformStream < StreamChunk , StreamChunk & { line ?: string } > ( {
198+ transform ( chunk , controller ) {
199+ if ( chunk . type === "ping" ) {
200+ controller . enqueue ( chunk ) ;
201+ } else if ( chunk . type === "data" || chunk . type === "legacy-data" ) {
202+ // Split data by newlines, emit separate chunks with same metadata
203+ const lines = chunk . data . split ( "\n" ) . filter ( ( line ) => line . trim ( ) . length > 0 ) ;
204+ for ( const line of lines ) {
205+ controller . enqueue ( { ...chunk , line } ) ;
206+ }
207+ }
208+ } ,
209+ } )
210+ )
211+ . pipeThrough (
212+ // Transform 2: Format as SSE
213+ new TransformStream < StreamChunk & { line ?: string } , string > ( {
167214 transform ( chunk , controller ) {
168- for ( const line of chunk ) {
169- controller . enqueue ( `data: ${ line } \n\n` ) ;
215+ if ( chunk . type === "ping" ) {
216+ controller . enqueue ( `: ping\n\n` ) ;
217+ } else if ( ( chunk . type === "data" || chunk . type === "legacy-data" ) && chunk . line ) {
218+ // Use Redis stream ID as SSE event ID
219+ controller . enqueue ( `id: ${ chunk . redisId } \ndata: ${ chunk . line } \n\n` ) ;
170220 }
171221 } ,
172222 } )
0 commit comments