1- import { ChainableCommander } from 'ioredis'
1+ import { Pipeline } from 'ioredis'
22import { RedisStream } from './stream.js'
33import mkDebug from 'debug'
44import { XBatchResult , XStreamResult } from './types.js'
@@ -21,11 +21,6 @@ export async function readAckDelete(
2121 ack ( pipeline , stream )
2222 read ( pipeline , stream )
2323 const responses = await pipeline . exec ( )
24-
25- if ( ! responses ) {
26- return
27- }
28-
2924 //TODO NOGROUP the consumer group this client was blocked on no longer exists
3025 for ( const result of responses ) {
3126 if ( result [ 0 ] && ! result [ 0 ] ?. message . startsWith ( 'BUSYGROUP' ) ) {
@@ -47,10 +42,7 @@ export async function readAckDelete(
4742 }
4843}
4944
50- function ack (
51- client : ChainableCommander ,
52- { deleteOnAck, pendingAcks, group } : RedisStream < KindaAny >
53- ) : void {
45+ function ack ( client : Pipeline , { deleteOnAck, pendingAcks, group } : RedisStream < KindaAny > ) : void {
5446 if ( ! group || ! pendingAcks . size ) return
5547 for ( const [ stream , ids ] of pendingAcks ) {
5648 client . xack ( stream , group , ...ids )
@@ -59,14 +51,11 @@ function ack(
5951 pendingAcks . clear ( )
6052}
6153
62- function xgroup (
63- client : ChainableCommander ,
64- { group, streams, first } : RedisStream < KindaAny >
65- ) : void {
54+ function xgroup ( client : Pipeline , { group, streams, first } : RedisStream < KindaAny > ) : void {
6655 if ( ! first || ! group ) return
6756 for ( const [ key , start ] of streams ) {
6857 debug ( `xgroup create ${ key } ${ group } ${ start } mkstream` )
69- client . xgroup ( 'CREATE ' , key , group , start , 'MKSTREAM ' )
58+ client . xgroup ( 'create ' , key , group , start , 'mkstream ' )
7059 }
7160}
7261
0 commit comments