-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Publish replication task to Kafka after reading from replicator queue #632
Publish replication task to Kafka after reading from replicator queue #632
Conversation
func (e *mutableStateBuilder) ApplyReplicationStateUpdates(failoverVersion int64) { | ||
e.replicationState.CurrentVersion = failoverVersion | ||
e.replicationState.LastWriteVersion = failoverVersion | ||
e.replicationState.LastWriteEventID = e.hBuilder.nextEventID - 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make this attr NextEventID? so no need to do the -1 operation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll put a TODO comment for this now. Replication protocol currently uses LastWriteEventID so I don't want to mess with the terminology. But I see your point. I'll rename this when I start work on the replication protocol part of changes.
service/history/queueProcessor.go
Outdated
func (p *queueProcessorBase) processWithRetry(task queueTaskInfo) { | ||
p.logger.Debugf("Processing task: %v, type: %v", task.GetTaskID(), task.GetTaskType()) | ||
ProcessRetryLoop: | ||
for retryCount := 1; retryCount <= 100; retryCount++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retryCount should be configurable, ie. a const / var or input parameter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Let me make this change.
service/history/queueProcessor.go
Outdated
a.Lock() | ||
MoveAckLevelLoop: | ||
for current := a.ackLevel + 1; current <= a.readLevel; current++ { | ||
if acked, ok := a.outstandingTasks[current]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably panic if !ok, i.e. key not found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well for now I ported over the existing logic. Let me add a comment address what happens if !ok.
// It keeps track of read level when dispatching tasks to processor and maintains a map of outstanding tasks. | ||
// Outstanding tasks map uses the task id sequencer as the key, which is used by updateAckLevel to move the ack level | ||
// for the shard when all preceding tasks are acknowledged. | ||
ackManager struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we separate this ack manager into a dedicated file? that will be easier to add unit test and have separation of logic which will make the code more readable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now it is not used by any other component and only implementation detail of queue processor. Let me put a comment to move this outside.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unit test please.
Created QueueProcessorBase which has common logic used by both transfer queue procesor and replication queue processor. Created replicationQueueProcessor which processes replication task from replicator queue and publishes it to Kafka. Bootstrap logic for replicationQueueProcessor to host it within historyEngine. Also bootstrap logic to pass in kafka publisher to history engine. History engine changes to update replication state on StartWorkflowExecution and UpdateWorkflowExecution by updating the failover version from domain and writing it to mutable state.
dc64aef
to
56752cc
Compare
Created QueueProcessorBase which has common logic used by both transfer
queue procesor and replication queue processor.
Created replicationQueueProcessor which processes replication task from
replicator queue and publishes it to Kafka.
Bootstrap logic for replicationQueueProcessor to host it within
historyEngine. Also bootstrap logic to pass in kafka publisher to
history engine.
History engine changes to update replication state on
StartWorkflowExecution and UpdateWorkflowExecution by updating the
failover version from domain and writing it to mutable state.