[FIXED] (2.11) Upgrade path for deterministic clustered dedupe #6426

Merged 1 commit on Jan 29, 2025
15 changes: 15 additions & 0 deletions server/events.go
@@ -265,6 +265,7 @@ const (
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
BinaryStreamSnapshot // New stream snapshot capability.
AccountNRG // Move NRG traffic out of system account.
DeferClusteredDedupe // Defer de-duplication to replicas under certain conditions.
)

// Set JetStream capability.
@@ -301,6 +302,17 @@ func (si *ServerInfo) AccountNRG() bool {
return si.Flags&AccountNRG != 0
}

// Set deferred clustered dedupe capability.
func (si *ServerInfo) SetDeferClusteredDedupe() {
si.Flags |= DeferClusteredDedupe
}

// DeferClusteredDedupe indicates whether we support deferring de-duplication
// of multiple inflight messages with the same ID to replicas.
func (si *ServerInfo) DeferClusteredDedupe() bool {
return si.Flags&DeferClusteredDedupe != 0
}

// ClientInfo is detailed information about the client forming a connection.
type ClientInfo struct {
Start *time.Time `json:"start,omitempty"`
@@ -527,6 +539,7 @@ RESET:
if s.accountNRGAllowed.Load() {
si.SetAccountNRG()
}
si.SetDeferClusteredDedupe()
}
}
var b []byte
@@ -1691,6 +1704,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
accountNRG,
si.DeferClusteredDedupe(),
})
if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG {
// One of the servers we received statsz from changed its mind about
@@ -1745,6 +1759,7 @@ func (s *Server) processNewServer(si *ServerInfo) {
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
si.AccountNRG(),
si.DeferClusteredDedupe(),
})
}
}
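The new DeferClusteredDedupe flag above follows the existing ServerCapability convention: every capability occupies a single bit via 1 << iota, so advertising and testing it is a plain OR/AND on ServerInfo.Flags. A minimal standalone sketch of that bit-flag pattern, using hypothetical names rather than the server's own types:

```go
package main

import "fmt"

// Capability is a bit-flag set; each constant occupies a single bit
// (1 << iota), mirroring the ServerCapability pattern in the diff.
type Capability uint64

const (
	CapJetStream Capability = 1 << iota // bit 0
	CapBinarySnapshot                   // bit 1
	CapAccountNRG                       // bit 2
	CapDeferClusteredDedupe             // bit 3
)

// Set turns a capability bit on.
func (c *Capability) Set(flag Capability) { *c |= flag }

// Has reports whether a capability bit is set.
func (c Capability) Has(flag Capability) bool { return c&flag != 0 }

func main() {
	var flags Capability
	flags.Set(CapJetStream)
	flags.Set(CapDeferClusteredDedupe)
	fmt.Println(flags.Has(CapDeferClusteredDedupe)) // true
	fmt.Println(flags.Has(CapAccountNRG))           // false
}
```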
27 changes: 27 additions & 0 deletions server/jetstream_cluster.go
@@ -7857,6 +7857,29 @@ func (mset *stream) supportsBinarySnapshotLocked() bool {
return true
}

// Determine if all peers in our set support having multiple inflight proposals for the same ID
// while de-duplication is not yet finalized. In that window multiple proposals for the same ID
// may be sent, and we rely on all replicas to de-dupe on their own.
// Lock should be held.
func (mset *stream) supportsDeferredDeduplication() bool {
s, n := mset.srv, mset.node
if s == nil || n == nil {
return false
}
// Grab our peers and walk them to make sure they all support deferred de-duplication.
id, peers := n.ID(), n.Peers()
for _, p := range peers {
if p.ID == id {
// We know we support ourselves.
continue
}
if sir, ok := s.nodeToInfo.Load(p.ID); ok && sir != nil && !sir.(nodeInfo).deferDedupe {
return false
}
}
return true
}

// StreamSnapshot is used for snapshotting and out of band catch up in clustered mode.
// Legacy, replace with binary stream snapshots.
type streamSnapshot struct {
@@ -8062,6 +8085,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// We used to stage with zero, but it's hard to correctly remove it during leader elections
// while taking quorum/truncation into account. So instead let duplicates through and handle
// them later. Only once we know the sequence can we start blocking above.
// Unless not all servers support this feature, in which case we still stage zero and block above.
if !mset.supportsDeferredDeduplication() {
mset.storeMsgIdLocked(&ddentry{msgId, 0, time.Now().UnixNano()})
}
mset.mu.Unlock()
}

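The check in supportsDeferredDeduplication is what gates the upgrade path: deferral is only enabled once every known peer advertises the capability, and a single older server keeps the legacy stage-zero behavior in processClusteredInboundMsg. A reduced sketch of that all-peers gate under assumed names (peerSupport stands in for the nodeToInfo lookup):

```go
package main

import "fmt"

// peerSupport maps a peer ID to whether it advertises the
// DeferClusteredDedupe capability (a stand-in for the nodeToInfo lookup).
type peerSupport map[string]bool

// allPeersDeferDedupe mirrors the gate in supportsDeferredDeduplication:
// deferral is only safe once every other known peer supports it, so a
// single older server in the peer set keeps the legacy stage-zero path.
func allPeersDeferDedupe(self string, peers peerSupport) bool {
	for id, supported := range peers {
		if id == self {
			// We know we support ourselves.
			continue
		}
		if !supported {
			return false
		}
	}
	return true
}

func main() {
	peers := peerSupport{"A": true, "B": true, "C": false}
	fmt.Println(allPeersDeferDedupe("A", peers)) // false: peer C is an older server
}
```

Once the last pre-2.11 server leaves the peer set, the leader stops staging zero-sequence placeholders and lets concurrent proposals for the same ID through for the replicas to de-duplicate deterministically.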
87 changes: 87 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -3219,6 +3219,16 @@ func TestJetStreamClusterPubAckSequenceDupeAsync(t *testing.T) {
})
require_NoError(t, err)

sl := c.streamLeader(globalAccountName, "TEST_STREAM")
acc, err := sl.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST_STREAM")
require_NoError(t, err)
mset.mu.RLock()
supportsDeferredDeduplication := mset.supportsDeferredDeduplication()
mset.mu.RUnlock()
require_True(t, supportsDeferredDeduplication)

msgData := []byte("...")

for seq := uint64(1); seq < 10; seq++ {
@@ -3301,6 +3311,83 @@ func TestJetStreamClusterPubAckSequenceDupeDeterministic(t *testing.T) {
require_NoError(t, err)
}

// Similar implementation to TestJetStreamClusterPubAckSequenceDupeAsync, but confirming
// the old behavior is kept during upgrades.
func TestJetStreamClusterPubAckSequenceDupeUpgradePath(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST_STREAM",
Subjects: []string{"TEST_SUBJECT"},
Replicas: 3,
Duplicates: 1 * time.Minute,
})
require_NoError(t, err)

sl := c.streamLeader(globalAccountName, "TEST_STREAM")
acc, err := sl.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST_STREAM")
require_NoError(t, err)

rn := mset.raftNode().(*raft)
rn.Lock()
peers := rn.peerNames()
rn.Unlock()
rn.UpdateKnownPeers(append(peers, "test"))

sl.nodeToInfo.Store("test", nodeInfo{})

mset.mu.RLock()
supportsDeferredDeduplication := mset.supportsDeferredDeduplication()
mset.mu.RUnlock()
require_False(t, supportsDeferredDeduplication)

msgData := []byte("...")

for seq := uint64(1); seq < 10; seq++ {

msgSubject := "TEST_SUBJECT"
msgIdOpt := nats.MsgId(nuid.Next())

wg := sync.WaitGroup{}
wg.Add(2)

// Fire off 2 publish requests in parallel.
// The first one "stages" a duplicate entry before even proposing the message.
// The second one gets a pubAck with sequence zero by hitting the staged duplicate entry.

pubAcks := [2]*nats.PubAck{}
for i := 0; i < 2; i++ {
go func(i int) {
defer wg.Done()
var err error
pubAcks[i], err = js.Publish(msgSubject, msgData, msgIdOpt)
require_NoError(t, err)
}(i)
}

wg.Wait()

// Exactly one of the pubAcks should have the expected sequence, and the other a staged zero sequence.
require_True(t, (pubAcks[0].Sequence == 0 && pubAcks[1].Sequence == seq) || (pubAcks[0].Sequence == seq && pubAcks[1].Sequence == 0))

// Exactly one of the pubAcks should be marked as a duplicate.
require_True(t, (pubAcks[0].Duplicate || pubAcks[1].Duplicate) && (pubAcks[0].Duplicate != pubAcks[1].Duplicate))
}

// Ensure there are no duplicate entries in the de-dupe map.
mset.mu.RLock()
defer mset.mu.RUnlock()
require_Len(t, len(mset.ddmap), 9)
require_Len(t, len(mset.ddarr), 9)
}

func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {

const (
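For context on what these tests exercise, here is a hedged client-side sketch using the nats.go JetStream API: the same stream shape as the tests (R3, one-minute duplicate window), publishing the same Msg-Id twice and inspecting both pub acks. It assumes a local cluster at the default URL; in a fully upgraded cluster the second ack reports Duplicate with the sequence of the first store, while only the mixed-version path tested above can surface a staged zero sequence for concurrent publishes.

```go
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nuid"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}

	// Same shape as the test stream: R3 with a one-minute duplicate window.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:       "TEST_STREAM",
		Subjects:   []string{"TEST_SUBJECT"},
		Replicas:   3,
		Duplicates: time.Minute,
	}); err != nil {
		panic(err)
	}

	// Publish the same Msg-Id twice. The second ack is flagged as a
	// duplicate and echoes the sequence assigned to the first store.
	id := nuid.Next()
	first, err := js.Publish("TEST_SUBJECT", []byte("payload"), nats.MsgId(id))
	if err != nil {
		panic(err)
	}
	second, err := js.Publish("TEST_SUBJECT", []byte("payload"), nats.MsgId(id))
	if err != nil {
		panic(err)
	}
	fmt.Println(first.Sequence, second.Sequence, second.Duplicate)
}
```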
2 changes: 1 addition & 1 deletion server/route.go
@@ -2272,7 +2272,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
// Check to be consistent and future proof. But will be same domain.
if s.sameDomain(info.Domain) {
s.nodeToInfo.Store(rHash,
nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false})
nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false, false})
}
}

3 changes: 2 additions & 1 deletion server/server.go
@@ -387,6 +387,7 @@ type nodeInfo struct {
js bool
binarySnapshots bool
accountNRG bool
deferDedupe bool
}

// Make sure all are 64bits for atomic use
@@ -787,7 +788,7 @@ func NewServer(opts *Options) (*Server, error) {
opts.Tags,
&JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true},
nil,
false, true, true, true,
false, true, true, true, true,
})
}

8 changes: 6 additions & 2 deletions server/stream.go
@@ -4889,7 +4889,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if isClustered {
mset.purgeMsgIdsAtLocked(ts)
}
if dde := mset.checkMsgId(msgId); dde != nil {
if dde := mset.checkMsgId(msgId); dde != nil && dde.seq > 0 {
mset.mu.Unlock()
bumpCLFS()
if canRespond {
@@ -5200,7 +5200,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,

// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
if dde := mset.ddmap[msgId]; dde != nil {
dde.seq, dde.ts = seq, ts
} else {
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}
}

// If here we succeeded in storing the message.
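The stream.go changes above keep one invariant for the de-dupe state: an entry with sequence zero is only a staged placeholder from the mixed-version path, so checkMsgId no longer treats it as a committed duplicate, and when the message is finally stored the placeholder is updated in place instead of being inserted again, which keeps ddmap and ddarr the same length (as the upgrade test asserts). A tiny sketch of that update-or-insert step under assumed names; the real storeMsgIdLocked also appends to ddarr, which is omitted here:

```go
package main

import (
	"fmt"
	"time"
)

// dedupeEntry mirrors the shape of ddentry: a zero sequence marks a
// staged placeholder that has not been finalized yet (assumed names).
type dedupeEntry struct {
	seq uint64
	ts  int64
}

// upsertMsgID records the final sequence for a message ID, updating a
// staged placeholder in place instead of inserting a second entry, so
// the map never holds two entries for the same ID.
func upsertMsgID(ddmap map[string]*dedupeEntry, id string, seq uint64) {
	ts := time.Now().UnixNano()
	if dde, ok := ddmap[id]; ok {
		dde.seq, dde.ts = seq, ts
		return
	}
	ddmap[id] = &dedupeEntry{seq: seq, ts: ts}
}

// isDuplicate mirrors the dde.seq > 0 check: a staged entry with a zero
// sequence is not treated as a committed duplicate.
func isDuplicate(ddmap map[string]*dedupeEntry, id string) bool {
	dde, ok := ddmap[id]
	return ok && dde.seq > 0
}

func main() {
	ddmap := map[string]*dedupeEntry{}
	ddmap["msg-1"] = &dedupeEntry{seq: 0, ts: time.Now().UnixNano()} // staged placeholder
	fmt.Println(isDuplicate(ddmap, "msg-1"))                         // false: not finalized yet
	upsertMsgID(ddmap, "msg-1", 42)                                  // finalize in place
	fmt.Println(isDuplicate(ddmap, "msg-1"), len(ddmap))             // true 1
}
```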