Skip to content

Commit

Permalink
Fix update domain replication ack level (#4212)
Browse files Browse the repository at this point in the history
* Fix update domain replication ack level
  • Loading branch information
yux0 authored May 24, 2021
1 parent edfb9ed commit 61399e5
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 45 deletions.
50 changes: 17 additions & 33 deletions common/domain/replication_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,25 @@ func NewReplicationQueue(
logger log.Logger,
) ReplicationQueue {
return &replicationQueueImpl{
queue: queue,
clusterName: clusterName,
metricsClient: metricsClient,
logger: logger,
encoder: codec.NewThriftRWEncoder(),
ackNotificationChan: make(chan bool),
done: make(chan bool),
status: common.DaemonStatusInitialized,
queue: queue,
clusterName: clusterName,
metricsClient: metricsClient,
logger: logger,
encoder: codec.NewThriftRWEncoder(),
done: make(chan bool),
status: common.DaemonStatusInitialized,
}
}

type (
replicationQueueImpl struct {
queue persistence.QueueManager
clusterName string
metricsClient metrics.Client
logger log.Logger
encoder codec.BinaryEncoder
ackLevelUpdated bool
ackNotificationChan chan bool
done chan bool
status int32
queue persistence.QueueManager
clusterName string
metricsClient metrics.Client
logger log.Logger
encoder codec.BinaryEncoder
done chan bool
status int32
}

// ReplicationQueue is used to publish and list domain replication tasks
Expand Down Expand Up @@ -177,16 +174,10 @@ func (q *replicationQueueImpl) UpdateAckLevel(
clusterName string,
) error {

err := q.queue.UpdateAckLevel(ctx, lastProcessedMessageID, clusterName)
if err != nil {
if err := q.queue.UpdateAckLevel(ctx, lastProcessedMessageID, clusterName); err != nil {
return fmt.Errorf("failed to update ack level: %v", err)
}

select {
case q.ackNotificationChan <- true:
default:
}

return nil
}

Expand Down Expand Up @@ -305,16 +296,9 @@ func (q *replicationQueueImpl) purgeProcessor() {
case <-q.done:
return
case <-ticker.C:
if q.ackLevelUpdated {
err := q.purgeAckedMessages()
if err != nil {
q.logger.Warn("Failed to purge acked domain replication messages.", tag.Error(err))
} else {
q.ackLevelUpdated = false
}
if err := q.purgeAckedMessages(); err != nil {
q.logger.Warn("Failed to purge acked domain replication messages.", tag.Error(err))
}
case <-q.ackNotificationChan:
q.ackLevelUpdated = true
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/cassandra/cassandraQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (q *nosqlQueue) updateAckLevel(
}

// Ignore possibly delayed message
if queueMetadata.ClusterAckLevels[clusterName] > messageID {
if ackLevel, ok := queueMetadata.ClusterAckLevels[clusterName]; ok && ackLevel >= messageID {
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/persistence-tests/queuePersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (s *QueuePersistenceSuite) TestQueueMetadataOperations() {
err = s.UpdateAckLevel(ctx, 25, "test2")
s.Require().NoError(err)

err = s.UpdateAckLevel(ctx, 24, "test2")
s.Require().NoError(err)

clusterAckLevels, err = s.GetAckLevels(ctx)
s.Require().NoError(err)
s.Assert().Len(clusterAckLevels, 2)
Expand Down
5 changes: 2 additions & 3 deletions common/persistence/sql/sqlQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ package sql

import (
"context"
"fmt"

"database/sql"
"fmt"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -133,7 +132,7 @@ func (q *sqlQueue) UpdateAckLevel(
}

// Ignore possibly delayed message
if clusterAckLevels[clusterName] > messageID {
if ackLevel, ok := clusterAckLevels[clusterName]; ok && ackLevel >= messageID {
return nil
}

Expand Down
12 changes: 4 additions & 8 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,14 +657,10 @@ func (adh *adminHandlerImpl) GetDomainReplicationMessages(
if request.LastProcessedMessageID != nil {
lastProcessedMessageID = request.GetLastProcessedMessageID()
}

if lastProcessedMessageID != defaultLastMessageID {
err := adh.GetDomainReplicationQueue().UpdateAckLevel(ctx, lastProcessedMessageID, request.GetClusterName())
if err != nil {
adh.GetLogger().Warn("Failed to update domain replication queue ack level.",
tag.TaskID(int64(lastProcessedMessageID)),
tag.ClusterName(request.GetClusterName()))
}
if err := adh.GetDomainReplicationQueue().UpdateAckLevel(ctx, lastProcessedMessageID, request.GetClusterName()); err != nil {
adh.GetLogger().Warn("Failed to update domain replication queue ack level.",
tag.TaskID(int64(lastProcessedMessageID)),
tag.ClusterName(request.GetClusterName()))
}

return &types.GetDomainReplicationMessagesResponse{
Expand Down
4 changes: 4 additions & 0 deletions service/worker/replicator/domain_replication_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (

func newDomainReplicationProcessor(
sourceCluster string,
currentCluster string,
logger log.Logger,
remotePeer admin.Client,
metricsClient metrics.Client,
Expand All @@ -67,6 +68,7 @@ func newDomainReplicationProcessor(
serviceResolver: serviceResolver,
status: common.DaemonStatusInitialized,
sourceCluster: sourceCluster,
currentCluster: currentCluster,
logger: logger,
remotePeer: remotePeer,
taskExecutor: taskExecutor,
Expand All @@ -85,6 +87,7 @@ type (
serviceResolver membership.ServiceResolver
status int32
sourceCluster string
currentCluster string
logger log.Logger
remotePeer admin.Client
taskExecutor domain.ReplicationTaskExecutor
Expand Down Expand Up @@ -141,6 +144,7 @@ func (p *domainReplicationProcessor) fetchDomainReplicationTasks() {
request := &types.GetDomainReplicationMessagesRequest{
LastRetrievedMessageID: common.Int64Ptr(p.lastRetrievedMessageID),
LastProcessedMessageID: common.Int64Ptr(p.lastProcessedMessageID),
ClusterName: p.currentCluster,
}
response, err := p.remotePeer.GetDomainReplicationMessages(ctx, request)
defer cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type domainReplicationSuite struct {
controller *gomock.Controller

sourceCluster string
currentCluster string
taskExecutor *domain.MockReplicationTaskExecutor
remoteClient *admin.MockClient
domainReplicationQueue *domain.MockReplicationQueue
Expand All @@ -62,13 +63,15 @@ func (s *domainReplicationSuite) SetupTest() {
resource := resource.NewTest(s.controller, metrics.Worker)

s.sourceCluster = "active"
s.currentCluster = "standby"
s.taskExecutor = domain.NewMockReplicationTaskExecutor(s.controller)
s.domainReplicationQueue = domain.NewMockReplicationQueue(s.controller)
s.remoteClient = resource.RemoteAdminClient
serviceResolver := resource.WorkerServiceResolver
serviceResolver.EXPECT().Lookup(s.sourceCluster).Return(resource.GetHostInfo(), nil).AnyTimes()
s.replicationProcessor = newDomainReplicationProcessor(
s.sourceCluster,
s.currentCluster,
resource.GetLogger(),
s.remoteClient,
resource.GetMetricsClient(),
Expand Down
1 change: 1 addition & 0 deletions service/worker/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (r *Replicator) Start() error {
if clusterName != currentClusterName {
processor := newDomainReplicationProcessor(
clusterName,
currentClusterName,
r.logger.WithTags(tag.ComponentReplicationTaskProcessor, tag.SourceCluster(clusterName)),
r.clientBean.GetRemoteAdminClient(clusterName),
r.metricsClient,
Expand Down

0 comments on commit 61399e5

Please sign in to comment.