Filter replication task on source cluster #3641

Merged 5 commits on Dec 10, 2022
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
@@ -553,6 +553,8 @@ const (

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
// ReplicatorMaxSkipTaskCount is the maximum number of tasks that can be skipped during task pagination due to not meeting filtering conditions (e.g. missing namespace).
ReplicatorMaxSkipTaskCount = "history.replicatorMaxSkipTaskCount"
// ReplicatorTaskWorkerCount is number of worker for ReplicatorProcessor
ReplicatorTaskWorkerCount = "history.replicatorTaskWorkerCount"
// ReplicatorTaskMaxRetryCount is max times of retry for ReplicatorProcessor
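For operators, the new knob can be overridden at runtime through dynamic config. A minimal sketch of such an override, assuming the standard YAML dynamic config file format (500 is an arbitrary example value):

# Hypothetical dynamic config override for the new setting.
history.replicatorMaxSkipTaskCount:
  - value: 500
    constraints: {}
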
10 changes: 10 additions & 0 deletions common/namespace/namespace.go
@@ -177,6 +177,16 @@ func (ns *Namespace) ClusterNames() []string {
return out
}

// IsOnCluster returns true if the namespace is registered on the given cluster, otherwise false.
func (ns *Namespace) IsOnCluster(clusterName string) bool {
for _, namespaceCluster := range ns.replicationConfig.Clusters {
if namespaceCluster == clusterName {
return true
}
}
return false
}

// ConfigVersion return the namespace config version
func (ns *Namespace) ConfigVersion() int64 {
return ns.configVersion
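To illustrate the intent of the new helper, a minimal sketch of a caller (the function name shouldReplicate and package example are hypothetical; the ack manager change below applies the same check when filtering tasks for a polling cluster):

package example

import "go.temporal.io/server/common/namespace"

// shouldReplicate reports whether a replication task that belongs to ns is worth
// sending to the cluster that is polling for tasks: only clusters the namespace
// is registered on should receive its tasks.
func shouldReplicate(ns *namespace.Namespace, pollingCluster string) bool {
	return ns.IsOnCluster(pollingCluster)
}
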
2 changes: 2 additions & 0 deletions service/history/configs/config.go
@@ -158,6 +158,7 @@ type Config struct {
ReplicatorProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
ReplicatorProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn
ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFn
ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn

// System Limits
MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
@@ -417,6 +418,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ReplicatorProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxReschedulerSize, 10000),
ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false),
ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25),
ReplicatorProcessorMaxSkipTaskCount: dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250),
ReplicationTaskProcessorHostQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500),
ReplicationTaskProcessorShardQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 30),

93 changes: 63 additions & 30 deletions service/history/replication/ack_manager.go
@@ -43,6 +43,7 @@ import (
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
@@ -73,7 +74,9 @@ type (
metricsHandler metrics.MetricsHandler
logger log.Logger
retryPolicy backoff.RetryPolicy
pageSize int
namespaceRegistry namespace.Registry
pageSize dynamicconfig.IntPropertyFn
maxSkipTaskCount dynamicconfig.IntPropertyFn

sync.Mutex
// largest replication task ID generated
@@ -85,7 +88,6 @@ type (

var (
errUnknownReplicationTask = serviceerror.NewInternal("unknown replication task")
emptyReplicationTasks = []*replicationspb.ReplicationTask{}
)

func NewAckManager(
@@ -111,7 +113,9 @@ func NewAckManager(
metricsHandler: shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.ReplicatorQueueProcessorScope)),
logger: log.With(logger, tag.ComponentReplicatorQueue),
retryPolicy: retryPolicy,
pageSize: config.ReplicatorProcessorFetchTasksBatchSize(),
namespaceRegistry: shard.GetNamespaceRegistry(),
pageSize: config.ReplicatorProcessorFetchTasksBatchSize,
maxSkipTaskCount: config.ReplicatorProcessorMaxSkipTaskCount,

maxTaskID: nil,
sanityCheckTime: time.Time{},
@@ -223,9 +227,9 @@ func (p *ackMgrImpl) GetTasks(
minTaskID, maxTaskID := p.taskIDsRange(queryMessageID)
replicationTasks, lastTaskID, err := p.getTasks(
ctx,
pollingCluster,
minTaskID,
maxTaskID,
p.pageSize,
)
if err != nil {
return nil, err
@@ -254,49 +258,65 @@ func (p *ackMgrImpl) GetTasks(

func (p *ackMgrImpl) getTasks(
ctx context.Context,
pollingCluster string,
minTaskID int64,
maxTaskID int64,
batchSize int,
) ([]*replicationspb.ReplicationTask, int64, error) {
if minTaskID > maxTaskID {
return nil, 0, serviceerror.NewUnavailable("min task ID < max task ID, probably due to shard re-balancing")
return nil, 0, serviceerror.NewUnavailable("min task ID > max task ID, probably due to shard re-balancing")
} else if minTaskID == maxTaskID {
return []*replicationspb.ReplicationTask{}, maxTaskID, nil
return nil, maxTaskID, nil
}

replicationTasks := make([]*replicationspb.ReplicationTask, 0, batchSize)
iter := collection.NewPagingIterator(p.getPaginationFn(ctx, minTaskID, maxTaskID, batchSize))
for iter.HasNext() && len(replicationTasks) < batchSize {
replicationTasks := make([]*replicationspb.ReplicationTask, 0, p.pageSize())
skippedTaskCount := 0
lastTaskID := maxTaskID // If no tasks are returned, it means there are no tasks below maxTaskID.
iter := collection.NewPagingIterator(p.getReplicationTasksFn(ctx, minTaskID, maxTaskID, p.pageSize()))
// iter.HasNext() should be the last check to avoid an extra page read when replicationTasks is already full.
for len(replicationTasks) < p.pageSize() && skippedTaskCount <= p.maxSkipTaskCount() && iter.HasNext() {
task, err := iter.Next()
if err != nil {
p.logger.Error("replication task reader encounter error, return earlier", tag.Error(err))
if len(replicationTasks) == 0 {
return nil, 0, err
} else {
return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil
}
return p.swallowPartialResultsError(replicationTasks, lastTaskID, err)
}

if replicationTask, err := p.toReplicationTask(ctx, task); err != nil {
p.logger.Error("replication task reader encounter error, return earlier", tag.Error(err))
if len(replicationTasks) == 0 {
return nil, 0, err
} else {
return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil
// If, for any reason, a task is skipped:
// - lastTaskID needs to be updated so that this task is not read again next time,
// - skippedTaskCount needs to be incremented to prevent a timeout on the caller side (from skipping too many tasks).
// If an error occurred, though, lastTaskID shouldn't be updated, so the task is read again next time.

ns, err := p.namespaceRegistry.GetNamespaceByID(namespace.ID(task.GetNamespaceID()))
if err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); !isNotFound {
Member: Is it possible that this could happen after host restart and before ns refresh fetched the namespace info?

Member Author: The first call to load namespaces is from the registry.Start method, which blocks and is called by fx while the container is being built.

return p.swallowPartialResultsError(replicationTasks, lastTaskID, err)
}
} else if replicationTask != nil {
replicationTasks = append(replicationTasks, replicationTask)
// Namespace doesn't exist on this cluster (i.e. deleted). It is safe to skip the task.
lastTaskID = task.GetTaskID()
skippedTaskCount++
continue
}
// If the namespace doesn't exist on the polling cluster, there is no reason to send the task.
if !ns.IsOnCluster(pollingCluster) {
lastTaskID = task.GetTaskID()
skippedTaskCount++
continue
}
}

if len(replicationTasks) == 0 {
return emptyReplicationTasks, maxTaskID, nil
} else {
return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil
replicationTask, err := p.toReplicationTask(ctx, task)
if err != nil {
return p.swallowPartialResultsError(replicationTasks, lastTaskID, err)
} else if replicationTask == nil {
lastTaskID = task.GetTaskID()
skippedTaskCount++
continue
}
lastTaskID = task.GetTaskID()
replicationTasks = append(replicationTasks, replicationTask)
}

return replicationTasks, lastTaskID, nil
}

func (p *ackMgrImpl) getPaginationFn(
func (p *ackMgrImpl) getReplicationTasksFn(
ctx context.Context,
minTaskID int64,
maxTaskID int64,
@@ -318,6 +338,19 @@ func (p *ackMgrImpl) getPaginationFn(
}
}

func (p *ackMgrImpl) swallowPartialResultsError(
replicationTasks []*replicationspb.ReplicationTask,
lastTaskID int64,
err error,
) ([]*replicationspb.ReplicationTask, int64, error) {

p.logger.Error("Replication tasks reader encountered error, return earlier.", tag.Error(err), tag.Value(len(replicationTasks)))
if len(replicationTasks) == 0 {
return nil, 0, err
}
return replicationTasks, lastTaskID, nil
}

func (p *ackMgrImpl) taskIDsRange(
lastReadMessageID int64,
) (minTaskID int64, maxTaskID int64) {
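The heart of the change is the bounded-skip pagination in getTasks: keep paging until the result page is full, the skip budget is exhausted, or the iterator is drained. A simplified, hypothetical sketch of that pattern (a generic helper, not the PR's code):

package example

// collectPage gathers up to pageSize items that pass keep, skipping at most
// maxSkip items that do not, so a single poll can never scan an unbounded
// number of filtered-out tasks.
func collectPage[T any](next func() (T, bool), keep func(T) bool, pageSize, maxSkip int) []T {
	out := make([]T, 0, pageSize)
	skipped := 0
	for len(out) < pageSize && skipped <= maxSkip {
		item, ok := next()
		if !ok {
			break
		}
		if !keep(item) {
			skipped++
			continue
		}
		out = append(out, item)
	}
	return out
}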