-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cadence Worker service to host replicator #563
Conversation
New cadence-worker service and bootstrap code to bring up the new service. This service currently only host the replicator which consumes replication tasks from kafka topic and applies to current Cadence cluster. MessagingClient interface abstract out interaction with Kafka. KafkaClient provides implementation for the interface using kafka-client library which consumes messages from Kafka broker. Created separate Kafka configuration used for bootstrapping the KafkaClient created during service startup and passed down to replicator. Created replicator.thrift which defines the payload which is sent over to other clusters through KafkaClient.
service/worker/processor.go
Outdated
go p.worker(&workerWG) | ||
} | ||
|
||
processorPumpLoop: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this loop necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, it is not necessary. Previously I had more case statements which I removed later but forgot to remove the loop.
service/worker/processor.go
Outdated
} | ||
|
||
if task, err := deserialize(msg.Value()); err != nil { | ||
p.logger.Errorf("Deserialize Error. Value: %v, Error: %v", string(msg.Value()), err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the DLQ part is working in progress, plz add a TODO here.
Since we should investigate any message which cannot be processed.
same applies to line: 151, 160
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a TODO. Adding a DLQ is pretty straightforward as the underlying kafka-client library already has support for this. But currently the idea is to block the consumer when this happens until the fix for such issues are rolled out.
@@ -0,0 +1,54 @@ | |||
Cadence Worker (In Development) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Worker naming is a bit confusing given that we use the same in other contexts. I don't have a better suggestion other than cadence-background
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the worker naming on the client-side, but not on the server. Worker is well understood convention for hosting background processing. Let's stick with worker for now.
|
||
for _, processor := range r.processors { | ||
if err := processor.Start(); err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if one of the processor returns an error, don't you need to stop the other created processors i.e. you probably need a call to Stop() here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I have changed service to call Stop on the replicator if Start returns an error.
topicClusterAssignment[topic] = []string{cfg.Cluster} | ||
} | ||
|
||
client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zap.NewNop(), tally.NoopScope) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest passing a real tally scope to emit metrics for debugging
service/worker/processor.go
Outdated
return // channel closed | ||
} | ||
|
||
if task, err := deserialize(msg.Value()); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also emit a metric for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
New cadence-worker service and bootstrap code to bring up the new
service. This service currently only host the replicator which consumes
replication tasks from kafka topic and applies to current Cadence
cluster.
MessagingClient interface abstract out interaction with Kafka.
KafkaClient provides implementation for the interface using kafka-client
library which consumes messages from Kafka broker.
Created separate Kafka configuration used for bootstrapping the
KafkaClient created during service startup and passed down to
replicator.
Created replicator.thrift which defines the payload which is sent over
to other clusters through KafkaClient.