Skip to content

Commit

Permalink
Adds fix for domain ack level issue (#5001)
Browse files Browse the repository at this point in the history
Adds fix for domain replication by ensuring (non-transactionally) that replication messages are higher then ack levels
  • Loading branch information
davidporter-id-au authored Sep 26, 2022
1 parent 6952083 commit 2a0afef
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 3 deletions.
22 changes: 20 additions & 2 deletions common/persistence/nosql/nosqlQueueStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (q *nosqlQueueStore) createQueueMetadataEntryIfNotExist() error {
return nil
}

// Warning: This is not a safe concurrent operation in its current state.
// It's only used for domain replication at the moment, but needs a conditional write guard
// for concurrent use
func (q *nosqlQueueStore) EnqueueMessage(
ctx context.Context,
messagePayload []byte,
Expand All @@ -100,8 +103,11 @@ func (q *nosqlQueueStore) EnqueueMessage(
if err != nil {
return err
}

_, err = q.tryEnqueue(ctx, q.queueType, lastMessageID+1, messagePayload)
ackLevels, err := q.GetAckLevels(ctx)
if err != nil {
return err
}
_, err = q.tryEnqueue(ctx, q.queueType, getNextID(ackLevels, lastMessageID), messagePayload)
return err
}

Expand Down Expand Up @@ -375,3 +381,15 @@ func (q *nosqlQueueStore) updateAckLevel(
}
return nil
}

// if, for whatever reason, the ack-levels get ahead of the actual messages
// then ensure the next ID follows
func getNextID(acks map[string]int64, lastMessageID int64) int64 {
o := lastMessageID
for _, v := range acks {
if v > o {
o = v
}
}
return o + 1
}
59 changes: 59 additions & 0 deletions common/persistence/nosql/nosqlQueueStore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package nosql

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetNextID(t *testing.T) {
tests := map[string]struct {
acks map[string]int64
lastID int64
expected int64
}{
"expected case - last ID is equal to ack-levels": {
acks: map[string]int64{"a": 3},
lastID: 3,
expected: 4,
},
"expected case - last ID is equal to ack-levels haven't caught up": {
acks: map[string]int64{"a": 2},
lastID: 3,
expected: 4,
},
"error case - ack-levels are ahead for some reason": {
acks: map[string]int64{"a": 3},
lastID: 2,
expected: 4,
},
}

for name, td := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, td.expected, getNextID(td.acks, td.lastID))
})
}
}
19 changes: 18 additions & 1 deletion common/persistence/sql/sqlQueueStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func (q *sqlQueueStore) EnqueueMessage(
}
}

_, err = tx.InsertIntoQueue(ctx, newQueueRow(q.queueType, lastMessageID+1, messagePayload))
ackLevels, err := tx.GetAckLevels(ctx, q.queueType, true)
if err != nil {
return err
}

_, err = tx.InsertIntoQueue(ctx, newQueueRow(q.queueType, getNextID(ackLevels, lastMessageID), messagePayload))
return err
})
}
Expand Down Expand Up @@ -272,3 +277,15 @@ func (q *sqlQueueStore) GetDLQSize(
func (q *sqlQueueStore) getDLQTypeFromQueueType() persistence.QueueType {
return -q.queueType
}

// if, for whatever reason, the ack-levels get ahead of the actual messages
// then ensure the next ID follows
func getNextID(acks map[string]int64, lastMessageID int64) int64 {
o := lastMessageID
for _, v := range acks {
if v > o {
o = v
}
}
return o + 1
}
59 changes: 59 additions & 0 deletions common/persistence/sql/sqlQueueStore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package sql

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetNextID(t *testing.T) {
tests := map[string]struct {
acks map[string]int64
lastID int64
expected int64
}{
"expected case - last ID is equal to ack-levels": {
acks: map[string]int64{"a": 3},
lastID: 3,
expected: 4,
},
"expected case - last ID is equal to ack-levels haven't caught up": {
acks: map[string]int64{"a": 2},
lastID: 3,
expected: 4,
},
"error case - ack-levels are ahead for some reason": {
acks: map[string]int64{"a": 3},
lastID: 2,
expected: 4,
},
}

for name, td := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, td.expected, getNextID(td.acks, td.lastID))
})
}
}

0 comments on commit 2a0afef

Please sign in to comment.