Skip to content

Commit

Permalink
feat(pubsub): batch receipt modacks (#10234)
Browse files Browse the repository at this point in the history
* feat(pubsub): batch receipt modacks

* add comment on why we use success ack result

* update streaming pull retry test nack logic

* remove extra gofunc wrapper
  • Loading branch information
hongalex authored May 20, 2024
1 parent e4b2737 commit 4c2cd10
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 22 deletions.
63 changes: 43 additions & 20 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,22 @@ var (
)

type messageIterator struct {
ctx context.Context
cancel func() // the function that will cancel ctx; called in stop
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks
pingTicker *time.Ticker // sends to the stream to keep it open
failed chan struct{} // closed on stream error
drained chan struct{} // closed when stopped && no more pending messages
wg sync.WaitGroup

ctx context.Context
cancel func() // the function that will cancel ctx; called in stop
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks
pingTicker *time.Ticker // sends to the stream to keep it open
receiptTicker *time.Ticker // sends receipt modacks
failed chan struct{} // closed on stream error
drained chan struct{} // closed when stopped && no more pending messages
wg sync.WaitGroup

// This mutex guards the structs related to lease extension.
mu sync.Mutex
ackTimeDist *distribution.D // dist uses seconds

Expand All @@ -91,7 +93,9 @@ type messageIterator struct {
// ack IDs whose ack deadline is to be modified
// ModAcks don't have AckResults but allows reuse of the SendModAck function.
pendingModAcks map[string]*AckResult
err error // error from stream failure
// ack IDs whose receipt need to be acknowledged with a modack.
pendingReceipts map[string]*AckResult
err error // error from stream failure

eoMu sync.RWMutex
enableExactlyOnceDelivery bool
Expand Down Expand Up @@ -127,6 +131,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
ackTicker := time.NewTicker(100 * time.Millisecond)
nackTicker := time.NewTicker(100 * time.Millisecond)
pingTicker := time.NewTicker(30 * time.Second)
receiptTicker := time.NewTicker(100 * time.Millisecond)
cctx, cancel := context.WithCancel(context.Background())
cctx = withSubscriptionKey(cctx, subName)
it := &messageIterator{
Expand All @@ -140,13 +145,15 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
ackTicker: ackTicker,
nackTicker: nackTicker,
pingTicker: pingTicker,
receiptTicker: receiptTicker,
failed: make(chan struct{}),
drained: make(chan struct{}),
ackTimeDist: distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1),
keepAliveDeadlines: map[string]time.Time{},
pendingAcks: map[string]*AckResult{},
pendingNacks: map[string]*AckResult{},
pendingModAcks: map[string]*AckResult{},
pendingReceipts: map[string]*AckResult{},
}
it.wg.Add(1)
go it.sender()
Expand Down Expand Up @@ -307,11 +314,15 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
it.mu.Unlock()

if len(ackIDs) > 0 {
// When exactly once delivery is not enabled, modacks are fire and forget.
if !exactlyOnceDelivery {
go func() {
it.sendModAck(ackIDs, deadline, false)
}()
// When exactly once delivery is not enabled, modacks are fire and forget.
// Add pending receipt modacks to queue to batch with other modacks.
it.mu.Lock()
for id := range ackIDs {
// Use a SuccessAckResult (dummy) since we don't propagate modacks back to the user.
it.pendingReceipts[id] = newSuccessAckResult()
}
it.mu.Unlock()
return msgs, nil
}

Expand Down Expand Up @@ -402,6 +413,7 @@ func (it *messageIterator) sender() {
defer it.ackTicker.Stop()
defer it.nackTicker.Stop()
defer it.pingTicker.Stop()
defer it.receiptTicker.Stop()
defer func() {
if it.ps != nil {
it.ps.CloseSend()
Expand All @@ -414,6 +426,7 @@ func (it *messageIterator) sender() {
sendNacks := false
sendModAcks := false
sendPing := false
sendReceipt := false

dl := it.ackDeadline()

Expand Down Expand Up @@ -456,9 +469,12 @@ func (it *messageIterator) sender() {
it.mu.Lock()
// Ping only if we are processing messages via streaming.
sendPing = !it.po.synchronous
case <-it.receiptTicker.C:
it.mu.Lock()
sendReceipt = (len(it.pendingReceipts) > 0)
}
// Lock is held here.
var acks, nacks, modAcks map[string]*AckResult
var acks, nacks, modAcks, receipts map[string]*AckResult
if sendAcks {
acks = it.pendingAcks
it.pendingAcks = map[string]*AckResult{}
Expand All @@ -471,6 +487,10 @@ func (it *messageIterator) sender() {
modAcks = it.pendingModAcks
it.pendingModAcks = map[string]*AckResult{}
}
if sendReceipt {
receipts = it.pendingReceipts
it.pendingReceipts = map[string]*AckResult{}
}
it.mu.Unlock()
// Make Ack and ModAck RPCs.
if sendAcks {
Expand All @@ -486,6 +506,9 @@ func (it *messageIterator) sender() {
if sendPing {
it.pingStream()
}
if sendReceipt {
it.sendModAck(receipts, dl, true)
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions pubsub/streaming_pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,17 @@ func TestStreamingPullRetry(t *testing.T) {
server.wait()
for i := 0; i < len(testMessages); i++ {
id := testMessages[i].AckId
server.mu.Lock()
if i%2 == 0 {
if !server.Acked[id] {
t.Errorf("msg %q should have been acked but wasn't", id)
}
} else {
if dl, ok := server.Deadlines[id]; !ok || dl != 0 {
t.Errorf("msg %q should have been nacked but wasn't", id)
if server.Acked[id] {
t.Errorf("msg %q should have not been acked", id)
}
}
server.mu.Unlock()
}
}

Expand Down

0 comments on commit 4c2cd10

Please sign in to comment.