From 2a0afef4784c915eeae4f6072ce0d57de16ddbb8 Mon Sep 17 00:00:00 2001 From: David Porter Date: Tue, 27 Sep 2022 07:36:15 +1000 Subject: [PATCH] Adds fix for domain ack level issue (#5001) Adds fix for domain replication by ensuring (non-transactionally) that replication messages are higher then ack levels --- common/persistence/nosql/nosqlQueueStore.go | 22 ++++++- .../persistence/nosql/nosqlQueueStore_test.go | 59 +++++++++++++++++++ common/persistence/sql/sqlQueueStore.go | 19 +++++- common/persistence/sql/sqlQueueStore_test.go | 59 +++++++++++++++++++ 4 files changed, 156 insertions(+), 3 deletions(-) create mode 100644 common/persistence/nosql/nosqlQueueStore_test.go create mode 100644 common/persistence/sql/sqlQueueStore_test.go diff --git a/common/persistence/nosql/nosqlQueueStore.go b/common/persistence/nosql/nosqlQueueStore.go index 8d1f800197c..65d4f86ce70 100644 --- a/common/persistence/nosql/nosqlQueueStore.go +++ b/common/persistence/nosql/nosqlQueueStore.go @@ -92,6 +92,9 @@ func (q *nosqlQueueStore) createQueueMetadataEntryIfNotExist() error { return nil } +// Warning: This is not a safe concurrent operation in its current state. +// It's only used for domain replication at the moment, but needs a conditional write guard +// for concurrent use func (q *nosqlQueueStore) EnqueueMessage( ctx context.Context, messagePayload []byte, @@ -100,8 +103,11 @@ func (q *nosqlQueueStore) EnqueueMessage( if err != nil { return err } - - _, err = q.tryEnqueue(ctx, q.queueType, lastMessageID+1, messagePayload) + ackLevels, err := q.GetAckLevels(ctx) + if err != nil { + return err + } + _, err = q.tryEnqueue(ctx, q.queueType, getNextID(ackLevels, lastMessageID), messagePayload) return err } @@ -375,3 +381,15 @@ func (q *nosqlQueueStore) updateAckLevel( } return nil } + +// if, for whatever reason, the ack-levels get ahead of the actual messages +// then ensure the next ID follows +func getNextID(acks map[string]int64, lastMessageID int64) int64 { + o := lastMessageID + for _, v := range acks { + if v > o { + o = v + } + } + return o + 1 +} diff --git a/common/persistence/nosql/nosqlQueueStore_test.go b/common/persistence/nosql/nosqlQueueStore_test.go new file mode 100644 index 00000000000..d92f317fdb1 --- /dev/null +++ b/common/persistence/nosql/nosqlQueueStore_test.go @@ -0,0 +1,59 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package nosql + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetNextID(t *testing.T) { + tests := map[string]struct { + acks map[string]int64 + lastID int64 + expected int64 + }{ + "expected case - last ID is equal to ack-levels": { + acks: map[string]int64{"a": 3}, + lastID: 3, + expected: 4, + }, + "expected case - last ID is equal to ack-levels haven't caught up": { + acks: map[string]int64{"a": 2}, + lastID: 3, + expected: 4, + }, + "error case - ack-levels are ahead for some reason": { + acks: map[string]int64{"a": 3}, + lastID: 2, + expected: 4, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal(t, td.expected, getNextID(td.acks, td.lastID)) + }) + } +} diff --git a/common/persistence/sql/sqlQueueStore.go b/common/persistence/sql/sqlQueueStore.go index 884653fdfcf..4abcdb38eda 100644 --- a/common/persistence/sql/sqlQueueStore.go +++ b/common/persistence/sql/sqlQueueStore.go @@ -68,7 +68,12 @@ func (q *sqlQueueStore) EnqueueMessage( } } - _, err = tx.InsertIntoQueue(ctx, newQueueRow(q.queueType, lastMessageID+1, messagePayload)) + ackLevels, err := tx.GetAckLevels(ctx, q.queueType, true) + if err != nil { + return err + } + + _, err = tx.InsertIntoQueue(ctx, newQueueRow(q.queueType, getNextID(ackLevels, lastMessageID), messagePayload)) return err }) } @@ -272,3 +277,15 @@ func (q *sqlQueueStore) GetDLQSize( func (q *sqlQueueStore) getDLQTypeFromQueueType() persistence.QueueType { return -q.queueType } + +// if, for whatever reason, the ack-levels get ahead of the actual messages +// then ensure the next ID follows +func getNextID(acks map[string]int64, lastMessageID int64) int64 { + o := lastMessageID + for _, v := range acks { + if v > o { + o = v + } + } + return o + 1 +} diff --git a/common/persistence/sql/sqlQueueStore_test.go b/common/persistence/sql/sqlQueueStore_test.go new file mode 100644 index 00000000000..f5ed3cadad8 --- /dev/null +++ b/common/persistence/sql/sqlQueueStore_test.go @@ -0,0 +1,59 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package sql + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetNextID(t *testing.T) { + tests := map[string]struct { + acks map[string]int64 + lastID int64 + expected int64 + }{ + "expected case - last ID is equal to ack-levels": { + acks: map[string]int64{"a": 3}, + lastID: 3, + expected: 4, + }, + "expected case - last ID is equal to ack-levels haven't caught up": { + acks: map[string]int64{"a": 2}, + lastID: 3, + expected: 4, + }, + "error case - ack-levels are ahead for some reason": { + acks: map[string]int64{"a": 3}, + lastID: 2, + expected: 4, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal(t, td.expected, getNextID(td.acks, td.lastID)) + }) + } +}