From 1373a8fdd89b5cd0f6e53749265ef1b42bd4c3bd Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 29 Jan 2025 15:20:29 +0100 Subject: [PATCH] [FIXED] Upgrade path for deterministic clustered dedupe Signed-off-by: Maurice van Veen --- server/events.go | 15 ++++++ server/jetstream_cluster.go | 27 ++++++++++ server/jetstream_cluster_4_test.go | 87 ++++++++++++++++++++++++++++++ server/route.go | 2 +- server/server.go | 3 +- server/stream.go | 8 ++- 6 files changed, 138 insertions(+), 4 deletions(-) diff --git a/server/events.go b/server/events.go index 871c381fce4..695cb6b1e0f 100644 --- a/server/events.go +++ b/server/events.go @@ -265,6 +265,7 @@ const ( JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. BinaryStreamSnapshot // New stream snapshot capability. AccountNRG // Move NRG traffic out of system account. + DeferClusteredDedupe // Defer de-duplication to replicas under certain conditions. ) // Set JetStream capability. @@ -301,6 +302,17 @@ func (si *ServerInfo) AccountNRG() bool { return si.Flags&AccountNRG != 0 } +// Set deferred clustered dedupe capability. +func (si *ServerInfo) SetDeferClusteredDedupe() { + si.Flags |= DeferClusteredDedupe +} + +// DeferClusteredDedupe indicates whether we support deferring de-duplication +// of multiple inflight messages with the same ID to replicas. +func (si *ServerInfo) DeferClusteredDedupe() bool { + return si.Flags&DeferClusteredDedupe != 0 +} + // ClientInfo is detailed information about the client forming a connection. type ClientInfo struct { Start *time.Time `json:"start,omitempty"` @@ -527,6 +539,7 @@ RESET: if s.accountNRGAllowed.Load() { si.SetAccountNRG() } + si.SetDeferClusteredDedupe() } } var b []byte @@ -1691,6 +1704,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su si.JetStreamEnabled(), si.BinaryStreamSnapshot(), accountNRG, + si.DeferClusteredDedupe(), }) if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG { // One of the servers we received statsz from changed its mind about @@ -1745,6 +1759,7 @@ func (s *Server) processNewServer(si *ServerInfo) { si.JetStreamEnabled(), si.BinaryStreamSnapshot(), si.AccountNRG(), + si.DeferClusteredDedupe(), }) } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 18c26bca656..3adbe25b254 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7857,6 +7857,29 @@ func (mset *stream) supportsBinarySnapshotLocked() bool { return true } +// Determine if all peers in our set support having multiple inflight proposals for the same ID +// while the deduplication is not yet finalized. During this condition multiple proposals for +// the same ID would be sent, and it relies on all replicas to de-dupe on their own during this period. +// Lock should be held. +func (mset *stream) supportsDeferredDeduplication() bool { + s, n := mset.srv, mset.node + if s == nil || n == nil { + return false + } + // Grab our peers and walk them to make sure we can all support binary stream snapshots. + id, peers := n.ID(), n.Peers() + for _, p := range peers { + if p.ID == id { + // We know we support ourselves. + continue + } + if sir, ok := s.nodeToInfo.Load(p.ID); ok && sir != nil && !sir.(nodeInfo).deferDedupe { + return false + } + } + return true +} + // StreamSnapshot is used for snapshotting and out of band catch up in clustered mode. // Legacy, replace with binary stream snapshots. type streamSnapshot struct { @@ -8062,6 +8085,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // We used to stage with zero, but it's hard to correctly remove it during leader elections // while taking quorum/truncation into account. So instead let duplicates through and handle // duplicates later. Only if we know the sequence we can start blocking above. + // Unless, not all servers support this feature, in which case we still stage zero and block above. + if !mset.supportsDeferredDeduplication() { + mset.storeMsgIdLocked(&ddentry{msgId, 0, time.Now().UnixNano()}) + } mset.mu.Unlock() } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index a60f314059d..02120d979d6 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3219,6 +3219,16 @@ func TestJetStreamClusterPubAckSequenceDupeAsync(t *testing.T) { }) require_NoError(t, err) + sl := c.streamLeader(globalAccountName, "TEST_STREAM") + acc, err := sl.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST_STREAM") + require_NoError(t, err) + mset.mu.RLock() + supportsDeferredDeduplication := mset.supportsDeferredDeduplication() + mset.mu.RUnlock() + require_True(t, supportsDeferredDeduplication) + msgData := []byte("...") for seq := uint64(1); seq < 10; seq++ { @@ -3301,6 +3311,83 @@ func TestJetStreamClusterPubAckSequenceDupeDeterministic(t *testing.T) { require_NoError(t, err) } +// Similar implementation to TestJetStreamClusterPubAckSequenceDupeAsync, but confirming +// the old behavior is kept during upgrades. +// TODO: 123 +func TestJetStreamClusterPubAckSequenceDupeUpgradePath(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST_STREAM", + Subjects: []string{"TEST_SUBJECT"}, + Replicas: 3, + Duplicates: 1 * time.Minute, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST_STREAM") + acc, err := sl.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST_STREAM") + require_NoError(t, err) + + rn := mset.raftNode().(*raft) + rn.Lock() + peers := rn.peerNames() + rn.Unlock() + rn.UpdateKnownPeers(append(peers, "test")) + + sl.nodeToInfo.Store("test", nodeInfo{}) + + mset.mu.RLock() + supportsDeferredDeduplication := mset.supportsDeferredDeduplication() + mset.mu.RUnlock() + require_False(t, supportsDeferredDeduplication) + + msgData := []byte("...") + + for seq := uint64(1); seq < 10; seq++ { + + msgSubject := "TEST_SUBJECT" + msgIdOpt := nats.MsgId(nuid.Next()) + + wg := sync.WaitGroup{} + wg.Add(2) + + // Fire off 2 publish requests in parallel + // The first one "stages" a duplicate entry before even proposing the message + // The second one gets a pubAck with sequence zero by hitting the staged duplicated entry + + pubAcks := [2]*nats.PubAck{} + for i := 0; i < 2; i++ { + go func(i int) { + defer wg.Done() + var err error + pubAcks[i], err = js.Publish(msgSubject, msgData, msgIdOpt) + require_NoError(t, err) + }(i) + } + + wg.Wait() + + // Exactly one of the pubAck should have the expected sequence, and the other a staged zero sequence. + require_True(t, (pubAcks[0].Sequence == 0 && pubAcks[1].Sequence == seq) || (pubAcks[0].Sequence == seq && pubAcks[1].Sequence == 0)) + + // Exactly one of the pubAck should be marked dupe + require_True(t, (pubAcks[0].Duplicate || pubAcks[1].Duplicate) && (pubAcks[0].Duplicate != pubAcks[1].Duplicate)) + } + + // Ensure there are no duplicate entries in the de-dupe map. + mset.mu.RLock() + defer mset.mu.RUnlock() + require_Len(t, len(mset.ddmap), 9) + require_Len(t, len(mset.ddarr), 9) +} + func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { const ( diff --git a/server/route.go b/server/route.go index be63abaae6a..36dca213586 100644 --- a/server/route.go +++ b/server/route.go @@ -2272,7 +2272,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false, false}) } } diff --git a/server/server.go b/server/server.go index 030dea5f5bf..41c9cefb5f9 100644 --- a/server/server.go +++ b/server/server.go @@ -387,6 +387,7 @@ type nodeInfo struct { js bool binarySnapshots bool accountNRG bool + deferDedupe bool } // Make sure all are 64bits for atomic use @@ -787,7 +788,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, true, true, + false, true, true, true, true, }) } diff --git a/server/stream.go b/server/stream.go index d27b10b2d46..7f39b09b648 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4889,7 +4889,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if isClustered { mset.purgeMsgIdsAtLocked(ts) } - if dde := mset.checkMsgId(msgId); dde != nil { + if dde := mset.checkMsgId(msgId); dde != nil && dde.seq > 0 { mset.mu.Unlock() bumpCLFS() if canRespond { @@ -5200,7 +5200,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // If we have a msgId make sure to save. if msgId != _EMPTY_ { - mset.storeMsgIdLocked(&ddentry{msgId, seq, ts}) + if dde := mset.ddmap[msgId]; dde != nil { + dde.seq, dde.ts = seq, ts + } else { + mset.storeMsgIdLocked(&ddentry{msgId, seq, ts}) + } } // If here we succeeded in storing the message.