-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
separate transfer queue to active and standby processor #661
Conversation
c42b31d
to
0283818
Compare
separate queue ack mgr into separate file
0571cf6
to
e230c92
Compare
e230c92
to
d6c62b8
Compare
|
||
// error which will be thrown if the timer / transfer task should be | ||
// retries due to various of reasons | ||
taskRetryError struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's reuse the same error used by timerQueueProcessor.
} | ||
|
||
transferQueueProcessor interface { | ||
common.Daemon | ||
NotifyNewTask() | ||
NotifyNewTask(clusterName string) | ||
SetCurrentTime(clusterName string, currentTime time.Time) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TransferQueueProcessor does not use time at all. It uses a constant value for visibilityTime for all transfer and replicator tasks.
What's the purpose of this API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is to delay the processing of the standby transfer tasks.
@@ -118,12 +132,11 @@ type ( | |||
} | |||
|
|||
timerQueueAckMgr interface { | |||
readRetryTimerTasks() []*persistence.TimerTaskInfo | |||
getFinishedChan() <-chan struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you will remove it from this PR. I think the timer changes covers this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plz ignore the timer* files
@@ -99,6 +109,8 @@ func (p *replicatorQueueProcessorImpl) Process(qTask queueTaskInfo) error { | |||
|
|||
if err != nil { | |||
p.metricsClient.IncCounter(scope, metrics.TaskRequests) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you change this to TaskFailure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@@ -99,6 +109,8 @@ func (p *replicatorQueueProcessorImpl) Process(qTask queueTaskInfo) error { | |||
|
|||
if err != nil { | |||
p.metricsClient.IncCounter(scope, metrics.TaskRequests) | |||
} else { | |||
p.queueAckMgr.completeTask(task.TaskID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why move this out of base processor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i believe there should be some logical separation of base processor and actual processor.
the base processor should only deal with retry, when seeing a error, and the actual task processor (the actual processing logic) should deal with errors such as entity not exists.
separate queue ack mgr into separate file
Review #648 first, since this PR contains some code from #648
solve #564
partially solve #566