From 765b96b622e629629f35e9c6b11beb62b860bdf4 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Fri, 16 Feb 2018 08:32:50 -0800 Subject: [PATCH 1/3] Cadence Worker service to host replicator New cadence-worker service and bootstrap code to bring up the new service. This service currently only host the replicator which consumes replication tasks from kafka topic and applies to current Cadence cluster. MessagingClient interface abstract out interaction with Kafka. KafkaClient provides implementation for the interface using kafka-client library which consumes messages from Kafka broker. Created separate Kafka configuration used for bootstrapping the KafkaClient created during service startup and passed down to replicator. Created replicator.thrift which defines the payload which is sent over to other clusters through KafkaClient. --- .gen/go/cadence/idl.go | 2 +- .../workflowservice_deprecatedomain.go | 2 +- .../cadence/workflowservice_describedomain.go | 2 +- .../workflowservice_describetasklist.go | 2 +- ...rkflowservice_describeworkflowexecution.go | 2 +- ...flowservice_getworkflowexecutionhistory.go | 2 +- ...lowservice_listclosedworkflowexecutions.go | 2 +- ...kflowservice_listopenworkflowexecutions.go | 2 +- .../workflowservice_pollforactivitytask.go | 2 +- .../workflowservice_pollfordecisiontask.go | 2 +- .../cadence/workflowservice_queryworkflow.go | 2 +- ...flowservice_recordactivitytaskheartbeat.go | 2 +- .../cadence/workflowservice_registerdomain.go | 2 +- ...wservice_requestcancelworkflowexecution.go | 2 +- ...flowservice_respondactivitytaskcanceled.go | 2 +- ...service_respondactivitytaskcanceledbyid.go | 2 +- ...lowservice_respondactivitytaskcompleted.go | 2 +- ...ervice_respondactivitytaskcompletedbyid.go | 2 +- ...rkflowservice_respondactivitytaskfailed.go | 2 +- ...owservice_respondactivitytaskfailedbyid.go | 2 +- ...lowservice_responddecisiontaskcompleted.go | 2 +- ...rkflowservice_responddecisiontaskfailed.go | 2 +- ...rkflowservice_respondquerytaskcompleted.go | 2 +- ...workflowservice_signalworkflowexecution.go | 2 +- .../workflowservice_startworkflowexecution.go | 2 +- ...kflowservice_terminateworkflowexecution.go | 2 +- .../cadence/workflowservice_updatedomain.go | 2 +- .gen/go/health/idl.go | 2 +- .gen/go/health/meta_health.go | 2 +- .gen/go/health/types.go | 2 +- ...istoryservice_describeworkflowexecution.go | 2 +- .../history/historyservice_getmutablestate.go | 2 +- ...toryservice_recordactivitytaskheartbeat.go | 2 +- ...istoryservice_recordactivitytaskstarted.go | 2 +- ...ryservice_recordchildexecutioncompleted.go | 2 +- ...istoryservice_recorddecisiontaskstarted.go | 2 +- ...historyservice_removesignalmutablestate.go | 2 +- ...yservice_requestcancelworkflowexecution.go | 2 +- ...toryservice_respondactivitytaskcanceled.go | 2 +- ...oryservice_respondactivitytaskcompleted.go | 2 +- ...istoryservice_respondactivitytaskfailed.go | 2 +- ...oryservice_responddecisiontaskcompleted.go | 2 +- ...istoryservice_responddecisiontaskfailed.go | 2 +- .../historyservice_scheduledecisiontask.go | 2 +- .../historyservice_signalworkflowexecution.go | 2 +- .../historyservice_startworkflowexecution.go | 2 +- ...storyservice_terminateworkflowexecution.go | 2 +- .gen/go/history/idl.go | 2 +- .gen/go/history/types.go | 2 +- .gen/go/matching/idl.go | 2 +- .../matchingservice_addactivitytask.go | 2 +- .../matchingservice_adddecisiontask.go | 2 +- .../matchingservice_canceloutstandingpoll.go | 2 +- .../matchingservice_describetasklist.go | 2 +- .../matchingservice_pollforactivitytask.go | 2 +- .../matchingservice_pollfordecisiontask.go | 2 +- .../matching/matchingservice_queryworkflow.go | 2 +- ...tchingservice_respondquerytaskcompleted.go | 2 +- .gen/go/matching/types.go | 2 +- .gen/go/replicator/idl.go | 37 ++ .gen/go/replicator/types.go | 513 ++++++++++++++++++ .gen/go/shared/idl.go | 2 +- .gen/go/shared/types.go | 82 ++- Makefile | 1 + cmd/server/cadence.go | 17 +- cmd/server/server.go | 5 + common/constants.go | 2 + common/logging/events.go | 9 + common/logging/helpers.go | 42 ++ common/logging/tags.go | 18 +- common/messaging/interface.go | 32 ++ common/messaging/kafkaClient.go | 67 +++ common/messaging/kafkaConfig.go | 77 +++ common/metrics/defs.go | 25 + common/service/config/config.go | 7 + common/service/service.go | 12 +- config/development_active.yaml | 24 +- glide.lock | 60 +- glide.yaml | 13 +- idl/github.com/uber/cadence/replicator.thrift | 39 ++ service/worker/processor.go | 179 ++++++ service/worker/replicator.go | 88 +++ service/worker/service.go | 86 +++ 83 files changed, 1462 insertions(+), 93 deletions(-) create mode 100644 .gen/go/replicator/idl.go create mode 100644 .gen/go/replicator/types.go create mode 100644 common/messaging/interface.go create mode 100644 common/messaging/kafkaClient.go create mode 100644 common/messaging/kafkaConfig.go create mode 100644 idl/github.com/uber/cadence/replicator.thrift create mode 100644 service/worker/processor.go create mode 100644 service/worker/replicator.go create mode 100644 service/worker/service.go diff --git a/.gen/go/cadence/idl.go b/.gen/go/cadence/idl.go index 1fcfa11c3ae..d6cea5b8946 100644 --- a/.gen/go/cadence/idl.go +++ b/.gen/go/cadence/idl.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_deprecatedomain.go b/.gen/go/cadence/workflowservice_deprecatedomain.go index 49d869ee764..2dd58c28a21 100644 --- a/.gen/go/cadence/workflowservice_deprecatedomain.go +++ b/.gen/go/cadence/workflowservice_deprecatedomain.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_describedomain.go b/.gen/go/cadence/workflowservice_describedomain.go index 513ab8f32bc..2edee414ca6 100644 --- a/.gen/go/cadence/workflowservice_describedomain.go +++ b/.gen/go/cadence/workflowservice_describedomain.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_describetasklist.go b/.gen/go/cadence/workflowservice_describetasklist.go index c11feb58717..76ab6826d19 100644 --- a/.gen/go/cadence/workflowservice_describetasklist.go +++ b/.gen/go/cadence/workflowservice_describetasklist.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_describeworkflowexecution.go b/.gen/go/cadence/workflowservice_describeworkflowexecution.go index 3c44860e0de..30a44ba422e 100644 --- a/.gen/go/cadence/workflowservice_describeworkflowexecution.go +++ b/.gen/go/cadence/workflowservice_describeworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_getworkflowexecutionhistory.go b/.gen/go/cadence/workflowservice_getworkflowexecutionhistory.go index 3f0f6ddc5b8..744eaec409b 100644 --- a/.gen/go/cadence/workflowservice_getworkflowexecutionhistory.go +++ b/.gen/go/cadence/workflowservice_getworkflowexecutionhistory.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_listclosedworkflowexecutions.go b/.gen/go/cadence/workflowservice_listclosedworkflowexecutions.go index d54a753183d..5fc1e110e73 100644 --- a/.gen/go/cadence/workflowservice_listclosedworkflowexecutions.go +++ b/.gen/go/cadence/workflowservice_listclosedworkflowexecutions.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_listopenworkflowexecutions.go b/.gen/go/cadence/workflowservice_listopenworkflowexecutions.go index d72a977c7ab..8cddfce22e1 100644 --- a/.gen/go/cadence/workflowservice_listopenworkflowexecutions.go +++ b/.gen/go/cadence/workflowservice_listopenworkflowexecutions.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_pollforactivitytask.go b/.gen/go/cadence/workflowservice_pollforactivitytask.go index eb4551a5a76..f6c55f8843e 100644 --- a/.gen/go/cadence/workflowservice_pollforactivitytask.go +++ b/.gen/go/cadence/workflowservice_pollforactivitytask.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_pollfordecisiontask.go b/.gen/go/cadence/workflowservice_pollfordecisiontask.go index a4805ff6a80..2f6b452a5a4 100644 --- a/.gen/go/cadence/workflowservice_pollfordecisiontask.go +++ b/.gen/go/cadence/workflowservice_pollfordecisiontask.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_queryworkflow.go b/.gen/go/cadence/workflowservice_queryworkflow.go index 343d67f0c64..d3339d5337c 100644 --- a/.gen/go/cadence/workflowservice_queryworkflow.go +++ b/.gen/go/cadence/workflowservice_queryworkflow.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_recordactivitytaskheartbeat.go b/.gen/go/cadence/workflowservice_recordactivitytaskheartbeat.go index 108bab6db9e..9401be309b0 100644 --- a/.gen/go/cadence/workflowservice_recordactivitytaskheartbeat.go +++ b/.gen/go/cadence/workflowservice_recordactivitytaskheartbeat.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_registerdomain.go b/.gen/go/cadence/workflowservice_registerdomain.go index 645c46c679d..3c2e41a8648 100644 --- a/.gen/go/cadence/workflowservice_registerdomain.go +++ b/.gen/go/cadence/workflowservice_registerdomain.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_requestcancelworkflowexecution.go b/.gen/go/cadence/workflowservice_requestcancelworkflowexecution.go index 94d6cf20cd2..be13ca4e593 100644 --- a/.gen/go/cadence/workflowservice_requestcancelworkflowexecution.go +++ b/.gen/go/cadence/workflowservice_requestcancelworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_respondactivitytaskcanceled.go b/.gen/go/cadence/workflowservice_respondactivitytaskcanceled.go index 67b1c4ef063..ec4cec76389 100644 --- a/.gen/go/cadence/workflowservice_respondactivitytaskcanceled.go +++ b/.gen/go/cadence/workflowservice_respondactivitytaskcanceled.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_respondactivitytaskcanceledbyid.go b/.gen/go/cadence/workflowservice_respondactivitytaskcanceledbyid.go index ccd5827eff0..cba1c59de51 100644 --- a/.gen/go/cadence/workflowservice_respondactivitytaskcanceledbyid.go +++ b/.gen/go/cadence/workflowservice_respondactivitytaskcanceledbyid.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_respondactivitytaskcompleted.go b/.gen/go/cadence/workflowservice_respondactivitytaskcompleted.go index 094bc217faa..38a85e101c0 100644 --- a/.gen/go/cadence/workflowservice_respondactivitytaskcompleted.go +++ b/.gen/go/cadence/workflowservice_respondactivitytaskcompleted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_respondactivitytaskcompletedbyid.go b/.gen/go/cadence/workflowservice_respondactivitytaskcompletedbyid.go index 694208c5837..ad79dd78447 100644 --- a/.gen/go/cadence/workflowservice_respondactivitytaskcompletedbyid.go +++ b/.gen/go/cadence/workflowservice_respondactivitytaskcompletedbyid.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_respondactivitytaskfailed.go b/.gen/go/cadence/workflowservice_respondactivitytaskfailed.go index 9d8b4928b55..2c85384642d 100644 --- a/.gen/go/cadence/workflowservice_respondactivitytaskfailed.go +++ b/.gen/go/cadence/workflowservice_respondactivitytaskfailed.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_respondactivitytaskfailedbyid.go b/.gen/go/cadence/workflowservice_respondactivitytaskfailedbyid.go index c7c3ea814ab..9861242eb6b 100644 --- a/.gen/go/cadence/workflowservice_respondactivitytaskfailedbyid.go +++ b/.gen/go/cadence/workflowservice_respondactivitytaskfailedbyid.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_responddecisiontaskcompleted.go b/.gen/go/cadence/workflowservice_responddecisiontaskcompleted.go index e07d9f97484..f50be344264 100644 --- a/.gen/go/cadence/workflowservice_responddecisiontaskcompleted.go +++ b/.gen/go/cadence/workflowservice_responddecisiontaskcompleted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_responddecisiontaskfailed.go b/.gen/go/cadence/workflowservice_responddecisiontaskfailed.go index b3759f86779..fef5ca39d19 100644 --- a/.gen/go/cadence/workflowservice_responddecisiontaskfailed.go +++ b/.gen/go/cadence/workflowservice_responddecisiontaskfailed.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_respondquerytaskcompleted.go b/.gen/go/cadence/workflowservice_respondquerytaskcompleted.go index a0db6eacfe3..e5c472149d0 100644 --- a/.gen/go/cadence/workflowservice_respondquerytaskcompleted.go +++ b/.gen/go/cadence/workflowservice_respondquerytaskcompleted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_signalworkflowexecution.go b/.gen/go/cadence/workflowservice_signalworkflowexecution.go index 68c8ff0a301..cc06cbb797a 100644 --- a/.gen/go/cadence/workflowservice_signalworkflowexecution.go +++ b/.gen/go/cadence/workflowservice_signalworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_startworkflowexecution.go b/.gen/go/cadence/workflowservice_startworkflowexecution.go index 4d9beac0706..fe90f6b2b8c 100644 --- a/.gen/go/cadence/workflowservice_startworkflowexecution.go +++ b/.gen/go/cadence/workflowservice_startworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_terminateworkflowexecution.go b/.gen/go/cadence/workflowservice_terminateworkflowexecution.go index bd08046a08b..155a06a91a1 100644 --- a/.gen/go/cadence/workflowservice_terminateworkflowexecution.go +++ b/.gen/go/cadence/workflowservice_terminateworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/cadence/workflowservice_updatedomain.go b/.gen/go/cadence/workflowservice_updatedomain.go index 6cf7102f0c8..8a5340423b9 100644 --- a/.gen/go/cadence/workflowservice_updatedomain.go +++ b/.gen/go/cadence/workflowservice_updatedomain.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package cadence diff --git a/.gen/go/health/idl.go b/.gen/go/health/idl.go index 56d20ee7bb9..af116ae905c 100644 --- a/.gen/go/health/idl.go +++ b/.gen/go/health/idl.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package health diff --git a/.gen/go/health/meta_health.go b/.gen/go/health/meta_health.go index 417a91c487e..2afb345cd18 100644 --- a/.gen/go/health/meta_health.go +++ b/.gen/go/health/meta_health.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package health diff --git a/.gen/go/health/types.go b/.gen/go/health/types.go index 0450838f6b9..4e64f83860e 100644 --- a/.gen/go/health/types.go +++ b/.gen/go/health/types.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package health diff --git a/.gen/go/history/historyservice_describeworkflowexecution.go b/.gen/go/history/historyservice_describeworkflowexecution.go index e57b05f5d86..808fb6f945b 100644 --- a/.gen/go/history/historyservice_describeworkflowexecution.go +++ b/.gen/go/history/historyservice_describeworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_getmutablestate.go b/.gen/go/history/historyservice_getmutablestate.go index e7a421ff0f5..5a17f826d4b 100644 --- a/.gen/go/history/historyservice_getmutablestate.go +++ b/.gen/go/history/historyservice_getmutablestate.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_recordactivitytaskheartbeat.go b/.gen/go/history/historyservice_recordactivitytaskheartbeat.go index c63bc8c47f0..40c7cd775b1 100644 --- a/.gen/go/history/historyservice_recordactivitytaskheartbeat.go +++ b/.gen/go/history/historyservice_recordactivitytaskheartbeat.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_recordactivitytaskstarted.go b/.gen/go/history/historyservice_recordactivitytaskstarted.go index d3b1ba1f1b0..8a2d1e6b5ef 100644 --- a/.gen/go/history/historyservice_recordactivitytaskstarted.go +++ b/.gen/go/history/historyservice_recordactivitytaskstarted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_recordchildexecutioncompleted.go b/.gen/go/history/historyservice_recordchildexecutioncompleted.go index 1966b63e2c8..c1e042a7486 100644 --- a/.gen/go/history/historyservice_recordchildexecutioncompleted.go +++ b/.gen/go/history/historyservice_recordchildexecutioncompleted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_recorddecisiontaskstarted.go b/.gen/go/history/historyservice_recorddecisiontaskstarted.go index 43873f0fd48..f9e96aab250 100644 --- a/.gen/go/history/historyservice_recorddecisiontaskstarted.go +++ b/.gen/go/history/historyservice_recorddecisiontaskstarted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_removesignalmutablestate.go b/.gen/go/history/historyservice_removesignalmutablestate.go index 95a9b4861cc..84537039717 100644 --- a/.gen/go/history/historyservice_removesignalmutablestate.go +++ b/.gen/go/history/historyservice_removesignalmutablestate.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_requestcancelworkflowexecution.go b/.gen/go/history/historyservice_requestcancelworkflowexecution.go index b960bf9a7ec..ec3ca82c9a0 100644 --- a/.gen/go/history/historyservice_requestcancelworkflowexecution.go +++ b/.gen/go/history/historyservice_requestcancelworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_respondactivitytaskcanceled.go b/.gen/go/history/historyservice_respondactivitytaskcanceled.go index d7c362b09d1..06547325b11 100644 --- a/.gen/go/history/historyservice_respondactivitytaskcanceled.go +++ b/.gen/go/history/historyservice_respondactivitytaskcanceled.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_respondactivitytaskcompleted.go b/.gen/go/history/historyservice_respondactivitytaskcompleted.go index de90c62febf..02ad5e6ca5f 100644 --- a/.gen/go/history/historyservice_respondactivitytaskcompleted.go +++ b/.gen/go/history/historyservice_respondactivitytaskcompleted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_respondactivitytaskfailed.go b/.gen/go/history/historyservice_respondactivitytaskfailed.go index d0ba515ca56..4d385ba0a05 100644 --- a/.gen/go/history/historyservice_respondactivitytaskfailed.go +++ b/.gen/go/history/historyservice_respondactivitytaskfailed.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_responddecisiontaskcompleted.go b/.gen/go/history/historyservice_responddecisiontaskcompleted.go index c282fcf31e4..58bfe889ab7 100644 --- a/.gen/go/history/historyservice_responddecisiontaskcompleted.go +++ b/.gen/go/history/historyservice_responddecisiontaskcompleted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_responddecisiontaskfailed.go b/.gen/go/history/historyservice_responddecisiontaskfailed.go index 9e670d08180..e07cc8ab217 100644 --- a/.gen/go/history/historyservice_responddecisiontaskfailed.go +++ b/.gen/go/history/historyservice_responddecisiontaskfailed.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_scheduledecisiontask.go b/.gen/go/history/historyservice_scheduledecisiontask.go index 0cb3c5aff50..a88e85e9509 100644 --- a/.gen/go/history/historyservice_scheduledecisiontask.go +++ b/.gen/go/history/historyservice_scheduledecisiontask.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_signalworkflowexecution.go b/.gen/go/history/historyservice_signalworkflowexecution.go index a9a7377f403..e569c7023a3 100644 --- a/.gen/go/history/historyservice_signalworkflowexecution.go +++ b/.gen/go/history/historyservice_signalworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_startworkflowexecution.go b/.gen/go/history/historyservice_startworkflowexecution.go index 9b510b6a601..1356f982f3b 100644 --- a/.gen/go/history/historyservice_startworkflowexecution.go +++ b/.gen/go/history/historyservice_startworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/historyservice_terminateworkflowexecution.go b/.gen/go/history/historyservice_terminateworkflowexecution.go index 7118a1bab49..bbda4b4efe5 100644 --- a/.gen/go/history/historyservice_terminateworkflowexecution.go +++ b/.gen/go/history/historyservice_terminateworkflowexecution.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/idl.go b/.gen/go/history/idl.go index 0edd88aa411..b7c1ab1e96f 100644 --- a/.gen/go/history/idl.go +++ b/.gen/go/history/idl.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/history/types.go b/.gen/go/history/types.go index 174064b2125..156f0e4e897 100644 --- a/.gen/go/history/types.go +++ b/.gen/go/history/types.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package history diff --git a/.gen/go/matching/idl.go b/.gen/go/matching/idl.go index fea40675cd6..2f6f9e7ddb8 100644 --- a/.gen/go/matching/idl.go +++ b/.gen/go/matching/idl.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_addactivitytask.go b/.gen/go/matching/matchingservice_addactivitytask.go index a5726d1e8d1..ad59d524b30 100644 --- a/.gen/go/matching/matchingservice_addactivitytask.go +++ b/.gen/go/matching/matchingservice_addactivitytask.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_adddecisiontask.go b/.gen/go/matching/matchingservice_adddecisiontask.go index a4ba449bf32..a0a4b458b66 100644 --- a/.gen/go/matching/matchingservice_adddecisiontask.go +++ b/.gen/go/matching/matchingservice_adddecisiontask.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_canceloutstandingpoll.go b/.gen/go/matching/matchingservice_canceloutstandingpoll.go index efc8deda5a8..52f950d0078 100644 --- a/.gen/go/matching/matchingservice_canceloutstandingpoll.go +++ b/.gen/go/matching/matchingservice_canceloutstandingpoll.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_describetasklist.go b/.gen/go/matching/matchingservice_describetasklist.go index d420a8cb369..26f9a5b70b0 100644 --- a/.gen/go/matching/matchingservice_describetasklist.go +++ b/.gen/go/matching/matchingservice_describetasklist.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_pollforactivitytask.go b/.gen/go/matching/matchingservice_pollforactivitytask.go index 816c6645e3c..2f37a4b3452 100644 --- a/.gen/go/matching/matchingservice_pollforactivitytask.go +++ b/.gen/go/matching/matchingservice_pollforactivitytask.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_pollfordecisiontask.go b/.gen/go/matching/matchingservice_pollfordecisiontask.go index de18b019bca..863e98989d0 100644 --- a/.gen/go/matching/matchingservice_pollfordecisiontask.go +++ b/.gen/go/matching/matchingservice_pollfordecisiontask.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_queryworkflow.go b/.gen/go/matching/matchingservice_queryworkflow.go index 77396343af1..0beea38dfad 100644 --- a/.gen/go/matching/matchingservice_queryworkflow.go +++ b/.gen/go/matching/matchingservice_queryworkflow.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/matchingservice_respondquerytaskcompleted.go b/.gen/go/matching/matchingservice_respondquerytaskcompleted.go index 51bc2a88f0e..a453ca1bb2a 100644 --- a/.gen/go/matching/matchingservice_respondquerytaskcompleted.go +++ b/.gen/go/matching/matchingservice_respondquerytaskcompleted.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/matching/types.go b/.gen/go/matching/types.go index 63ac6d416ca..1a2bc97c512 100644 --- a/.gen/go/matching/types.go +++ b/.gen/go/matching/types.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package matching diff --git a/.gen/go/replicator/idl.go b/.gen/go/replicator/idl.go new file mode 100644 index 00000000000..a68eab28600 --- /dev/null +++ b/.gen/go/replicator/idl.go @@ -0,0 +1,37 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by thriftrw v1.10.0. DO NOT EDIT. +// @generated + +package replicator + +import "go.uber.org/thriftrw/thriftreflect" + +// ThriftModule represents the IDL file used to generate this package. +var ThriftModule = &thriftreflect.ThriftModule{ + Name: "replicator", + Package: "github.com/uber/cadence/.gen/go/replicator", + FilePath: "replicator.thrift", + SHA1: "c4d205b83452133d8883bd6dc5b4957ab21c0fb2", + Raw: rawIDL, +} + +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.replicator\n\nenum ReplicationTaskType {\n Domain\n History\n}\n\nstruct DomainTaskAttributes {\n}\n\nstruct HistoryTaskAttributes {\n}\n\nstruct ReplicationTask {\n 10: optional ReplicationTaskType taskType\n 20: optional DomainTaskAttributes domainTaskAttributes\n 30: optional HistoryTaskAttributes historyTaskAttributes\n}\n\n" diff --git a/.gen/go/replicator/types.go b/.gen/go/replicator/types.go new file mode 100644 index 00000000000..ce4abaeed47 --- /dev/null +++ b/.gen/go/replicator/types.go @@ -0,0 +1,513 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by thriftrw v1.10.0. DO NOT EDIT. +// @generated + +package replicator + +import ( + "bytes" + "encoding/json" + "fmt" + "go.uber.org/thriftrw/wire" + "math" + "strconv" + "strings" +) + +type DomainTaskAttributes struct { +} + +// ToWire translates a DomainTaskAttributes struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *DomainTaskAttributes) ToWire() (wire.Value, error) { + var ( + fields [0]wire.Field + i int = 0 + ) + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +// FromWire deserializes a DomainTaskAttributes struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a DomainTaskAttributes struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v DomainTaskAttributes +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *DomainTaskAttributes) FromWire(w wire.Value) error { + + for _, field := range w.GetStruct().Fields { + switch field.ID { + } + } + + return nil +} + +// String returns a readable string representation of a DomainTaskAttributes +// struct. +func (v *DomainTaskAttributes) String() string { + if v == nil { + return "" + } + + var fields [0]string + i := 0 + + return fmt.Sprintf("DomainTaskAttributes{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this DomainTaskAttributes match the +// provided DomainTaskAttributes. +// +// This function performs a deep comparison. +func (v *DomainTaskAttributes) Equals(rhs *DomainTaskAttributes) bool { + + return true +} + +type HistoryTaskAttributes struct { +} + +// ToWire translates a HistoryTaskAttributes struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *HistoryTaskAttributes) ToWire() (wire.Value, error) { + var ( + fields [0]wire.Field + i int = 0 + ) + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +// FromWire deserializes a HistoryTaskAttributes struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a HistoryTaskAttributes struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v HistoryTaskAttributes +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *HistoryTaskAttributes) FromWire(w wire.Value) error { + + for _, field := range w.GetStruct().Fields { + switch field.ID { + } + } + + return nil +} + +// String returns a readable string representation of a HistoryTaskAttributes +// struct. +func (v *HistoryTaskAttributes) String() string { + if v == nil { + return "" + } + + var fields [0]string + i := 0 + + return fmt.Sprintf("HistoryTaskAttributes{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this HistoryTaskAttributes match the +// provided HistoryTaskAttributes. +// +// This function performs a deep comparison. +func (v *HistoryTaskAttributes) Equals(rhs *HistoryTaskAttributes) bool { + + return true +} + +type ReplicationTask struct { + TaskType *ReplicationTaskType `json:"taskType,omitempty"` + DomainTaskAttributes *DomainTaskAttributes `json:"domainTaskAttributes,omitempty"` + HistoryTaskAttributes *HistoryTaskAttributes `json:"historyTaskAttributes,omitempty"` +} + +// ToWire translates a ReplicationTask struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *ReplicationTask) ToWire() (wire.Value, error) { + var ( + fields [3]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.TaskType != nil { + w, err = v.TaskType.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.DomainTaskAttributes != nil { + w, err = v.DomainTaskAttributes.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.HistoryTaskAttributes != nil { + w, err = v.HistoryTaskAttributes.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _ReplicationTaskType_Read(w wire.Value) (ReplicationTaskType, error) { + var v ReplicationTaskType + err := v.FromWire(w) + return v, err +} + +func _DomainTaskAttributes_Read(w wire.Value) (*DomainTaskAttributes, error) { + var v DomainTaskAttributes + err := v.FromWire(w) + return &v, err +} + +func _HistoryTaskAttributes_Read(w wire.Value) (*HistoryTaskAttributes, error) { + var v HistoryTaskAttributes + err := v.FromWire(w) + return &v, err +} + +// FromWire deserializes a ReplicationTask struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a ReplicationTask struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v ReplicationTask +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *ReplicationTask) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TI32 { + var x ReplicationTaskType + x, err = _ReplicationTaskType_Read(field.Value) + v.TaskType = &x + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TStruct { + v.DomainTaskAttributes, err = _DomainTaskAttributes_Read(field.Value) + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TStruct { + v.HistoryTaskAttributes, err = _HistoryTaskAttributes_Read(field.Value) + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a ReplicationTask +// struct. +func (v *ReplicationTask) String() string { + if v == nil { + return "" + } + + var fields [3]string + i := 0 + if v.TaskType != nil { + fields[i] = fmt.Sprintf("TaskType: %v", *(v.TaskType)) + i++ + } + if v.DomainTaskAttributes != nil { + fields[i] = fmt.Sprintf("DomainTaskAttributes: %v", v.DomainTaskAttributes) + i++ + } + if v.HistoryTaskAttributes != nil { + fields[i] = fmt.Sprintf("HistoryTaskAttributes: %v", v.HistoryTaskAttributes) + i++ + } + + return fmt.Sprintf("ReplicationTask{%v}", strings.Join(fields[:i], ", ")) +} + +func _ReplicationTaskType_EqualsPtr(lhs, rhs *ReplicationTaskType) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return x.Equals(y) + } + return lhs == nil && rhs == nil +} + +// Equals returns true if all the fields of this ReplicationTask match the +// provided ReplicationTask. +// +// This function performs a deep comparison. +func (v *ReplicationTask) Equals(rhs *ReplicationTask) bool { + if !_ReplicationTaskType_EqualsPtr(v.TaskType, rhs.TaskType) { + return false + } + if !((v.DomainTaskAttributes == nil && rhs.DomainTaskAttributes == nil) || (v.DomainTaskAttributes != nil && rhs.DomainTaskAttributes != nil && v.DomainTaskAttributes.Equals(rhs.DomainTaskAttributes))) { + return false + } + if !((v.HistoryTaskAttributes == nil && rhs.HistoryTaskAttributes == nil) || (v.HistoryTaskAttributes != nil && rhs.HistoryTaskAttributes != nil && v.HistoryTaskAttributes.Equals(rhs.HistoryTaskAttributes))) { + return false + } + + return true +} + +// GetTaskType returns the value of TaskType if it is set or its +// zero value if it is unset. +func (v *ReplicationTask) GetTaskType() (o ReplicationTaskType) { + if v.TaskType != nil { + return *v.TaskType + } + + return +} + +type ReplicationTaskType int32 + +const ( + ReplicationTaskTypeDomain ReplicationTaskType = 0 + ReplicationTaskTypeHistory ReplicationTaskType = 1 +) + +// ReplicationTaskType_Values returns all recognized values of ReplicationTaskType. +func ReplicationTaskType_Values() []ReplicationTaskType { + return []ReplicationTaskType{ + ReplicationTaskTypeDomain, + ReplicationTaskTypeHistory, + } +} + +// UnmarshalText tries to decode ReplicationTaskType from a byte slice +// containing its name. +// +// var v ReplicationTaskType +// err := v.UnmarshalText([]byte("Domain")) +func (v *ReplicationTaskType) UnmarshalText(value []byte) error { + switch string(value) { + case "Domain": + *v = ReplicationTaskTypeDomain + return nil + case "History": + *v = ReplicationTaskTypeHistory + return nil + default: + return fmt.Errorf("unknown enum value %q for %q", value, "ReplicationTaskType") + } +} + +// Ptr returns a pointer to this enum value. +func (v ReplicationTaskType) Ptr() *ReplicationTaskType { + return &v +} + +// ToWire translates ReplicationTaskType into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// Enums are represented as 32-bit integers over the wire. +func (v ReplicationTaskType) ToWire() (wire.Value, error) { + return wire.NewValueI32(int32(v)), nil +} + +// FromWire deserializes ReplicationTaskType from its Thrift-level +// representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TI32) +// if err != nil { +// return ReplicationTaskType(0), err +// } +// +// var v ReplicationTaskType +// if err := v.FromWire(x); err != nil { +// return ReplicationTaskType(0), err +// } +// return v, nil +func (v *ReplicationTaskType) FromWire(w wire.Value) error { + *v = (ReplicationTaskType)(w.GetI32()) + return nil +} + +// String returns a readable string representation of ReplicationTaskType. +func (v ReplicationTaskType) String() string { + w := int32(v) + switch w { + case 0: + return "Domain" + case 1: + return "History" + } + return fmt.Sprintf("ReplicationTaskType(%d)", w) +} + +// Equals returns true if this ReplicationTaskType value matches the provided +// value. +func (v ReplicationTaskType) Equals(rhs ReplicationTaskType) bool { + return v == rhs +} + +// MarshalJSON serializes ReplicationTaskType into JSON. +// +// If the enum value is recognized, its name is returned. Otherwise, +// its integer value is returned. +// +// This implements json.Marshaler. +func (v ReplicationTaskType) MarshalJSON() ([]byte, error) { + switch int32(v) { + case 0: + return ([]byte)("\"Domain\""), nil + case 1: + return ([]byte)("\"History\""), nil + } + return ([]byte)(strconv.FormatInt(int64(v), 10)), nil +} + +// UnmarshalJSON attempts to decode ReplicationTaskType from its JSON +// representation. +// +// This implementation supports both, numeric and string inputs. If a +// string is provided, it must be a known enum name. +// +// This implements json.Unmarshaler. +func (v *ReplicationTaskType) UnmarshalJSON(text []byte) error { + d := json.NewDecoder(bytes.NewReader(text)) + d.UseNumber() + t, err := d.Token() + if err != nil { + return err + } + + switch w := t.(type) { + case json.Number: + x, err := w.Int64() + if err != nil { + return err + } + if x > math.MaxInt32 { + return fmt.Errorf("enum overflow from JSON %q for %q", text, "ReplicationTaskType") + } + if x < math.MinInt32 { + return fmt.Errorf("enum underflow from JSON %q for %q", text, "ReplicationTaskType") + } + *v = (ReplicationTaskType)(x) + return nil + case string: + return v.UnmarshalText([]byte(w)) + default: + return fmt.Errorf("invalid JSON value %q (%T) to unmarshal into %q", t, t, "ReplicationTaskType") + } +} diff --git a/.gen/go/shared/idl.go b/.gen/go/shared/idl.go index 1a1ed9a5fd7..a9c63fb9fb8 100644 --- a/.gen/go/shared/idl.go +++ b/.gen/go/shared/idl.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package shared diff --git a/.gen/go/shared/types.go b/.gen/go/shared/types.go index 419ec56653e..edee643ba65 100644 --- a/.gen/go/shared/types.go +++ b/.gen/go/shared/types.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by thriftrw v1.8.0. DO NOT EDIT. +// Code generated by thriftrw v1.10.0. DO NOT EDIT. // @generated package shared @@ -2010,6 +2010,11 @@ func (v *CancelExternalWorkflowExecutionFailedCause) UnmarshalText(value []byte) } } +// Ptr returns a pointer to this enum value. +func (v CancelExternalWorkflowExecutionFailedCause) Ptr() *CancelExternalWorkflowExecutionFailedCause { + return &v +} + // ToWire translates CancelExternalWorkflowExecutionFailedCause into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -2695,6 +2700,11 @@ func (v *ChildPolicy) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v ChildPolicy) Ptr() *ChildPolicy { + return &v +} + // ToWire translates ChildPolicy into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -3356,6 +3366,11 @@ func (v *ChildWorkflowExecutionFailedCause) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v ChildWorkflowExecutionFailedCause) Ptr() *ChildWorkflowExecutionFailedCause { + return &v +} + // ToWire translates ChildWorkflowExecutionFailedCause into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -5699,6 +5714,11 @@ func (v *DecisionTaskFailedCause) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v DecisionTaskFailedCause) Ptr() *DecisionTaskFailedCause { + return &v +} + // ToWire translates DecisionTaskFailedCause into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -6747,6 +6767,11 @@ func (v *DecisionType) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v DecisionType) Ptr() *DecisionType { + return &v +} + // ToWire translates DecisionType into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -8816,6 +8841,11 @@ func (v *DomainStatus) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v DomainStatus) Ptr() *DomainStatus { + return &v +} + // ToWire translates DomainStatus into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -9255,6 +9285,11 @@ func (v *EventType) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v EventType) Ptr() *EventType { + return &v +} + // ToWire translates EventType into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -12085,6 +12120,11 @@ func (v *HistoryEventFilterType) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v HistoryEventFilterType) Ptr() *HistoryEventFilterType { + return &v +} + // ToWire translates HistoryEventFilterType into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -13642,6 +13682,11 @@ func (v *PendingActivityState) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v PendingActivityState) Ptr() *PendingActivityState { + return &v +} + // ToWire translates PendingActivityState into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -15186,6 +15231,11 @@ func (v *QueryTaskCompletedType) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v QueryTaskCompletedType) Ptr() *QueryTaskCompletedType { + return &v +} + // ToWire translates QueryTaskCompletedType into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -20319,6 +20369,11 @@ func (v *SignalExternalWorkflowExecutionFailedCause) UnmarshalText(value []byte) } } +// Ptr returns a pointer to this enum value. +func (v SignalExternalWorkflowExecutionFailedCause) Ptr() *SignalExternalWorkflowExecutionFailedCause { + return &v +} + // ToWire translates SignalExternalWorkflowExecutionFailedCause into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -23597,6 +23652,11 @@ func (v *TaskListKind) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v TaskListKind) Ptr() *TaskListKind { + return &v +} + // ToWire translates TaskListKind into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -23853,6 +23913,11 @@ func (v *TaskListType) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v TaskListType) Ptr() *TaskListType { + return &v +} + // ToWire translates TaskListType into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -24229,6 +24294,11 @@ func (v *TimeoutType) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v TimeoutType) Ptr() *TimeoutType { + return &v +} + // ToWire translates TimeoutType into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -26309,6 +26379,11 @@ func (v *WorkflowExecutionCloseStatus) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v WorkflowExecutionCloseStatus) Ptr() *WorkflowExecutionCloseStatus { + return &v +} + // ToWire translates WorkflowExecutionCloseStatus into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. @@ -28400,6 +28475,11 @@ func (v *WorkflowIdReusePolicy) UnmarshalText(value []byte) error { } } +// Ptr returns a pointer to this enum value. +func (v WorkflowIdReusePolicy) Ptr() *WorkflowIdReusePolicy { + return &v +} + // ToWire translates WorkflowIdReusePolicy into a Thrift-level intermediate // representation. This intermediate representation may be serialized // into bytes using a ThriftRW protocol implementation. diff --git a/Makefile b/Makefile index 4a32eb4e81a..ae46e9267d0 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,7 @@ THRIFTRW_SRCS = \ idl/github.com/uber/cadence/health.thrift \ idl/github.com/uber/cadence/history.thrift \ idl/github.com/uber/cadence/matching.thrift \ + idl/github.com/uber/cadence/replicator.thrift \ idl/github.com/uber/cadence/shared.thrift \ PROGS = cadence diff --git a/cmd/server/cadence.go b/cmd/server/cadence.go index 9072b17d869..24d60847638 100644 --- a/cmd/server/cadence.go +++ b/cmd/server/cadence.go @@ -32,7 +32,10 @@ import ( ) // validServices is the list of all valid cadence services -var validServices = []string{historyService, matchingService, frontendService} +var validServices = []string{historyService, matchingService, frontendService, workerService} + +// inDevelopmentServices is the list of services we want to support skipping logic on startup if config does not exist +var inDevelopmentServices = map[string]bool{workerService: true} // main entry point for the cadence server func main() { @@ -60,9 +63,17 @@ func startHandler(c *cli.Context) { if err := cassandra.VerifyCompatibleVersion(cassCfg, dir); err != nil { log.Fatalf("Incompatible versions", err) } - for _, svc := range getServices(c) { + + services := getServices(c) +LoadServiceLoop: + for _, svc := range services { if _, ok := cfg.Services[svc]; !ok { - log.Fatalf("`%v` service missing config", svc) + if _, ok := inDevelopmentServices[svc]; len(services) > 1 && ok { + log.Printf("Config missing for development service `%v`. Skipping to load service.\n", svc) + continue LoadServiceLoop + } else { + log.Fatalf("`%v` service missing config", svc) + } } server := newServer(svc, &cfg) server.Start() diff --git a/cmd/server/server.go b/cmd/server/server.go index 821a45477cf..29e31b5eafa 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/service/frontend" "github.com/uber/cadence/service/history" "github.com/uber/cadence/service/matching" + "github.com/uber/cadence/service/worker" ) type ( @@ -46,6 +47,7 @@ const ( frontendService = "frontend" historyService = "history" matchingService = "matching" + workerService = "worker" ) // newServer returns a new instance of a daemon @@ -94,6 +96,7 @@ func (s *server) startService() common.Daemon { params.Name = "cadence-" + s.name params.Logger = s.cfg.Log.NewBarkLogger() params.CassandraConfig = s.cfg.Cassandra + params.MessagingClient = s.cfg.Kafka.NewKafkaClient() params.RingpopFactory, err = s.cfg.Ringpop.NewFactory() if err != nil { @@ -120,6 +123,8 @@ func (s *server) startService() common.Daemon { daemon = history.NewService(¶ms, history.NewConfig(s.cfg.Cassandra.NumHistoryShards)) case matchingService: daemon = matching.NewService(¶ms, matching.NewConfig()) + case workerService: + daemon = worker.NewService(¶ms, worker.NewConfig()) } go execute(daemon, s.doneC) diff --git a/common/constants.go b/common/constants.go index 9412c20b2e3..5564aac4837 100644 --- a/common/constants.go +++ b/common/constants.go @@ -36,6 +36,8 @@ const ( HistoryServiceName = "cadence-history" // MatchingServiceName is the name of the matching service MatchingServiceName = "cadence-matching" + // WorkerServiceName is the name of the worker service + WorkerServiceName = "cadence-worker" ) // Data encoding types diff --git a/common/logging/events.go b/common/logging/events.go index 50ab2c5a00f..a0ed19049a0 100644 --- a/common/logging/events.go +++ b/common/logging/events.go @@ -79,6 +79,15 @@ const ( InvalidQueryTaskEventID = 6000 QueryTaskFailedEventID = 6001 + // Worker Service Events + ReplicationTaskProcessorStarting = 7100 + ReplicationTaskProcessorStarted = 7101 + ReplicationTaskProcessorStartFailed = 7102 + ReplicationTaskProcessorShuttingDown = 7103 + ReplicationTaskProcessorShutdown = 7104 + ReplicationTaskProcessorShutdownTimedout = 7105 + ReplicationTaskProcessingFailed = 7106 + // General purpose events OperationFailed = 9000 OperationPanic = 9001 diff --git a/common/logging/helpers.go b/common/logging/helpers.go index 9e98d17cc73..98dfe9bf1e1 100644 --- a/common/logging/helpers.go +++ b/common/logging/helpers.go @@ -360,3 +360,45 @@ func LogQueryTaskFailedEvent(logger bark.Logger, domain, workflowID, runID, quer "QueryType": queryType, }).Info("QueryWorkflowFailed.") } + +// LogReplicationTaskProcessorStartingEvent is used to log replication task processor starting +func LogReplicationTaskProcessorStartingEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: ReplicationTaskProcessorStarting, + }).Info("Replication task processor starting.") +} + +// LogReplicationTaskProcessorStartedEvent is used to log replication task processor started +func LogReplicationTaskProcessorStartedEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: ReplicationTaskProcessorStarted, + }).Info("Replication task processor started.") +} + +// LogReplicationTaskProcessorStartFailedEvent is used to log replication task processor started +func LogReplicationTaskProcessorStartFailedEvent(logger bark.Logger, err error) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: ReplicationTaskProcessorStartFailed, + }).WithError(err).Warn("Replication task processor failed to start.") +} + +// LogReplicationTaskProcessorShuttingDownEvent is used to log replication task processing shutting down +func LogReplicationTaskProcessorShuttingDownEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: ReplicationTaskProcessorShuttingDown, + }).Info("Replication task processor shutting down.") +} + +// LogReplicationTaskProcessorShutdownEvent is used to log replication task processor shutdown complete +func LogReplicationTaskProcessorShutdownEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: ReplicationTaskProcessorShutdown, + }).Info("Replication task processor shutdown.") +} + +// LogReplicationTaskProcessorShutdownTimedoutEvent is used to log timeout during replication task processor shutdown +func LogReplicationTaskProcessorShutdownTimedoutEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: ReplicationTaskProcessorShutdownTimedout, + }).Warn("Replication task processor timedout on shutdown.") +} diff --git a/common/logging/tags.go b/common/logging/tags.go index a8ce94e3f0e..0b74e714cda 100644 --- a/common/logging/tags.go +++ b/common/logging/tags.go @@ -42,16 +42,20 @@ const ( TagDecisionFailCause = "decision-fail-cause" TagTaskID = "task-id" TagTaskType = "task-type" + TagTopicName = "topic-name" + TagConsumerName = "consumer-name" // workflow logging tag values // TagWorkflowComponent Values - TagValueHistoryBuilderComponent = "history-builder" - TagValueHistoryEngineComponent = "history-engine" - TagValueHistoryCacheComponent = "history-cache" - TagValueTransferQueueComponent = "transfer-queue-processor" - TagValueTimerQueueComponent = "timer-queue-processor" - TagValueShardController = "shard-controller" - TagValueMatchingEngineComponent = "matching-engine" + TagValueHistoryBuilderComponent = "history-builder" + TagValueHistoryEngineComponent = "history-engine" + TagValueHistoryCacheComponent = "history-cache" + TagValueTransferQueueComponent = "transfer-queue-processor" + TagValueTimerQueueComponent = "timer-queue-processor" + TagValueShardController = "shard-controller" + TagValueMatchingEngineComponent = "matching-engine" + TagValueReplicatorComponent = "replicator" + TagValueReplicationTaskProcessorComponent = "replication-task-processor" // TagHistoryBuilderAction values TagValueActionWorkflowStarted = "add-workflowexecution-started-event" diff --git a/common/messaging/interface.go b/common/messaging/interface.go new file mode 100644 index 00000000000..88e8a4737de --- /dev/null +++ b/common/messaging/interface.go @@ -0,0 +1,32 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package messaging + +import ( + "github.com/uber-go/kafka-client/kafka" +) + +type ( + // Client is the interface used to abstract out interaction with messaging system for replication + Client interface { + NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) + } +) diff --git a/common/messaging/kafkaClient.go b/common/messaging/kafkaClient.go new file mode 100644 index 00000000000..fd895cd3303 --- /dev/null +++ b/common/messaging/kafkaClient.go @@ -0,0 +1,67 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package messaging + +import ( + "github.com/uber-go/kafka-client" + "github.com/uber-go/kafka-client/kafka" + "strings" +) + +type ( + kafkaClient struct { + config *KafkaConfig + client *kafkaclient.Client + } +) + +// NewConsumer is used to create a Kafka consumer +func (c *kafkaClient) NewConsumer(topicName, consumerName string, concurrency int) (kafka.Consumer, error) { + clusterName := c.config.getClusterForTopic(topicName) + brokers := c.config.getBrokersForCluster(clusterName) + + consumerConfig := &kafka.ConsumerConfig{ + GroupName: consumerName, + TopicList: kafka.ConsumerTopicList{ + kafka.ConsumerTopic{ + Topic: kafka.Topic{ + Name: topicName, + Cluster: clusterName, + BrokerList: brokers, + }, + RetryQ: kafka.Topic{ + Name: strings.Join([]string{topicName, "retry"}, "-"), + Cluster: clusterName, + BrokerList: brokers, + }, + DLQ: kafka.Topic{ + Name: strings.Join([]string{topicName, "dlq"}, "-"), + Cluster: clusterName, + BrokerList: brokers, + }, + }, + }, + Concurrency: concurrency, + } + + consumer, err := c.client.NewConsumer(consumerConfig) + return consumer, err +} diff --git a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go new file mode 100644 index 00000000000..4e47e2dedb2 --- /dev/null +++ b/common/messaging/kafkaConfig.go @@ -0,0 +1,77 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package messaging + +import ( + "github.com/uber-go/kafka-client" + "github.com/uber-go/kafka-client/kafka" + "github.com/uber-go/tally" + + "go.uber.org/zap" +) + +type ( + // KafkaConfig describes the configuration needed to connect to all kafka clusters + KafkaConfig struct { + Clusters map[string]ClusterConfig `yaml:"clusters"` + Topics map[string]TopicConfig `yaml:"topics"` + } + + // ClusterConfig describes the configuration for a single Kafka cluster + ClusterConfig struct { + Brokers []string `yaml:"brokers"` + } + + // TopicConfig describes the mapping from topic to Kafka cluster + TopicConfig struct { + Cluster string `yaml:"cluster"` + } +) + +// NewKafkaClient is used to create an instance of KafkaClient +func (k *KafkaConfig) NewKafkaClient() Client { + // mapping from cluster name to list of broker ip addresses + brokers := map[string][]string{} + for cluster, cfg := range k.Clusters { + brokers[cluster] = cfg.Brokers + } + + // mapping from topic name to cluster that has that topic + topicClusterAssignment := map[string][]string{} + for topic, cfg := range k.Topics { + topicClusterAssignment[topic] = []string{cfg.Cluster} + } + + client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zap.NewNop(), tally.NoopScope) + + return &kafkaClient{ + config: k, + client: client, + } +} + +func (k *KafkaConfig) getClusterForTopic(topic string) string { + return k.Topics[topic].Cluster +} + +func (k *KafkaConfig) getBrokersForCluster(cluster string) []string { + return k.Clusters[cluster].Brokers +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 4803cd853ef..276c33b03e0 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -57,6 +57,7 @@ const ( Frontend History Matching + Worker NumServices ) @@ -387,6 +388,14 @@ const ( NumMatchingScopes ) +// -- Operation scopes for Worker service -- +const ( + // ReplicationScope is the scope used by all metric emitted by replicator + ReplicatorScope = iota + NumCommonScopes + + NumWorkerScopes +) + // ScopeDefs record the scopes for all services var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ // common scope Names @@ -529,6 +538,10 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ MatchingCancelOutstandingPollScope: {operation: "CancelOutstandingPoll"}, MatchingDescribeTaskListScope: {operation: "DescribeTaskList"}, }, + // Worker Scope Names + Worker: { + ReplicatorScope: {operation: "Replicator"}, + }, } // Common Metrics enum @@ -620,6 +633,13 @@ const ( BufferThrottleCounter ) +// Worker metrics enum +const ( + ReplicatorMessages = iota + NumCommonMetrics + ReplicatorFailures + ReplicatorLatency +) + // MetricDefs record the metrics for all services var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ Common: { @@ -702,6 +722,11 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ SyncThrottleCounter: {metricName: "sync.throttle.count"}, BufferThrottleCounter: {metricName: "buffer.throttle.count"}, }, + Worker: { + ReplicatorMessages: {metricName: "replicator.messages"}, + ReplicatorFailures: {metricName: "replicator.errors"}, + ReplicatorLatency: {metricName: "replicator.latency"}, + }, } // ErrorClass is an enum to help with classifying SLA vs. non-SLA errors (SLA = "service level agreement") diff --git a/common/service/config/config.go b/common/service/config/config.go index 211980c78b8..463c0d10244 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -25,6 +25,7 @@ import ( "time" "github.com/uber-go/tally/m3" + "github.com/uber/cadence/common/messaging" "github.com/uber/ringpop-go/discovery" ) @@ -41,6 +42,8 @@ type ( ClustersInfo ClustersInfo `yaml:"clustersInfo"` // Services is a map of service name to service config items Services map[string]Service `yaml:"services"` + // Kafka is the config for connecting to kafka + Kafka messaging.KafkaConfig `yaml:"kafka"` } // Service contains the service specific config items @@ -109,6 +112,10 @@ type ( NumHistoryShards int `yaml:"numHistoryShards" validate:"nonzero"` } + // Replicator describes the configuration of replicator + Replicator struct { + } + // Logger contains the config items for logger Logger struct { // Stdout is true if the output needs to goto standard out diff --git a/common/service/service.go b/common/service/service.go index f73b43cd1f5..e92291fa3eb 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -30,6 +30,7 @@ import ( "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/service/config" @@ -39,7 +40,12 @@ import ( "go.uber.org/yarpc" ) -var cadenceServices = []string{common.FrontendServiceName, common.HistoryServiceName, common.MatchingServiceName} +var cadenceServices = []string{ + common.FrontendServiceName, + common.HistoryServiceName, + common.MatchingServiceName, + common.WorkerServiceName, +} type ( // BootstrapParams holds the set of parameters @@ -53,6 +59,8 @@ type ( PProfInitializer common.PProfInitializer CassandraConfig config.Cassandra ClusterMetadata cluster.Metadata + ReplicatorConfig config.Replicator + MessagingClient messaging.Client } // RingpopFactory provides a bootstrapped ringpop @@ -223,6 +231,8 @@ func getMetricsServiceIdx(serviceName string, logger bark.Logger) metrics.Servic return metrics.History case common.MatchingServiceName: return metrics.Matching + case common.WorkerServiceName: + return metrics.Worker default: logger.Fatalf("Unknown service name '%v' for metrics!", serviceName) } diff --git a/config/development_active.yaml b/config/development_active.yaml index fdf116cdc7f..bb6f95ff7f7 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -45,10 +45,32 @@ services: pprof: port: 7937 + worker: + rpc: + port: 7940 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence_active" + pprof: + port: 7941 + clustersInfo: initialFailoverVersion: 0 failoverVersionIncrement: 10 currentClusterName: "active" clusterNames: - "active" - - "standby" \ No newline at end of file + - "standby" + +kafka: + clusters: + test: + brokers: + - 127.0.0.1:9092 + topics: + active: + cluster: test + standby: + cluster: test \ No newline at end of file diff --git a/glide.lock b/glide.lock index e75f0fe6608..65dc115a77d 100644 --- a/glide.lock +++ b/glide.lock @@ -1,10 +1,8 @@ -hash: ecb8ed9e7ba54f2e184b33ae268374b10a198236bf10404754c0478c7d7c726d -updated: 2017-11-28T12:06:49.425762143-08:00 +hash: 961c13e43d7fb51e9c4e19d5fca098596754b882f54a276272717274c121a08d +updated: 2018-02-13T15:58:53.770031-08:00 imports: - name: github.com/apache/thrift - version: 53dd39833a08ce33582e5ff31fa18bb4735d6731 - repo: git://git.apache.org/thrift.git - vcs: git + version: b2a4d4ae21c789b689dd162deb819665567f481c subpackages: - lib/go/thrift - name: github.com/benbjohnson/clock @@ -13,8 +11,10 @@ imports: version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 subpackages: - quantile +- name: github.com/bsm/sarama-cluster + version: 3001c2453136632aa3219a58ea3795bb584b83b5 - name: github.com/cactus/go-statsd-client - version: 91c326c3f7bd20f0226d3d1c289dd9f8ce28d33d + version: 138b925ccdf617776955904ba7759fce64406cec subpackages: - statsd - name: github.com/davecgh/go-spew @@ -23,6 +23,14 @@ imports: - spew - name: github.com/dgryski/go-farm version: e2d0fe22b456fa0a35cd883ba355ecfcf1881490 +- name: github.com/eapache/go-resiliency + version: b1fe83b5b03f624450823b751b662259ffc6af70 + subpackages: + - breaker +- name: github.com/eapache/go-xerial-snappy + version: bb955e01b9346ac19dc29eb16586c90ded99a98c +- name: github.com/eapache/queue + version: 44cc805cf13205b55f69e14bcb69867d1ae92f98 - name: github.com/emirpasic/gods version: f6c17b524822278a87e3b3bd809fec33b51f5b46 subpackages: @@ -40,10 +48,6 @@ imports: - internal/lru - internal/murmur - internal/streams -- name: github.com/gogo/protobuf - version: 342cbe0a04158f6dcb03ca0079991a51a4248c02 - subpackages: - - proto - name: github.com/golang/mock version: 13f360950a79f5864a972c786a10a50e44b69541 subpackages: @@ -67,6 +71,12 @@ imports: - log - name: github.com/pborman/uuid version: e790cca94e6cc75c7064b1332e63811d4aae1a53 +- name: github.com/pierrec/lz4 + version: 2fcda4cb7018ce05a25959d2fe08c83e3329f169 +- name: github.com/pierrec/xxHash + version: a0006b13c722f7f12368c00a3d3c2ae8a999a0c6 + subpackages: + - xxHash32 - name: github.com/pmezard/go-difflib version: d8ed2627bdf02c080bf22230dbb337003b7aba2d subpackages: @@ -92,6 +102,8 @@ imports: - xfs - name: github.com/rcrowley/go-metrics version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c +- name: github.com/Shopify/sarama + version: f7be6aa2bc7b2e38edf816b08b582782194a1c02 - name: github.com/sirupsen/logrus version: 3d4380f53a34dcdc95f0c1db702615992b38d9a4 - name: github.com/stretchr/objx @@ -107,12 +119,21 @@ imports: version: dbf558e8a7b65e2b54e1e01c14ee0e4207a865f5 - name: github.com/uber-go/atomic version: e682c1008ac17bf26d2e4b5ad6cdd08520ed0b22 +- name: github.com/uber-go/kafka-client + version: 5d9967a526b6c6533a752a83036eb16ffd4e2b5a + subpackages: + - internal/backoff + - internal/consumer + - internal/list + - internal/metrics + - internal/util + - kafka - name: github.com/uber-go/mapdecode version: 718b4994083e432669f44a00174c5f1bcdb1434d subpackages: - internal/mapstructure - name: github.com/uber-go/tally - version: 95078a8f10668bd1fa73ae46761cdc58d25436b8 + version: 6c4631652c6aab57c64f65c2e0aaec2e9aae3a64 subpackages: - m3 - m3/customtransports @@ -134,7 +155,7 @@ imports: - swim - util - name: github.com/uber/tchannel-go - version: 2d75494d3a0ffabbd00bc2cb71ed039d94def186 + version: acd4eb54c11531814d9cd0344fdc53bebe86ab12 subpackages: - internal/argreader - json @@ -153,13 +174,18 @@ imports: version: 4e336646b2ef9fc6e47be8e21594178f98e5ebcf - name: go.uber.org/multierr version: 3c4937480c32f4c13a875a1829af76c98ca3d40a +- name: go.uber.org/net/metrics + version: 1e19de5b971489a45d178e12a0f72a78c70e300e + subpackages: + - bucket + - push + - tallypush - name: go.uber.org/thriftrw - version: bce7fd589d505915f56a7901d8c143e1625e085c + version: 6216e5bb19132c14e979cf9c62abae6f022a0c0d subpackages: - - internal/semver - version - name: go.uber.org/yarpc - version: 9fe9f33dbb59ff30a8da4bf962683022531ae916 + version: 112080bb21323a681a9b269fa7b5a1489558aab0 subpackages: - api/backoff - api/encoding @@ -180,8 +206,8 @@ imports: - internal/iopool - internal/observability - internal/outboundmiddleware - - internal/pally - internal/request + - internal/yarpcerrors - peer - peer/hostport - pkg/errors @@ -200,7 +226,7 @@ imports: - internal/exit - zapcore - name: golang.org/x/net - version: 6921abc35dffd00438a0c020584ce560108737ea + version: f5dfe339be1d06f81b22525fe34671ee7d2c8904 repo: https://github.com/golang/net subpackages: - bpf diff --git a/glide.yaml b/glide.yaml index c1108457edf..871ce606359 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,7 +1,9 @@ package: github.com/uber/cadence +excludeDirs: +- .gen import: - package: github.com/uber/tchannel-go - version: 2d75494d3a0ffabbd00bc2cb71ed039d94def186 + version: ^1 subpackages: - thrift - thrift/thrift-gen @@ -23,8 +25,12 @@ import: - package: github.com/urfave/cli - package: gopkg.in/yaml.v2 - package: gopkg.in/validator.v2 -- package: golang.org/x/time/rate -- package: github.com/cactus/go-statsd-client/statsd +- package: golang.org/x/time + subpackages: + - rate +- package: github.com/cactus/go-statsd-client + subpackages: + - statsd - package: go.uber.org/thriftrw version: ^1.6 - package: go.uber.org/yarpc @@ -34,6 +40,7 @@ import: - encoding/thrift/thriftrw-plugin-yarpc - transport/http - transport/tchannel +- package: github.com/uber-go/kafka-client # Added excludeDirs to prevent build from failing on the yarpc generated code. excludeDirs: diff --git a/idl/github.com/uber/cadence/replicator.thrift b/idl/github.com/uber/cadence/replicator.thrift new file mode 100644 index 00000000000..79bb3b6cfe9 --- /dev/null +++ b/idl/github.com/uber/cadence/replicator.thrift @@ -0,0 +1,39 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +namespace java com.uber.cadence.replicator + +enum ReplicationTaskType { + Domain + History +} + +struct DomainTaskAttributes { +} + +struct HistoryTaskAttributes { +} + +struct ReplicationTask { + 10: optional ReplicationTaskType taskType + 20: optional DomainTaskAttributes domainTaskAttributes + 30: optional HistoryTaskAttributes historyTaskAttributes +} + diff --git a/service/worker/processor.go b/service/worker/processor.go new file mode 100644 index 00000000000..f7c2cb052c1 --- /dev/null +++ b/service/worker/processor.go @@ -0,0 +1,179 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package worker + +import ( + "sync" + "sync/atomic" + "time" + + "encoding/json" + "github.com/uber-common/bark" + "github.com/uber-go/kafka-client/kafka" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" +) + +type ( + replicationTaskProcessor struct { + topicName string + consumerName string + client messaging.Client + consumer kafka.Consumer + isStarted int32 + isStopped int32 + shutdownWG sync.WaitGroup + shutdownCh chan struct{} + config *Config + logger bark.Logger + } +) + +func newReplicationTaskProcessor(topic, consumer string, client messaging.Client, config *Config, + logger bark.Logger) *replicationTaskProcessor { + return &replicationTaskProcessor{ + topicName: topic, + consumerName: consumer, + client: client, + shutdownCh: make(chan struct{}), + config: config, + logger: logger.WithFields(bark.Fields{ + logging.TagWorkflowComponent: logging.TagValueReplicationTaskProcessorComponent, + logging.TagTopicName: topic, + logging.TagConsumerName: consumer, + }), + } +} + +func (p *replicationTaskProcessor) Start() error { + if !atomic.CompareAndSwapInt32(&p.isStarted, 0, 1) { + return nil + } + + logging.LogReplicationTaskProcessorStartingEvent(p.logger) + consumer, err := p.client.NewConsumer(p.topicName, p.consumerName, p.config.ReplicatorConcurrency) + if err != nil { + logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err) + return err + } + + if err := consumer.Start(); err != nil { + logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err) + return err + } + + p.consumer = consumer + p.shutdownWG.Add(1) + go p.processorPump() + + logging.LogReplicationTaskProcessorStartedEvent(p.logger) + return nil +} + +func (p *replicationTaskProcessor) Stop() { + if !atomic.CompareAndSwapInt32(&p.isStopped, 0, 1) { + return + } + + logging.LogReplicationTaskProcessorShuttingDownEvent(p.logger) + defer logging.LogReplicationTaskProcessorShutdownEvent(p.logger) + + if atomic.LoadInt32(&p.isStarted) == 1 { + close(p.shutdownCh) + } + + if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success { + logging.LogReplicationTaskProcessorShutdownTimedoutEvent(p.logger) + } +} + +func (p *replicationTaskProcessor) processorPump() { + defer p.shutdownWG.Done() + + var workerWG sync.WaitGroup + for i := 0; i < p.config.ReplicatorConcurrency; i++ { + workerWG.Add(1) + go p.worker(&workerWG) + } + +processorPumpLoop: + for { + select { + case <-p.shutdownCh: + // Processor is shutting down, close the underlying consumer + p.consumer.Stop() + break processorPumpLoop + } + } + + p.logger.Info("Replication task processor pump shutting down.") + if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success { + p.logger.Warn("Replication task processor timed out on worker shutdown.") + } +} + +func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) { + defer workerWG.Done() + + for { + select { + case msg, ok := <-p.consumer.Messages(): + if !ok { + p.logger.Info("Worker for replication task processor shutting down.") + return // channel closed + } + + if task, err := deserialize(msg.Value()); err != nil { + p.logger.Errorf("Deserialize Error. Value: %v, Error: %v", string(msg.Value()), err) + } else { + + if task.TaskType == nil { + p.logger.Warn("Empty replication task.") + } + + switch task.GetTaskType() { + case replicator.ReplicationTaskTypeDomain: + p.logger.Info("Recieved replication task for domain.") + case replicator.ReplicationTaskTypeHistory: + p.logger.Info("Recieved replication task for history.") + default: + p.logger.Errorf("Unknown replication task: %v", task.TaskType) + } + } + + msg.Ack() + case <-p.consumer.Closed(): + p.logger.Info("Consumer closed. Processor shutting down.") + return + } + } +} + +func deserialize(payload []byte) (*replicator.ReplicationTask, error) { + var task replicator.ReplicationTask + if err := json.Unmarshal(payload, &task); err != nil { + return nil, err + } + + return &task, nil +} diff --git a/service/worker/replicator.go b/service/worker/replicator.go new file mode 100644 index 00000000000..58bb7c4e837 --- /dev/null +++ b/service/worker/replicator.go @@ -0,0 +1,88 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package worker + +import ( + "fmt" + "github.com/uber-common/bark" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" +) + +type ( + // Replicator is the processor for replication tasks + Replicator struct { + metadata cluster.Metadata + config *Config + client messaging.Client + processors []*replicationTaskProcessor + logger bark.Logger + } +) + +// NewReplicator creates a new replicator for processing replication tasks +func NewReplicator(metadata cluster.Metadata, config *Config, client messaging.Client, logger bark.Logger) *Replicator { + return &Replicator{ + metadata: metadata, + config: config, + client: client, + logger: logger.WithFields(bark.Fields{ + logging.TagWorkflowComponent: logging.TagValueReplicatorComponent, + }), + } +} + +// Start is called to start replicator +func (r *Replicator) Start() error { + currentClusterName := r.metadata.GetCurrentClusterName() + for cluster := range r.metadata.GetAllClusterNames() { + if cluster != currentClusterName { + topicName := getTopicName(cluster) + consumerName := getConsumerName(currentClusterName, cluster) + r.processors = append(r.processors, newReplicationTaskProcessor(topicName, consumerName, r.client, r.config, + r.logger)) + } + } + + for _, processor := range r.processors { + if err := processor.Start(); err != nil { + return err + } + } + + return nil +} + +// Stop is called to stop replicator +func (r *Replicator) Stop() { + for _, processor := range r.processors { + processor.Stop() + } +} + +func getConsumerName(currentCluster, remoteCluster string) string { + return fmt.Sprintf("%v_consumer_for_%v", currentCluster, remoteCluster) +} + +func getTopicName(sourceCluster string) string { + return sourceCluster +} diff --git a/service/worker/service.go b/service/worker/service.go new file mode 100644 index 00000000000..904417df85a --- /dev/null +++ b/service/worker/service.go @@ -0,0 +1,86 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package worker + +import ( + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/service" +) + +type ( + // Service represents the cadence-worker service. This service host all background processing which needs to happen + // for a Cadence cluster. This service runs the replicator which is responsible for applying replication tasks + // generated by remote clusters. + Service struct { + stopC chan struct{} + params *service.BootstrapParams + config *Config + } + + // Config contains all the service config for worker + Config struct { + // Replicator settings + ReplicatorConcurrency int + } +) + +// NewService builds a new cadence-worker service +func NewService(params *service.BootstrapParams, config *Config) common.Daemon { + return &Service{ + params: params, + config: config, + stopC: make(chan struct{}), + } +} + +// NewConfig builds the new Config for cadence-worker service +func NewConfig() *Config { + return &Config{ + ReplicatorConcurrency: 10, + } +} + +// Start is called to start the service +func (s *Service) Start() { + p := s.params + log := p.Logger + + log.Infof("%v starting", common.WorkerServiceName) + base := service.New(p) + + replicator := NewReplicator(p.ClusterMetadata, s.config, p.MessagingClient, log) + if err := replicator.Start(); err != nil { + log.Fatalf("Fail to start replicator: %v", err) + } + + log.Infof("%v started", common.WorkerServiceName) + <-s.stopC + base.Stop() +} + +// Stop is called to stop the service +func (s *Service) Stop() { + select { + case s.stopC <- struct{}{}: + default: + } + s.params.Logger.Infof("%v stopped", common.WorkerServiceName) +} From f6cec246f2ce9bc856d4bf5a902e77a692477fda Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Fri, 16 Feb 2018 09:15:19 -0800 Subject: [PATCH 2/3] Add readme for running dev setup --- service/worker/README.md | 54 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 service/worker/README.md diff --git a/service/worker/README.md b/service/worker/README.md new file mode 100644 index 00000000000..2e2074860bc --- /dev/null +++ b/service/worker/README.md @@ -0,0 +1,54 @@ +Cadence Worker (In Development) +=============================== + +Cadence Worker is a new role for Cadence service used for hosting any +components responsible for performing background processing on the Cadence +cluster. + +Replicator +---------- + +Replicator is a background worker responsible for consuming replication tasks +generated by remote Cadence clusters and pass it down to processor so they +can be applied to local Cadence cluster. + +It uses Kafka as the replication tasks buffer and relies on +[kafka-client library] (https://github.com/uber-go/kafka-client/) for consuming +messages from Kafka. + + +Quickstart for localhost development +==================================== + +1. Setup Kafka by following instructions: +[Kafka Quickstart](https://kafka.apache.org/quickstart) +2. Create Kafka topic for active cluster: +``` +bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic active +``` +3. Create Kafka topic for standby cluster: +``` +bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic standby +``` +4. Start Cadence development server for active zone: +``` +./cadence --zone active start +``` + + +Cadence cluster is now running with the replicator consuming messages from +Kafka topic standby. + +Create replication task using CLI +--------------------------------- + +Kafka CLI can be used to generate a replication task for testing purpose: + +``` +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic standby +``` + +Replication task message: +``` +{taskType: 0} +``` From d82e9b6c520c0953872285ef52d841c545cc1ddc Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Tue, 20 Feb 2018 10:54:04 -0800 Subject: [PATCH 3/3] address code review feedback --- cmd/server/server.go | 5 +- common/messaging/kafkaConfig.go | 2 +- service/worker/processor.go | 81 ++++++++++++++++++++------------- service/worker/replicator.go | 18 +++++--- service/worker/service.go | 13 ++++-- 5 files changed, 75 insertions(+), 44 deletions(-) diff --git a/cmd/server/server.go b/cmd/server/server.go index 29e31b5eafa..dd48e3145a7 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -32,6 +32,8 @@ import ( "github.com/uber/cadence/service/history" "github.com/uber/cadence/service/matching" "github.com/uber/cadence/service/worker" + + "go.uber.org/zap" ) type ( @@ -96,7 +98,6 @@ func (s *server) startService() common.Daemon { params.Name = "cadence-" + s.name params.Logger = s.cfg.Log.NewBarkLogger() params.CassandraConfig = s.cfg.Cassandra - params.MessagingClient = s.cfg.Kafka.NewKafkaClient() params.RingpopFactory, err = s.cfg.Ringpop.NewFactory() if err != nil { @@ -113,6 +114,8 @@ func (s *server) startService() common.Daemon { s.cfg.ClustersInfo.CurrentClusterName, s.cfg.ClustersInfo.ClusterNames, ) + // TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop + params.MessagingClient = s.cfg.Kafka.NewKafkaClient(zap.NewNop(), params.MetricScope) var daemon common.Daemon diff --git a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go index 4e47e2dedb2..70948849c74 100644 --- a/common/messaging/kafkaConfig.go +++ b/common/messaging/kafkaConfig.go @@ -47,7 +47,7 @@ type ( ) // NewKafkaClient is used to create an instance of KafkaClient -func (k *KafkaConfig) NewKafkaClient() Client { +func (k *KafkaConfig) NewKafkaClient(logger *zap.Logger, metricScope tally.Scope) Client { // mapping from cluster name to list of broker ip addresses brokers := map[string][]string{} for cluster, cfg := range k.Clusters { diff --git a/service/worker/processor.go b/service/worker/processor.go index f7c2cb052c1..199fbda4160 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -21,6 +21,8 @@ package worker import ( + "errors" + "fmt" "sync" "sync/atomic" "time" @@ -32,25 +34,34 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" ) type ( replicationTaskProcessor struct { - topicName string - consumerName string - client messaging.Client - consumer kafka.Consumer - isStarted int32 - isStopped int32 - shutdownWG sync.WaitGroup - shutdownCh chan struct{} - config *Config - logger bark.Logger + topicName string + consumerName string + client messaging.Client + consumer kafka.Consumer + isStarted int32 + isStopped int32 + shutdownWG sync.WaitGroup + shutdownCh chan struct{} + config *Config + logger bark.Logger + metricsClient metrics.Client } ) +var ( + // ErrEmptyReplicationTask is the error to indicate empty replication task + ErrEmptyReplicationTask = errors.New("empty replication task") + // ErrUnknownReplicationTask is the error to indicate unknown replication task type + ErrUnknownReplicationTask = errors.New("unknown replication task") +) + func newReplicationTaskProcessor(topic, consumer string, client messaging.Client, config *Config, - logger bark.Logger) *replicationTaskProcessor { + logger bark.Logger, metricsClient metrics.Client) *replicationTaskProcessor { return &replicationTaskProcessor{ topicName: topic, consumerName: consumer, @@ -62,6 +73,7 @@ func newReplicationTaskProcessor(topic, consumer string, client messaging.Client logging.TagTopicName: topic, logging.TagConsumerName: consumer, }), + metricsClient: metricsClient, } } @@ -116,14 +128,10 @@ func (p *replicationTaskProcessor) processorPump() { go p.worker(&workerWG) } -processorPumpLoop: - for { - select { - case <-p.shutdownCh: - // Processor is shutting down, close the underlying consumer - p.consumer.Stop() - break processorPumpLoop - } + select { + case <-p.shutdownCh: + // Processor is shutting down, close the underlying consumer + p.consumer.Stop() } p.logger.Info("Replication task processor pump shutting down.") @@ -143,24 +151,35 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) { return // channel closed } - if task, err := deserialize(msg.Value()); err != nil { - p.logger.Errorf("Deserialize Error. Value: %v, Error: %v", string(msg.Value()), err) + p.metricsClient.IncCounter(metrics.ReplicatorScope, metrics.ReplicatorMessages) + sw := p.metricsClient.StartTimer(metrics.ReplicatorScope, metrics.ReplicatorLatency) + + // TODO: We skip over any messages which cannot be deserialized. Figure out DLQ story for corrupted messages. + task, err := deserialize(msg.Value()) + if err != nil { + err = fmt.Errorf("Deserialize Error. Value: %v, Error: %v", string(msg.Value()), err) } else { + // TODO: We need to figure out DLQ story for corrupted payload if task.TaskType == nil { - p.logger.Warn("Empty replication task.") - } - - switch task.GetTaskType() { - case replicator.ReplicationTaskTypeDomain: - p.logger.Info("Recieved replication task for domain.") - case replicator.ReplicationTaskTypeHistory: - p.logger.Info("Recieved replication task for history.") - default: - p.logger.Errorf("Unknown replication task: %v", task.TaskType) + err = ErrEmptyReplicationTask + } else { + switch task.GetTaskType() { + case replicator.ReplicationTaskTypeDomain: + p.logger.Info("Recieved replication task for domain.") + case replicator.ReplicationTaskTypeHistory: + p.logger.Info("Recieved replication task for history.") + default: + err = ErrUnknownReplicationTask + } } } + if err != nil { + p.logger.WithField(logging.TagErr, err).Error("Error processing replication task.") + p.metricsClient.IncCounter(metrics.ReplicatorScope, metrics.ReplicatorFailures) + } + sw.Stop() msg.Ack() case <-p.consumer.Closed(): p.logger.Info("Consumer closed. Processor shutting down.") diff --git a/service/worker/replicator.go b/service/worker/replicator.go index 58bb7c4e837..aa2f61c8ef2 100644 --- a/service/worker/replicator.go +++ b/service/worker/replicator.go @@ -26,21 +26,24 @@ import ( "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" ) type ( // Replicator is the processor for replication tasks Replicator struct { - metadata cluster.Metadata - config *Config - client messaging.Client - processors []*replicationTaskProcessor - logger bark.Logger + metadata cluster.Metadata + config *Config + client messaging.Client + processors []*replicationTaskProcessor + logger bark.Logger + metricsClient metrics.Client } ) // NewReplicator creates a new replicator for processing replication tasks -func NewReplicator(metadata cluster.Metadata, config *Config, client messaging.Client, logger bark.Logger) *Replicator { +func NewReplicator(metadata cluster.Metadata, config *Config, client messaging.Client, logger bark.Logger, + metricsClient metrics.Client) *Replicator { return &Replicator{ metadata: metadata, config: config, @@ -48,6 +51,7 @@ func NewReplicator(metadata cluster.Metadata, config *Config, client messaging.C logger: logger.WithFields(bark.Fields{ logging.TagWorkflowComponent: logging.TagValueReplicatorComponent, }), + metricsClient: metricsClient, } } @@ -59,7 +63,7 @@ func (r *Replicator) Start() error { topicName := getTopicName(cluster) consumerName := getConsumerName(currentClusterName, cluster) r.processors = append(r.processors, newReplicationTaskProcessor(topicName, consumerName, r.client, r.config, - r.logger)) + r.logger, r.metricsClient)) } } diff --git a/service/worker/service.go b/service/worker/service.go index 904417df85a..8a3f58ed2fb 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -22,6 +22,7 @@ package worker import ( "github.com/uber/cadence/common" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/service" ) @@ -30,9 +31,10 @@ type ( // for a Cadence cluster. This service runs the replicator which is responsible for applying replication tasks // generated by remote clusters. Service struct { - stopC chan struct{} - params *service.BootstrapParams - config *Config + stopC chan struct{} + params *service.BootstrapParams + config *Config + metricsClient metrics.Client } // Config contains all the service config for worker @@ -66,8 +68,11 @@ func (s *Service) Start() { log.Infof("%v starting", common.WorkerServiceName) base := service.New(p) - replicator := NewReplicator(p.ClusterMetadata, s.config, p.MessagingClient, log) + s.metricsClient = base.GetMetricsClient() + + replicator := NewReplicator(p.ClusterMetadata, s.config, p.MessagingClient, log, s.metricsClient) if err := replicator.Start(); err != nil { + replicator.Stop() log.Fatalf("Fail to start replicator: %v", err) }