diff --git a/message/sequencer.go b/message/sequencer.go
index a1e0c873..81730e14 100644
--- a/message/sequencer.go
+++ b/message/sequencer.go
@@ -401,40 +401,21 @@ func (w *Sequencer) Step() error {
 		w.dequeuedClock = GetClock(uuid)
 
 		if w.dequeuedClock != 0 && w.dequeuedClock <= w.emit.minClock {
-			if w.replayIt != nil {
-				continue // These can happen during replays.
-			} else {
-				// We don't allow duplicates within the ring in the first place,
-				// with one exception: messages with zero-valued Clocks are not
-				// expected to be consistently ordered on clock.
-				// In QueueUncommitted we synthetically assigned a clock value.
-				// Log a bunch of diagnostics using separate
-				log.WithFields(log.Fields{
-					"uuid":          uuid,
-					"dequeuedClock": w.dequeuedClock,
-					"emit":          fmt.Sprintf("%+v", w.emit),
-					"partialsCount": len(w.partials),
-					"pendingCount":  len(w.pending),
-				}).Error("ring clock <= emit.minClock (will log diagnostics then panic)")
-				log.WithField("offsets", w.offsets).Error("sequencer offsets")
-				log.WithField("dequeued", w.Dequeued).Error("dequeued message")
-
-				for producer, partial := range w.partials {
-					log.WithFields(log.Fields{
-						"producer": producer,
-						"partial":  partial,
-					}).Error("partials")
-				}
-				var i = 0
-				for pending, _ := range w.pending {
-					log.WithFields(log.Fields{
-						"i":       i,
-						"pending": pending,
-					}).Error("pending")
-					i++
-				}
-				panic("ring clock <= emit.minClock")
-			}
+			// We don't allow duplicates within the ring with one exception:
+			// messages with zero-valued Clocks are not expected to be
+			// consistently ordered on clock (in QueueUncommitted we
+			// synthetically assigned a clock value).
+			//
+			// However:
+			//  - Duplicated sequences CAN happen during replays.
+			//  - They can ALSO happen if we dequeued during replay,
+			//    and then observe the sequence duplicated again in the ring.
+			//
+			// While ordinarily we would discard such duplicates during ring
+			// maintenance operations, if the duplication straddles a recovered
+			// checkpoint boundary then all bets are off because checkpoints
+			// track only the last ACK and not the partial minClock.
+			continue
 		} else if w.dequeuedClock > w.emit.maxClock {
 			continue // ACK'd clock tells us not to commit.
 		}
diff --git a/message/sequencer_test.go b/message/sequencer_test.go
index c15688e5..accde9fe 100644
--- a/message/sequencer_test.go
+++ b/message/sequencer_test.go
@@ -614,6 +614,51 @@ func TestSequencerProducerStatesRoundTripDuringDequeue(t *testing.T) {
 	expectDeque(t, seq2, a5, a6ACK)
 }
 
+func TestSequencerProducerStatesStraddleDuplicate(t *testing.T) {
+	var (
+		generate = newTestMsgGenerator()
+		seq1     = NewSequencer(nil, nil, 12)
+		A        = NewProducerID()
+		a1       = generate(A, 1, Flag_CONTINUE_TXN)
+		a2       = generate(A, 2, Flag_CONTINUE_TXN)
+		a3       = generate(A, 3, Flag_CONTINUE_TXN)
+		a1Dup    = generate(A, 1, Flag_CONTINUE_TXN)
+		a2Dup    = generate(A, 2, Flag_CONTINUE_TXN)
+		a3Dup    = generate(A, 3, Flag_CONTINUE_TXN)
+		a4ACK    = generate(A, 4, Flag_ACK_TXN)
+	)
+
+	require.Equal(t, []QueueOutcome{
+		QueueContinueBeginSpan,
+		QueueContinueExtendSpan,
+		QueueContinueExtendSpan,
+	}, queue(seq1, a1, a2, a3))
+	require.Nil(t, seq1.emit) // No messages to dequeue.
+
+	// Take a checkpoint and then restore it.
+	var offsets, states = seq1.Checkpoint(0)
+	var seq2 = NewSequencer(offsets, states, 12)
+
+	require.Equal(t, []QueueOutcome{
+		QueueContinueTxnClockLarger,
+		QueueContinueTxnClockLarger,
+		QueueContinueTxnClockLarger,
+		QueueAckCommitRing,
+	}, queue(seq1, a1Dup, a2Dup, a3Dup, a4ACK))
+
+	require.Equal(t, []QueueOutcome{
+		QueueContinueExtendSpan,
+		QueueContinueExtendSpan,
+		QueueContinueExtendSpan,
+		QueueAckCommitReplay,
+	}, queue(seq2, a1Dup, a2Dup, a3Dup, a4ACK))
+
+	expectReplay(t, seq2, a1.Begin, a1Dup.Begin, a1, a2, a3)
+
+	expectDeque(t, seq1, a1, a2, a3, a4ACK)
+	expectDeque(t, seq2, a1, a2, a3, a4ACK)
+}
+
 func TestSequencerProducerPruning(t *testing.T) {
 	var (
 		generate = newTestMsgGenerator()
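The guard this patch rewrites reduces to a small clock-window check: a dequeued clock at or below the emitted span's minClock is a duplicate to skip, while a clock above maxClock lies beyond the ACK'd span and must not commit. Below is a minimal, self-contained sketch of that filtering logic under those assumptions; `Clock`, `dequeueFilter`, and its parameters are illustrative stand-ins, not part of the Sequencer's actual API.

```go
package main

import "fmt"

// Clock stands in for a monotonic, producer-assigned message counter.
type Clock uint64

// dequeueFilter mirrors the Step() guard in isolation: given the emitted
// span's minClock and maxClock, it keeps a clock only if it falls within
// the span. Clocks at or below minClock are duplicates (possibly
// straddling a recovered checkpoint) and are skipped rather than treated
// as an error; clocks above maxClock are beyond the ACK'd span.
func dequeueFilter(clocks []Clock, minClock, maxClock Clock) []Clock {
	var out []Clock
	for _, c := range clocks {
		if c != 0 && c <= minClock {
			continue // Duplicate of an already-sequenced message.
		} else if c > maxClock {
			continue // ACK'd clock tells us not to commit.
		}
		out = append(out, c)
	}
	return out
}

func main() {
	// Suppose clocks 1-3 were already sequenced (minClock = 3) and the
	// ACK covers through clock 6 (maxClock = 6). Duplicated clocks 1-3,
	// such as those straddling a checkpoint, are silently skipped, as
	// are the not-yet-ACK'd clocks 7 and 8.
	fmt.Println(dequeueFilter([]Clock{1, 2, 3, 4, 5, 6, 7, 8}, 3, 6))
	// Output: [4 5 6]
}
```

Skipping (rather than panicking on) the `c <= minClock` case is what makes checkpoint-straddling duplicates benign, which is exactly the scenario TestSequencerProducerStatesStraddleDuplicate exercises.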