V0.3.12 patch #828
@@ -2272,7 +2278,7 @@ func (d *cassandraPersistence) GetTasks(request *GetTasksRequest) (*GetTasksResp
rowTypeTask,
request.ReadLevel,
request.MaxReadLevel,
request.BatchSize)
).PageSize(request.BatchSize)
You are using a paginated query but not returning the page token to the caller.
This query is for tasks, on the matching side.
For anything other than timer tasks, we use the task ID for querying.
@@ -133,15 +132,21 @@ TaskFilterLoop:
return tasks, morePage, nil
}

func (a *queueAckMgrImpl) completeTask(taskID int64) {
func (a *queueAckMgrImpl) completeQueueTask(taskID int64) error {
err := a.processor.completeTask(taskID)
I'm a little confused here. I thought we were going to clean up tasks in a separate background goroutine, with each individual processor only updating the ack level for its cluster.
This will actually be called by each processor.
The replicator queue processor will actually complete the task.
The transfer queue processor will do nothing.
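To make the reply above concrete, here is a minimal sketch of the dispatch it describes: the ack manager forwards completion to whichever processor it was built with, and each processor decides whether that means real work. The `taskCompleter` interface, the `ackMgr` type, and the in-memory task map are hypothetical stand-ins, not the types in this PR.

```go
package main

import "fmt"

// taskCompleter is a hypothetical stand-in for the processor hook
// that the ack manager calls; the real interface may differ.
type taskCompleter interface {
	completeTask(taskID int64) error
}

// replicatorProcessor removes the task from its store (in-memory here).
type replicatorProcessor struct {
	tasks map[int64]struct{}
}

func (p *replicatorProcessor) completeTask(taskID int64) error {
	delete(p.tasks, taskID)
	return nil
}

// transferProcessor deliberately does nothing on completion.
type transferProcessor struct{}

func (p *transferProcessor) completeTask(taskID int64) error { return nil }

// ackMgr forwards completion to the processor it was constructed with.
type ackMgr struct {
	processor taskCompleter
}

func (a *ackMgr) completeQueueTask(taskID int64) error {
	return a.processor.completeTask(taskID)
}

func main() {
	rep := &replicatorProcessor{tasks: map[int64]struct{}{1: {}, 2: {}}}
	mgr := &ackMgr{processor: rep}
	if err := mgr.completeQueueTask(1); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("replicator tasks left:", len(rep.tasks))

	noop := &ackMgr{processor: &transferProcessor{}}
	fmt.Println("transfer completion error:", noop.completeQueueTask(1))
}
```

With this shape the ack manager stays processor-agnostic, and a no-op implementation keeps the call site uniform.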
@@ -175,7 +175,7 @@ func (p *replicatorQueueProcessorImpl) readTasks(readLevel int64) ([]queueTaskIn
tasks[i] = response.Tasks[i]
}

return tasks, len(tasks) >= batchSize, nil
return tasks, len(response.NextPageToken) != 0, nil
It looks like we don't use the NextPageToken for the next query.
For anything other than timer tasks, we use the task ID for querying.
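As a rough illustration of querying by task ID instead of by page token, the sketch below treats the last task ID read as the cursor for the next read. The `task` type, the in-memory `store`, and this `readTasks` signature are assumptions made for the example; the real code goes through the persistence layer.

```go
package main

import "fmt"

// task is a hypothetical, minimal queue task.
type task struct{ ID int64 }

// readTasks returns up to batchSize tasks with ID in (readLevel, maxReadLevel].
// It assumes store is sorted by ascending ID and batchSize is positive.
func readTasks(store []task, readLevel, maxReadLevel int64, batchSize int) []task {
	var out []task
	for _, t := range store {
		if t.ID > readLevel && t.ID <= maxReadLevel {
			out = append(out, t)
			if len(out) == batchSize {
				break
			}
		}
	}
	return out
}

func main() {
	store := []task{{1}, {2}, {3}, {4}, {5}}
	readLevel := int64(0)
	for {
		batch := readTasks(store, readLevel, 5, 2)
		if len(batch) == 0 {
			break
		}
		fmt.Println(batch)
		// The last task ID becomes the cursor; no page token is carried over.
		readLevel = batch[len(batch)-1].ID
	}
}
```

Because task IDs are unique and ordered in this sketch, the cursor alone is enough to resume without skipping or repeating a task.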
@@ -114,8 +114,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorCompleteTimerFailureRetryCount: 10,
TimerProcessorUpdateShardTaskCount: 100,
TimerProcessorUpdateAckInterval: 1 * time.Minute,
TimerProcessorCompleteTimerInterval: 1 * time.Second,
TimerProcessorUpdateAckInterval: 5 * time.Second,
Updating a shard is a very expensive query. I'm not sure changing the update interval for all 16K shards is a good idea.
We have a shard-level minimum update interval that guarantees the shard is not updated too often at the database level. At the application level (in memory), however, the update should happen as frequently as possible.
@@ -125,16 +125,16 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
TransferProcessorCompleteTransferFailureRetryCount: 10,
TransferProcessorUpdateShardTaskCount: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 1 * time.Minute,
TransferProcessorCompleteTransferInterval: 1 * time.Second,
TransferProcessorUpdateAckInterval: 5 * time.Second,
Same comment as for the timer update interval.
TransferProcessorStandbyTaskDelay: 0 * time.Minute,
ReplicatorTaskBatchSize: 10,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorUpdateShardTaskCount: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 1 * time.Minute,
ReplicatorProcessorUpdateAckInterval: 5 * time.Second,
Same as above.
@@ -182,6 +182,7 @@ func (t *timerQueueProcessorImpl) completeTimers() error {
}
}

t.logger.Infof("Start completing timer task from: %v, to %v.", lowerAckLevel, upperAckLevel)
This log will be very noisy.
I can remove this.
if upperAckLevel > ackLevel {
upperAckLevel = ackLevel
}
}
}

t.logger.Infof("Start completing transfer task from: %v, to %v.", lowerAckLevel, upperAckLevel)
This is going to be a very noisy log.
I can remove this.
func (t *transferQueueProcessorBase) readTasks(readLevel int64) ([]queueTaskInfo, bool, error) {
batchSize := t.options.BatchSize
response, err := t.executionManager.GetTransferTasks(&persistence.GetTransferTasksRequest{
It is weird that we use the pagination token in one place and not in another. I think we should make both paths consistent.
That would mean replacing the simple existing logic with more complex logic.
The only reason we have to use the pagination token at all is timer tasks.
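The thread does not say why timer tasks need the token. One plausible reading, and purely an assumption here, is that timer tasks are read by a timestamp range in which several tasks can share the same timestamp, so a timestamp alone cannot mark where the previous page stopped. The sketch below uses a hypothetical `timerTask` type and an offset-based token over an in-memory slice; a real store would return its own opaque page state.

```go
package main

import (
	"fmt"
	"strconv"
)

// timerTask is hypothetical; several tasks may share one FireAt value.
type timerTask struct {
	FireAt int64
	ID     int64
}

// getTimerTasks returns up to pageSize tasks with FireAt in [minTs, maxTs),
// resuming from the position encoded in token. The returned token is empty
// once the range is exhausted. store is assumed sorted by (FireAt, ID).
func getTimerTasks(store []timerTask, minTs, maxTs int64, pageSize int, token []byte) ([]timerTask, []byte, error) {
	if pageSize <= 0 {
		return nil, nil, fmt.Errorf("page size must be positive, got %d", pageSize)
	}
	offset := 0
	if len(token) > 0 {
		n, err := strconv.Atoi(string(token))
		if err != nil || n < 0 {
			return nil, nil, fmt.Errorf("invalid page token %q", token)
		}
		offset = n
	}
	var matched []timerTask
	for _, t := range store {
		if t.FireAt >= minTs && t.FireAt < maxTs {
			matched = append(matched, t)
		}
	}
	if offset > len(matched) {
		offset = len(matched)
	}
	end := offset + pageSize
	if end >= len(matched) {
		return matched[offset:], nil, nil
	}
	return matched[offset:end], []byte(strconv.Itoa(end)), nil
}

func main() {
	// Three tasks share timestamp 100, so a page boundary can fall between them.
	store := []timerTask{{100, 1}, {100, 2}, {100, 3}, {200, 4}}
	var token []byte
	for {
		page, next, err := getTimerTasks(store, 100, 300, 2, token)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(page)
		if len(next) == 0 {
			break
		}
		token = next
	}
}
```

In this example the first page ends between two tasks with the same `FireAt`, which is exactly the case where resuming from the timestamp alone would either repeat or skip a task.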
Can you address the comments I posted?
No description provided.