From fa74c6295391e213288545cb82776146ca72cef8 Mon Sep 17 00:00:00 2001 From: Samar Abbas - Uber Date: Mon, 2 Apr 2018 09:21:20 -0700 Subject: [PATCH] Apply replication history events to passive cluster (#643) Created history replicator which is invoked by the replicator for processing of history replication tasks. It processes the history events from the replication task and make mutable state updates for each event. Once all events are processed it commits the entire update using the workflowContext API used by rest of the stack. Also mutableStateBuilder changes to apply mutable state changes using the actual event. This required some refactoring of mutableStateBuilder to reuse as much code as possible between replicator and rest of the stack. History service has new API ReplicateEvents which is called by the replicator to apply history events. Current this change only works for the happy case and is not guarded by version updates on the domain. Replicator does not support out of order processing of history replication tasks. --- .../history/historyservice_replicateevents.go | 497 ++++++++++++++++++ .../go/history/historyserviceclient/client.go | 29 + .../go/history/historyserviceserver/server.go | 37 +- .gen/go/history/historyservicetest/client.go | 31 ++ .gen/go/history/idl.go | 4 +- .gen/go/history/types.go | 278 ++++++++++ client/history/client.go | 18 + client/history/metricClient.go | 17 + common/metrics/defs.go | 6 + common/mocks/HistoryClient.go | 14 + idl/github.com/uber/cadence/history.thrift | 17 + service/history/MockHistoryEngine.go | 14 + service/history/handler.go | 28 + service/history/historyBuilder.go | 8 + service/history/historyEngine.go | 7 + service/history/historyEngineInterfaces.go | 1 + service/history/historyReplicator.go | 291 ++++++++++ service/history/mutableStateBuilder.go | 167 ++++-- service/history/workflowExecutionContext.go | 54 +- service/worker/processor.go | 27 +- service/worker/replicator.go | 8 +- service/worker/service.go | 9 +- 22 files changed, 1487 insertions(+), 75 deletions(-) create mode 100644 .gen/go/history/historyservice_replicateevents.go create mode 100644 service/history/historyReplicator.go diff --git a/.gen/go/history/historyservice_replicateevents.go b/.gen/go/history/historyservice_replicateevents.go new file mode 100644 index 00000000000..5966760f736 --- /dev/null +++ b/.gen/go/history/historyservice_replicateevents.go @@ -0,0 +1,497 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by thriftrw v1.10.0. DO NOT EDIT. +// @generated + +package history + +import ( + "errors" + "fmt" + "github.com/uber/cadence/.gen/go/shared" + "go.uber.org/thriftrw/wire" + "strings" +) + +// HistoryService_ReplicateEvents_Args represents the arguments for the HistoryService.ReplicateEvents function. +// +// The arguments for ReplicateEvents are sent and received over the wire as this struct. +type HistoryService_ReplicateEvents_Args struct { + ReplicateRequest *ReplicateEventsRequest `json:"replicateRequest,omitempty"` +} + +// ToWire translates a HistoryService_ReplicateEvents_Args struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *HistoryService_ReplicateEvents_Args) ToWire() (wire.Value, error) { + var ( + fields [1]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.ReplicateRequest != nil { + w, err = v.ReplicateRequest.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 1, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _ReplicateEventsRequest_Read(w wire.Value) (*ReplicateEventsRequest, error) { + var v ReplicateEventsRequest + err := v.FromWire(w) + return &v, err +} + +// FromWire deserializes a HistoryService_ReplicateEvents_Args struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a HistoryService_ReplicateEvents_Args struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v HistoryService_ReplicateEvents_Args +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *HistoryService_ReplicateEvents_Args) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 1: + if field.Value.Type() == wire.TStruct { + v.ReplicateRequest, err = _ReplicateEventsRequest_Read(field.Value) + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a HistoryService_ReplicateEvents_Args +// struct. +func (v *HistoryService_ReplicateEvents_Args) String() string { + if v == nil { + return "" + } + + var fields [1]string + i := 0 + if v.ReplicateRequest != nil { + fields[i] = fmt.Sprintf("ReplicateRequest: %v", v.ReplicateRequest) + i++ + } + + return fmt.Sprintf("HistoryService_ReplicateEvents_Args{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this HistoryService_ReplicateEvents_Args match the +// provided HistoryService_ReplicateEvents_Args. +// +// This function performs a deep comparison. +func (v *HistoryService_ReplicateEvents_Args) Equals(rhs *HistoryService_ReplicateEvents_Args) bool { + if !((v.ReplicateRequest == nil && rhs.ReplicateRequest == nil) || (v.ReplicateRequest != nil && rhs.ReplicateRequest != nil && v.ReplicateRequest.Equals(rhs.ReplicateRequest))) { + return false + } + + return true +} + +// MethodName returns the name of the Thrift function as specified in +// the IDL, for which this struct represent the arguments. +// +// This will always be "ReplicateEvents" for this struct. +func (v *HistoryService_ReplicateEvents_Args) MethodName() string { + return "ReplicateEvents" +} + +// EnvelopeType returns the kind of value inside this struct. +// +// This will always be Call for this struct. +func (v *HistoryService_ReplicateEvents_Args) EnvelopeType() wire.EnvelopeType { + return wire.Call +} + +// HistoryService_ReplicateEvents_Helper provides functions that aid in handling the +// parameters and return values of the HistoryService.ReplicateEvents +// function. +var HistoryService_ReplicateEvents_Helper = struct { + // Args accepts the parameters of ReplicateEvents in-order and returns + // the arguments struct for the function. + Args func( + replicateRequest *ReplicateEventsRequest, + ) *HistoryService_ReplicateEvents_Args + + // IsException returns true if the given error can be thrown + // by ReplicateEvents. + // + // An error can be thrown by ReplicateEvents only if the + // corresponding exception type was mentioned in the 'throws' + // section for it in the Thrift file. + IsException func(error) bool + + // WrapResponse returns the result struct for ReplicateEvents + // given the error returned by it. The provided error may + // be nil if ReplicateEvents did not fail. + // + // This allows mapping errors returned by ReplicateEvents into a + // serializable result struct. WrapResponse returns a + // non-nil error if the provided error cannot be thrown by + // ReplicateEvents + // + // err := ReplicateEvents(args) + // result, err := HistoryService_ReplicateEvents_Helper.WrapResponse(err) + // if err != nil { + // return fmt.Errorf("unexpected error from ReplicateEvents: %v", err) + // } + // serialize(result) + WrapResponse func(error) (*HistoryService_ReplicateEvents_Result, error) + + // UnwrapResponse takes the result struct for ReplicateEvents + // and returns the erorr returned by it (if any). + // + // The error is non-nil only if ReplicateEvents threw an + // exception. + // + // result := deserialize(bytes) + // err := HistoryService_ReplicateEvents_Helper.UnwrapResponse(result) + UnwrapResponse func(*HistoryService_ReplicateEvents_Result) error +}{} + +func init() { + HistoryService_ReplicateEvents_Helper.Args = func( + replicateRequest *ReplicateEventsRequest, + ) *HistoryService_ReplicateEvents_Args { + return &HistoryService_ReplicateEvents_Args{ + ReplicateRequest: replicateRequest, + } + } + + HistoryService_ReplicateEvents_Helper.IsException = func(err error) bool { + switch err.(type) { + case *shared.BadRequestError: + return true + case *shared.InternalServiceError: + return true + case *shared.EntityNotExistsError: + return true + case *ShardOwnershipLostError: + return true + default: + return false + } + } + + HistoryService_ReplicateEvents_Helper.WrapResponse = func(err error) (*HistoryService_ReplicateEvents_Result, error) { + if err == nil { + return &HistoryService_ReplicateEvents_Result{}, nil + } + + switch e := err.(type) { + case *shared.BadRequestError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_ReplicateEvents_Result.BadRequestError") + } + return &HistoryService_ReplicateEvents_Result{BadRequestError: e}, nil + case *shared.InternalServiceError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_ReplicateEvents_Result.InternalServiceError") + } + return &HistoryService_ReplicateEvents_Result{InternalServiceError: e}, nil + case *shared.EntityNotExistsError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_ReplicateEvents_Result.EntityNotExistError") + } + return &HistoryService_ReplicateEvents_Result{EntityNotExistError: e}, nil + case *ShardOwnershipLostError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_ReplicateEvents_Result.ShardOwnershipLostError") + } + return &HistoryService_ReplicateEvents_Result{ShardOwnershipLostError: e}, nil + } + + return nil, err + } + HistoryService_ReplicateEvents_Helper.UnwrapResponse = func(result *HistoryService_ReplicateEvents_Result) (err error) { + if result.BadRequestError != nil { + err = result.BadRequestError + return + } + if result.InternalServiceError != nil { + err = result.InternalServiceError + return + } + if result.EntityNotExistError != nil { + err = result.EntityNotExistError + return + } + if result.ShardOwnershipLostError != nil { + err = result.ShardOwnershipLostError + return + } + return + } + +} + +// HistoryService_ReplicateEvents_Result represents the result of a HistoryService.ReplicateEvents function call. +// +// The result of a ReplicateEvents execution is sent and received over the wire as this struct. +type HistoryService_ReplicateEvents_Result struct { + BadRequestError *shared.BadRequestError `json:"badRequestError,omitempty"` + InternalServiceError *shared.InternalServiceError `json:"internalServiceError,omitempty"` + EntityNotExistError *shared.EntityNotExistsError `json:"entityNotExistError,omitempty"` + ShardOwnershipLostError *ShardOwnershipLostError `json:"shardOwnershipLostError,omitempty"` +} + +// ToWire translates a HistoryService_ReplicateEvents_Result struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *HistoryService_ReplicateEvents_Result) ToWire() (wire.Value, error) { + var ( + fields [4]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.BadRequestError != nil { + w, err = v.BadRequestError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 1, Value: w} + i++ + } + if v.InternalServiceError != nil { + w, err = v.InternalServiceError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 2, Value: w} + i++ + } + if v.EntityNotExistError != nil { + w, err = v.EntityNotExistError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 3, Value: w} + i++ + } + if v.ShardOwnershipLostError != nil { + w, err = v.ShardOwnershipLostError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 4, Value: w} + i++ + } + + if i > 1 { + return wire.Value{}, fmt.Errorf("HistoryService_ReplicateEvents_Result should have at most one field: got %v fields", i) + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +// FromWire deserializes a HistoryService_ReplicateEvents_Result struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a HistoryService_ReplicateEvents_Result struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v HistoryService_ReplicateEvents_Result +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *HistoryService_ReplicateEvents_Result) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 1: + if field.Value.Type() == wire.TStruct { + v.BadRequestError, err = _BadRequestError_Read(field.Value) + if err != nil { + return err + } + + } + case 2: + if field.Value.Type() == wire.TStruct { + v.InternalServiceError, err = _InternalServiceError_Read(field.Value) + if err != nil { + return err + } + + } + case 3: + if field.Value.Type() == wire.TStruct { + v.EntityNotExistError, err = _EntityNotExistsError_Read(field.Value) + if err != nil { + return err + } + + } + case 4: + if field.Value.Type() == wire.TStruct { + v.ShardOwnershipLostError, err = _ShardOwnershipLostError_Read(field.Value) + if err != nil { + return err + } + + } + } + } + + count := 0 + if v.BadRequestError != nil { + count++ + } + if v.InternalServiceError != nil { + count++ + } + if v.EntityNotExistError != nil { + count++ + } + if v.ShardOwnershipLostError != nil { + count++ + } + if count > 1 { + return fmt.Errorf("HistoryService_ReplicateEvents_Result should have at most one field: got %v fields", count) + } + + return nil +} + +// String returns a readable string representation of a HistoryService_ReplicateEvents_Result +// struct. +func (v *HistoryService_ReplicateEvents_Result) String() string { + if v == nil { + return "" + } + + var fields [4]string + i := 0 + if v.BadRequestError != nil { + fields[i] = fmt.Sprintf("BadRequestError: %v", v.BadRequestError) + i++ + } + if v.InternalServiceError != nil { + fields[i] = fmt.Sprintf("InternalServiceError: %v", v.InternalServiceError) + i++ + } + if v.EntityNotExistError != nil { + fields[i] = fmt.Sprintf("EntityNotExistError: %v", v.EntityNotExistError) + i++ + } + if v.ShardOwnershipLostError != nil { + fields[i] = fmt.Sprintf("ShardOwnershipLostError: %v", v.ShardOwnershipLostError) + i++ + } + + return fmt.Sprintf("HistoryService_ReplicateEvents_Result{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this HistoryService_ReplicateEvents_Result match the +// provided HistoryService_ReplicateEvents_Result. +// +// This function performs a deep comparison. +func (v *HistoryService_ReplicateEvents_Result) Equals(rhs *HistoryService_ReplicateEvents_Result) bool { + if !((v.BadRequestError == nil && rhs.BadRequestError == nil) || (v.BadRequestError != nil && rhs.BadRequestError != nil && v.BadRequestError.Equals(rhs.BadRequestError))) { + return false + } + if !((v.InternalServiceError == nil && rhs.InternalServiceError == nil) || (v.InternalServiceError != nil && rhs.InternalServiceError != nil && v.InternalServiceError.Equals(rhs.InternalServiceError))) { + return false + } + if !((v.EntityNotExistError == nil && rhs.EntityNotExistError == nil) || (v.EntityNotExistError != nil && rhs.EntityNotExistError != nil && v.EntityNotExistError.Equals(rhs.EntityNotExistError))) { + return false + } + if !((v.ShardOwnershipLostError == nil && rhs.ShardOwnershipLostError == nil) || (v.ShardOwnershipLostError != nil && rhs.ShardOwnershipLostError != nil && v.ShardOwnershipLostError.Equals(rhs.ShardOwnershipLostError))) { + return false + } + + return true +} + +// MethodName returns the name of the Thrift function as specified in +// the IDL, for which this struct represent the result. +// +// This will always be "ReplicateEvents" for this struct. +func (v *HistoryService_ReplicateEvents_Result) MethodName() string { + return "ReplicateEvents" +} + +// EnvelopeType returns the kind of value inside this struct. +// +// This will always be Reply for this struct. +func (v *HistoryService_ReplicateEvents_Result) EnvelopeType() wire.EnvelopeType { + return wire.Reply +} diff --git a/.gen/go/history/historyserviceclient/client.go b/.gen/go/history/historyserviceclient/client.go index 7edae74f840..5c43916d3f9 100644 --- a/.gen/go/history/historyserviceclient/client.go +++ b/.gen/go/history/historyserviceclient/client.go @@ -78,6 +78,12 @@ type Interface interface { opts ...yarpc.CallOption, ) error + ReplicateEvents( + ctx context.Context, + ReplicateRequest *history.ReplicateEventsRequest, + opts ...yarpc.CallOption, + ) error + RequestCancelWorkflowExecution( ctx context.Context, CancelRequest *history.RequestCancelWorkflowExecutionRequest, @@ -336,6 +342,29 @@ func (c client) RemoveSignalMutableState( return } +func (c client) ReplicateEvents( + ctx context.Context, + _ReplicateRequest *history.ReplicateEventsRequest, + opts ...yarpc.CallOption, +) (err error) { + + args := history.HistoryService_ReplicateEvents_Helper.Args(_ReplicateRequest) + + var body wire.Value + body, err = c.c.Call(ctx, args, opts...) + if err != nil { + return + } + + var result history.HistoryService_ReplicateEvents_Result + if err = result.FromWire(body); err != nil { + return + } + + err = history.HistoryService_ReplicateEvents_Helper.UnwrapResponse(&result) + return +} + func (c client) RequestCancelWorkflowExecution( ctx context.Context, _CancelRequest *history.RequestCancelWorkflowExecutionRequest, diff --git a/.gen/go/history/historyserviceserver/server.go b/.gen/go/history/historyserviceserver/server.go index 13693ad5017..42efa75c401 100644 --- a/.gen/go/history/historyserviceserver/server.go +++ b/.gen/go/history/historyserviceserver/server.go @@ -69,6 +69,11 @@ type Interface interface { RemoveRequest *history.RemoveSignalMutableStateRequest, ) error + ReplicateEvents( + ctx context.Context, + ReplicateRequest *history.ReplicateEventsRequest, + ) error + RequestCancelWorkflowExecution( ctx context.Context, CancelRequest *history.RequestCancelWorkflowExecutionRequest, @@ -218,6 +223,17 @@ func New(impl Interface, opts ...thrift.RegisterOption) []transport.Procedure { ThriftModule: history.ThriftModule, }, + thrift.Method{ + Name: "ReplicateEvents", + HandlerSpec: thrift.HandlerSpec{ + + Type: transport.Unary, + Unary: thrift.UnaryHandler(h.ReplicateEvents), + }, + Signature: "ReplicateEvents(ReplicateRequest *history.ReplicateEventsRequest)", + ThriftModule: history.ThriftModule, + }, + thrift.Method{ Name: "RequestCancelWorkflowExecution", HandlerSpec: thrift.HandlerSpec{ @@ -352,7 +368,7 @@ func New(impl Interface, opts ...thrift.RegisterOption) []transport.Procedure { }, } - procedures := make([]transport.Procedure, 0, 19) + procedures := make([]transport.Procedure, 0, 20) procedures = append(procedures, thrift.BuildProcedures(service, opts...)...) return procedures } @@ -492,6 +508,25 @@ func (h handler) RemoveSignalMutableState(ctx context.Context, body wire.Value) return response, err } +func (h handler) ReplicateEvents(ctx context.Context, body wire.Value) (thrift.Response, error) { + var args history.HistoryService_ReplicateEvents_Args + if err := args.FromWire(body); err != nil { + return thrift.Response{}, err + } + + err := h.impl.ReplicateEvents(ctx, args.ReplicateRequest) + + hadError := err != nil + result, err := history.HistoryService_ReplicateEvents_Helper.WrapResponse(err) + + var response thrift.Response + if err == nil { + response.IsApplicationError = hadError + response.Body = result + } + return response, err +} + func (h handler) RequestCancelWorkflowExecution(ctx context.Context, body wire.Value) (thrift.Response, error) { var args history.HistoryService_RequestCancelWorkflowExecution_Args if err := args.FromWire(body); err != nil { diff --git a/.gen/go/history/historyservicetest/client.go b/.gen/go/history/historyservicetest/client.go index 33a617df7e4..aa5e3fdb012 100644 --- a/.gen/go/history/historyservicetest/client.go +++ b/.gen/go/history/historyservicetest/client.go @@ -290,6 +290,37 @@ func (mr *_MockClientRecorder) RemoveSignalMutableState( return mr.mock.ctrl.RecordCall(mr.mock, "RemoveSignalMutableState", args...) } +// ReplicateEvents responds to a ReplicateEvents call based on the mock expectations. This +// call will fail if the mock does not expect this call. Use EXPECT to expect +// a call to this function. +// +// client.EXPECT().ReplicateEvents(gomock.Any(), ...).Return(...) +// ... := client.ReplicateEvents(...) +func (m *MockClient) ReplicateEvents( + ctx context.Context, + _ReplicateRequest *history.ReplicateEventsRequest, + opts ...yarpc.CallOption, +) (err error) { + + args := []interface{}{ctx, _ReplicateRequest} + for _, o := range opts { + args = append(args, o) + } + i := 0 + ret := m.ctrl.Call(m, "ReplicateEvents", args...) + err, _ = ret[i].(error) + return +} + +func (mr *_MockClientRecorder) ReplicateEvents( + ctx interface{}, + _ReplicateRequest interface{}, + opts ...interface{}, +) *gomock.Call { + args := append([]interface{}{ctx, _ReplicateRequest}, opts...) + return mr.mock.ctrl.RecordCall(mr.mock, "ReplicateEvents", args...) +} + // RequestCancelWorkflowExecution responds to a RequestCancelWorkflowExecution call based on the mock expectations. This // call will fail if the mock does not expect this call. Use EXPECT to expect // a call to this function. diff --git a/.gen/go/history/idl.go b/.gen/go/history/idl.go index 59a11d29bde..d3d651f10c3 100644 --- a/.gen/go/history/idl.go +++ b/.gen/go/history/idl.go @@ -33,11 +33,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "history", Package: "github.com/uber/cadence/.gen/go/history", FilePath: "history.thrift", - SHA1: "84a8b6c65fa3e00c021cc6d8907c75316fb0f213", + SHA1: "3edd199cd73f4853e33e820bad408b185667dd94", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.history\n\nexception EventAlreadyStartedError {\n 1: required string message\n}\n\nexception ShardOwnershipLostError {\n 10: optional string message\n 20: optional string owner\n}\n\nstruct ParentExecutionInfo {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") initiatedId\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.StartWorkflowExecutionRequest startRequest\n 30: optional ParentExecutionInfo parentExecutionInfo\n}\n\nstruct GetMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") expectedNextEventId\n}\n\nstruct GetMutableStateResponse {\n 10: optional shared.WorkflowExecution execution\n 20: optional shared.WorkflowType workflowType\n 30: optional i64 (js.type = \"Long\") NextEventId\n 40: optional i64 (js.type = \"Long\") LastFirstEventId\n 50: optional shared.TaskList taskList\n 60: optional shared.TaskList stickyTaskList\n 70: optional string clientLibraryVersion\n 80: optional string clientFeatureVersion\n 90: optional string clientImpl\n 100: optional bool isWorkflowRunning\n 110: optional i32 stickyTaskListScheduleToStartTimeout\n}\n\nstruct ResetStickyTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct ResetStickyTaskListResponse {\n // The reason to keep this response is to allow returning\n // information in the future.\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskCompletedRequest completeRequest\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskFailedRequest failedRequest\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional string domainUUID\n 20: optional shared.RecordActivityTaskHeartbeatRequest heartbeatRequest\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCompletedRequest completeRequest\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskFailedRequest failedRequest\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCanceledRequest cancelRequest\n}\n\nstruct RecordActivityTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct RecordActivityTaskStartedResponse {\n 10: optional shared.HistoryEvent startedEvent\n 20: optional shared.HistoryEvent scheduledEvent\n}\n\nstruct RecordDecisionTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct RecordDecisionTaskStartedResponse {\n 10: optional shared.WorkflowType workflowType\n 20: optional i64 (js.type = \"Long\") previousStartedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") attempt\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.TransientDecisionInfo decisionInfo\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWorkflowExecutionRequest signalRequest\n 30: optional shared.WorkflowExecution externalWorkflowExecution\n 40: optional bool childWorkflowOnly\n}\n\nstruct SignalWithStartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWithStartWorkflowExecutionRequest signalWithStartRequest\n}\n\nstruct RemoveSignalMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional string requestId\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.TerminateWorkflowExecutionRequest terminateRequest\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.RequestCancelWorkflowExecutionRequest cancelRequest\n 30: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 40: optional shared.WorkflowExecution externalWorkflowExecution\n 50: optional bool childWorkflowOnly\n}\n\nstruct ScheduleDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeWorkflowExecutionRequest request\n}\n\n/**\n* RecordChildExecutionCompletedRequest is used for reporting the completion of child execution to parent workflow\n* execution which started it. When a child execution is completed it creates this request and calls the\n* RecordChildExecutionCompleted API with the workflowExecution of parent. It also sets the completedExecution of the\n* child as it could potentially be different than the ChildExecutionStartedEvent of parent in the situation when\n* child creates multiple runs through ContinueAsNew before finally completing.\n**/\nstruct RecordChildExecutionCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") initiatedId\n 40: optional shared.WorkflowExecution completedExecution\n 50: optional shared.HistoryEvent completionEvent\n}\n\n/**\n* HistoryService provides API to start a new long running workflow instance, as well as query and update the history\n* of workflow instances already created.\n**/\nservice HistoryService {\n /**\n * StartWorkflowExecution starts a new long running workflow instance. It will create the instance with\n * 'WorkflowExecutionStarted' event in history and also schedule the first DecisionTask for the worker to make the\n * first decision for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already\n * exists with same workflowId.\n **/\n shared.StartWorkflowExecutionResponse StartWorkflowExecution(1: StartWorkflowExecutionRequest startRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.WorkflowExecutionAlreadyStartedError sessionAlreadyExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * Returns the information from mutable state of workflow execution.\n * It fails with 'EntityNotExistError' if specified workflow execution in unknown to the service.\n **/\n GetMutableStateResponse GetMutableState(1: GetMutableStateRequest getRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * Reset the sticky tasklist related information in mutable state of a given workflow.\n * Things cleared are:\n * 1. StickyTaskList\n * 2. StickyScheduleToStartTimeout\n * 3. ClientLibraryVersion\n * 4. ClientFeatureVersion\n * 5. ClientImpl\n **/\n ResetStickyTaskListResponse ResetStickyTaskList(1: ResetStickyTaskListRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordDecisionTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForDecisionTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordDecisionTaskStartedResponse RecordDecisionTaskStarted(1: RecordDecisionTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordActivityTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForActivityTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordActivityTaskStartedResponse RecordActivityTaskStarted(1: RecordActivityTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondDecisionTaskCompleted is called by application worker to complete a DecisionTask handed as a result of\n * 'PollForDecisionTask' API call. Completing a DecisionTask will result in new events for the workflow execution and\n * potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted\n * event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call\n * for completing the DecisionTask.\n **/\n void RespondDecisionTaskCompleted(1: RespondDecisionTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondDecisionTaskFailed is called by application worker to indicate failure. This results in\n * DecisionTaskFailedEvent written to the history and a new DecisionTask created. This API can be used by client to\n * either clear sticky tasklist or report ny panics during DecisionTask processing.\n **/\n void RespondDecisionTaskFailed(1: RespondDecisionTaskFailedRequest failedRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordActivityTaskHeartbeat is called by application worker while it is processing an ActivityTask. If worker fails\n * to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and\n * 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeat' will\n * fail with 'EntityNotExistsError' in such situations. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for heartbeating.\n **/\n shared.RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat(1: RecordActivityTaskHeartbeatRequest heartbeatRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask\n * created for the workflow so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCompleted(1: RespondActivityTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondActivityTaskFailed is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskFailed(1: RespondActivityTaskFailedRequest failRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondActivityTaskCanceled is called by application worker when it is successfully canceled an ActivityTask. It will\n * result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCanceled(1: RespondActivityTaskCanceledRequest canceledRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in\n * WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.\n **/\n void SignalWorkflowExecution(1: SignalWorkflowExecutionRequest signalRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.\n * If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history\n * and a decision task being created for the execution.\n * If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled\n * event recorded in history, and a decision task being created for the execution\n **/\n shared.StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(1: SignalWithStartWorkflowExecutionRequest signalWithStartRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently\n * used to clean execution info when signal decision finished.\n **/\n void RemoveSignalMutableState(1: RemoveSignalMutableStateRequest removeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event\n * in the history and immediately terminating the execution instance.\n **/\n void TerminateWorkflowExecution(1: TerminateWorkflowExecutionRequest terminateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RequestCancelWorkflowExecution is called by application worker when it wants to request cancellation of a workflow instance.\n * It will result in a new 'WorkflowExecutionCancelRequested' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. It fails with 'EntityNotExistsError' if the workflow is not valid\n * anymore due to completion or doesn't exist.\n **/\n void RequestCancelWorkflowExecution(1: RequestCancelWorkflowExecutionRequest cancelRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError,\n )\n\n /**\n * ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly\n * used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts\n * child execution without creating the decision task and then calls this API after updating the mutable state of\n * parent execution.\n **/\n void ScheduleDecisionTask(1: ScheduleDecisionTaskRequest scheduleRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent.\n * This is mainly called by transfer queue processor during the processing of DeleteExecution task.\n **/\n void RecordChildExecutionCompleted(1: RecordChildExecutionCompletedRequest completionRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * DescribeWorkflowExecution returns information about the specified workflow execution.\n **/\n shared.DescribeWorkflowExecutionResponse DescribeWorkflowExecution(1: DescribeWorkflowExecutionRequest describeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.history\n\nexception EventAlreadyStartedError {\n 1: required string message\n}\n\nexception ShardOwnershipLostError {\n 10: optional string message\n 20: optional string owner\n}\n\nstruct ParentExecutionInfo {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") initiatedId\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.StartWorkflowExecutionRequest startRequest\n 30: optional ParentExecutionInfo parentExecutionInfo\n}\n\nstruct GetMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") expectedNextEventId\n}\n\nstruct GetMutableStateResponse {\n 10: optional shared.WorkflowExecution execution\n 20: optional shared.WorkflowType workflowType\n 30: optional i64 (js.type = \"Long\") NextEventId\n 40: optional i64 (js.type = \"Long\") LastFirstEventId\n 50: optional shared.TaskList taskList\n 60: optional shared.TaskList stickyTaskList\n 70: optional string clientLibraryVersion\n 80: optional string clientFeatureVersion\n 90: optional string clientImpl\n 100: optional bool isWorkflowRunning\n 110: optional i32 stickyTaskListScheduleToStartTimeout\n}\n\nstruct ResetStickyTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct ResetStickyTaskListResponse {\n // The reason to keep this response is to allow returning\n // information in the future.\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskCompletedRequest completeRequest\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskFailedRequest failedRequest\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional string domainUUID\n 20: optional shared.RecordActivityTaskHeartbeatRequest heartbeatRequest\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCompletedRequest completeRequest\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskFailedRequest failedRequest\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCanceledRequest cancelRequest\n}\n\nstruct RecordActivityTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct RecordActivityTaskStartedResponse {\n 10: optional shared.HistoryEvent startedEvent\n 20: optional shared.HistoryEvent scheduledEvent\n}\n\nstruct RecordDecisionTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct RecordDecisionTaskStartedResponse {\n 10: optional shared.WorkflowType workflowType\n 20: optional i64 (js.type = \"Long\") previousStartedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") attempt\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.TransientDecisionInfo decisionInfo\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWorkflowExecutionRequest signalRequest\n 30: optional shared.WorkflowExecution externalWorkflowExecution\n 40: optional bool childWorkflowOnly\n}\n\nstruct SignalWithStartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWithStartWorkflowExecutionRequest signalWithStartRequest\n}\n\nstruct RemoveSignalMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional string requestId\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.TerminateWorkflowExecutionRequest terminateRequest\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.RequestCancelWorkflowExecutionRequest cancelRequest\n 30: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 40: optional shared.WorkflowExecution externalWorkflowExecution\n 50: optional bool childWorkflowOnly\n}\n\nstruct ScheduleDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeWorkflowExecutionRequest request\n}\n\n/**\n* RecordChildExecutionCompletedRequest is used for reporting the completion of child execution to parent workflow\n* execution which started it. When a child execution is completed it creates this request and calls the\n* RecordChildExecutionCompleted API with the workflowExecution of parent. It also sets the completedExecution of the\n* child as it could potentially be different than the ChildExecutionStartedEvent of parent in the situation when\n* child creates multiple runs through ContinueAsNew before finally completing.\n**/\nstruct RecordChildExecutionCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") initiatedId\n 40: optional shared.WorkflowExecution completedExecution\n 50: optional shared.HistoryEvent completionEvent\n}\n\nstruct ReplicateEventsRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") firstEventId\n 40: optional i64 (js.type = \"Long\") nextEventId\n 50: optional i64 (js.type = \"Long\") version\n 60: optional shared.History history\n}\n\n/**\n* HistoryService provides API to start a new long running workflow instance, as well as query and update the history\n* of workflow instances already created.\n**/\nservice HistoryService {\n /**\n * StartWorkflowExecution starts a new long running workflow instance. It will create the instance with\n * 'WorkflowExecutionStarted' event in history and also schedule the first DecisionTask for the worker to make the\n * first decision for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already\n * exists with same workflowId.\n **/\n shared.StartWorkflowExecutionResponse StartWorkflowExecution(1: StartWorkflowExecutionRequest startRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.WorkflowExecutionAlreadyStartedError sessionAlreadyExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * Returns the information from mutable state of workflow execution.\n * It fails with 'EntityNotExistError' if specified workflow execution in unknown to the service.\n **/\n GetMutableStateResponse GetMutableState(1: GetMutableStateRequest getRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * Reset the sticky tasklist related information in mutable state of a given workflow.\n * Things cleared are:\n * 1. StickyTaskList\n * 2. StickyScheduleToStartTimeout\n * 3. ClientLibraryVersion\n * 4. ClientFeatureVersion\n * 5. ClientImpl\n **/\n ResetStickyTaskListResponse ResetStickyTaskList(1: ResetStickyTaskListRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordDecisionTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForDecisionTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordDecisionTaskStartedResponse RecordDecisionTaskStarted(1: RecordDecisionTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordActivityTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForActivityTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordActivityTaskStartedResponse RecordActivityTaskStarted(1: RecordActivityTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondDecisionTaskCompleted is called by application worker to complete a DecisionTask handed as a result of\n * 'PollForDecisionTask' API call. Completing a DecisionTask will result in new events for the workflow execution and\n * potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted\n * event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call\n * for completing the DecisionTask.\n **/\n void RespondDecisionTaskCompleted(1: RespondDecisionTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondDecisionTaskFailed is called by application worker to indicate failure. This results in\n * DecisionTaskFailedEvent written to the history and a new DecisionTask created. This API can be used by client to\n * either clear sticky tasklist or report ny panics during DecisionTask processing.\n **/\n void RespondDecisionTaskFailed(1: RespondDecisionTaskFailedRequest failedRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordActivityTaskHeartbeat is called by application worker while it is processing an ActivityTask. If worker fails\n * to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and\n * 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeat' will\n * fail with 'EntityNotExistsError' in such situations. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for heartbeating.\n **/\n shared.RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat(1: RecordActivityTaskHeartbeatRequest heartbeatRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask\n * created for the workflow so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCompleted(1: RespondActivityTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondActivityTaskFailed is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskFailed(1: RespondActivityTaskFailedRequest failRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RespondActivityTaskCanceled is called by application worker when it is successfully canceled an ActivityTask. It will\n * result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCanceled(1: RespondActivityTaskCanceledRequest canceledRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in\n * WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.\n **/\n void SignalWorkflowExecution(1: SignalWorkflowExecutionRequest signalRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.\n * If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history\n * and a decision task being created for the execution.\n * If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled\n * event recorded in history, and a decision task being created for the execution\n **/\n shared.StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(1: SignalWithStartWorkflowExecutionRequest signalWithStartRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently\n * used to clean execution info when signal decision finished.\n **/\n void RemoveSignalMutableState(1: RemoveSignalMutableStateRequest removeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event\n * in the history and immediately terminating the execution instance.\n **/\n void TerminateWorkflowExecution(1: TerminateWorkflowExecutionRequest terminateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RequestCancelWorkflowExecution is called by application worker when it wants to request cancellation of a workflow instance.\n * It will result in a new 'WorkflowExecutionCancelRequested' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. It fails with 'EntityNotExistsError' if the workflow is not valid\n * anymore due to completion or doesn't exist.\n **/\n void RequestCancelWorkflowExecution(1: RequestCancelWorkflowExecutionRequest cancelRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError,\n )\n\n /**\n * ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly\n * used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts\n * child execution without creating the decision task and then calls this API after updating the mutable state of\n * parent execution.\n **/\n void ScheduleDecisionTask(1: ScheduleDecisionTaskRequest scheduleRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent.\n * This is mainly called by transfer queue processor during the processing of DeleteExecution task.\n **/\n void RecordChildExecutionCompleted(1: RecordChildExecutionCompletedRequest completionRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n /**\n * DescribeWorkflowExecution returns information about the specified workflow execution.\n **/\n shared.DescribeWorkflowExecutionResponse DescribeWorkflowExecution(1: DescribeWorkflowExecutionRequest describeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n\n void ReplicateEvents(1: ReplicateEventsRequest replicateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n )\n}\n" diff --git a/.gen/go/history/types.go b/.gen/go/history/types.go index 6657e5cb26c..c1e4fdf2b46 100644 --- a/.gen/go/history/types.go +++ b/.gen/go/history/types.go @@ -2716,6 +2716,284 @@ func (v *RemoveSignalMutableStateRequest) GetRequestId() (o string) { return } +type ReplicateEventsRequest struct { + DomainUUID *string `json:"domainUUID,omitempty"` + WorkflowExecution *shared.WorkflowExecution `json:"workflowExecution,omitempty"` + FirstEventId *int64 `json:"firstEventId,omitempty"` + NextEventId *int64 `json:"nextEventId,omitempty"` + Version *int64 `json:"version,omitempty"` + History *shared.History `json:"history,omitempty"` +} + +// ToWire translates a ReplicateEventsRequest struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *ReplicateEventsRequest) ToWire() (wire.Value, error) { + var ( + fields [6]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.DomainUUID != nil { + w, err = wire.NewValueString(*(v.DomainUUID)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.WorkflowExecution != nil { + w, err = v.WorkflowExecution.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.FirstEventId != nil { + w, err = wire.NewValueI64(*(v.FirstEventId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + if v.NextEventId != nil { + w, err = wire.NewValueI64(*(v.NextEventId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 40, Value: w} + i++ + } + if v.Version != nil { + w, err = wire.NewValueI64(*(v.Version)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 50, Value: w} + i++ + } + if v.History != nil { + w, err = v.History.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 60, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _History_Read(w wire.Value) (*shared.History, error) { + var v shared.History + err := v.FromWire(w) + return &v, err +} + +// FromWire deserializes a ReplicateEventsRequest struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a ReplicateEventsRequest struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v ReplicateEventsRequest +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *ReplicateEventsRequest) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.DomainUUID = &x + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TStruct { + v.WorkflowExecution, err = _WorkflowExecution_Read(field.Value) + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.FirstEventId = &x + if err != nil { + return err + } + + } + case 40: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.NextEventId = &x + if err != nil { + return err + } + + } + case 50: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.Version = &x + if err != nil { + return err + } + + } + case 60: + if field.Value.Type() == wire.TStruct { + v.History, err = _History_Read(field.Value) + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a ReplicateEventsRequest +// struct. +func (v *ReplicateEventsRequest) String() string { + if v == nil { + return "" + } + + var fields [6]string + i := 0 + if v.DomainUUID != nil { + fields[i] = fmt.Sprintf("DomainUUID: %v", *(v.DomainUUID)) + i++ + } + if v.WorkflowExecution != nil { + fields[i] = fmt.Sprintf("WorkflowExecution: %v", v.WorkflowExecution) + i++ + } + if v.FirstEventId != nil { + fields[i] = fmt.Sprintf("FirstEventId: %v", *(v.FirstEventId)) + i++ + } + if v.NextEventId != nil { + fields[i] = fmt.Sprintf("NextEventId: %v", *(v.NextEventId)) + i++ + } + if v.Version != nil { + fields[i] = fmt.Sprintf("Version: %v", *(v.Version)) + i++ + } + if v.History != nil { + fields[i] = fmt.Sprintf("History: %v", v.History) + i++ + } + + return fmt.Sprintf("ReplicateEventsRequest{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this ReplicateEventsRequest match the +// provided ReplicateEventsRequest. +// +// This function performs a deep comparison. +func (v *ReplicateEventsRequest) Equals(rhs *ReplicateEventsRequest) bool { + if !_String_EqualsPtr(v.DomainUUID, rhs.DomainUUID) { + return false + } + if !((v.WorkflowExecution == nil && rhs.WorkflowExecution == nil) || (v.WorkflowExecution != nil && rhs.WorkflowExecution != nil && v.WorkflowExecution.Equals(rhs.WorkflowExecution))) { + return false + } + if !_I64_EqualsPtr(v.FirstEventId, rhs.FirstEventId) { + return false + } + if !_I64_EqualsPtr(v.NextEventId, rhs.NextEventId) { + return false + } + if !_I64_EqualsPtr(v.Version, rhs.Version) { + return false + } + if !((v.History == nil && rhs.History == nil) || (v.History != nil && rhs.History != nil && v.History.Equals(rhs.History))) { + return false + } + + return true +} + +// GetDomainUUID returns the value of DomainUUID if it is set or its +// zero value if it is unset. +func (v *ReplicateEventsRequest) GetDomainUUID() (o string) { + if v.DomainUUID != nil { + return *v.DomainUUID + } + + return +} + +// GetFirstEventId returns the value of FirstEventId if it is set or its +// zero value if it is unset. +func (v *ReplicateEventsRequest) GetFirstEventId() (o int64) { + if v.FirstEventId != nil { + return *v.FirstEventId + } + + return +} + +// GetNextEventId returns the value of NextEventId if it is set or its +// zero value if it is unset. +func (v *ReplicateEventsRequest) GetNextEventId() (o int64) { + if v.NextEventId != nil { + return *v.NextEventId + } + + return +} + +// GetVersion returns the value of Version if it is set or its +// zero value if it is unset. +func (v *ReplicateEventsRequest) GetVersion() (o int64) { + if v.Version != nil { + return *v.Version + } + + return +} + type RequestCancelWorkflowExecutionRequest struct { DomainUUID *string `json:"domainUUID,omitempty"` CancelRequest *shared.RequestCancelWorkflowExecutionRequest `json:"cancelRequest,omitempty"` diff --git a/client/history/client.go b/client/history/client.go index d7731925500..0fae7788cb7 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -477,6 +477,24 @@ func (c *clientImpl) RecordChildExecutionCompleted( return err } +func (c *clientImpl) ReplicateEvents( + ctx context.Context, + request *h.ReplicateEventsRequest, + opts ...yarpc.CallOption) error { + client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId()) + if err != nil { + return err + } + opts = common.AggregateYarpcOptions(ctx, opts...) + op := func(ctx context.Context, client historyserviceclient.Interface) error { + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.ReplicateEvents(ctx, request, opts...) + } + err = c.executeWithRedirect(ctx, client, op) + return err +} + func (c *clientImpl) getHostForRequest(workflowID string) (historyserviceclient.Interface, error) { key := common.WorkflowIDToHistoryShard(workflowID, c.numberOfShards) host, err := c.resolver.Lookup(string(key)) diff --git a/client/history/metricClient.go b/client/history/metricClient.go index 4344ddc7622..2e2d88fa0c7 100644 --- a/client/history/metricClient.go +++ b/client/history/metricClient.go @@ -366,3 +366,20 @@ func (c *metricClient) RecordChildExecutionCompleted( return err } + +func (c *metricClient) ReplicateEvents( + context context.Context, + request *h.ReplicateEventsRequest, + opts ...yarpc.CallOption) error { + c.metricsClient.IncCounter(metrics.HistoryClientReplicateEventsScope, metrics.CadenceRequests) + + sw := c.metricsClient.StartTimer(metrics.HistoryClientReplicateEventsScope, metrics.CadenceLatency) + err := c.client.ReplicateEvents(context, request, opts...) + sw.Stop() + + if err != nil { + c.metricsClient.IncCounter(metrics.HistoryClientReplicateEventsScope, metrics.HistoryClientFailures) + } + + return err +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index da766ceff2a..acbd4b58efe 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -224,6 +224,8 @@ const ( HistoryClientScheduleDecisionTaskScope // HistoryClientRecordChildExecutionCompletedScope tracks RPC calls to history service HistoryClientRecordChildExecutionCompletedScope + // HistoryClientReplicateEventsScope tracks RPC calls to history service + HistoryClientReplicateEventsScope // MatchingClientPollForDecisionTaskScope tracks RPC calls to matching service MatchingClientPollForDecisionTaskScope // MatchingClientPollForActivityTaskScope tracks RPC calls to matching service @@ -346,6 +348,8 @@ const ( HistoryRecordChildExecutionCompletedScope // HistoryRequestCancelWorkflowExecutionScope tracks RequestCancelWorkflowExecution API calls received by service HistoryRequestCancelWorkflowExecutionScope + // HistoryReplicateEventsScope tracks ReplicateEvents API calls received by service + HistoryReplicateEventsScope // HistoryShardControllerScope is the scope used by shard controller HistoryShardControllerScope // TransferQueueProcessorScope is the scope used by all metric emitted by transfer queue processor @@ -477,6 +481,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ HistoryClientTerminateWorkflowExecutionScope: {operation: "HistoryClientTerminateWorkflowExecution"}, HistoryClientScheduleDecisionTaskScope: {operation: "HistoryClientScheduleDecisionTask"}, HistoryClientRecordChildExecutionCompletedScope: {operation: "HistoryClientRecordChildExecutionCompleted"}, + HistoryClientReplicateEventsScope: {operation: "HistoryClientReplicateEvents"}, MatchingClientPollForDecisionTaskScope: {operation: "MatchingClientPollForDecisionTask"}, MatchingClientPollForActivityTaskScope: {operation: "MatchingClientPollForActivityTask"}, MatchingClientAddActivityTaskScope: {operation: "MatchingClientAddActivityTask"}, @@ -538,6 +543,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ HistoryScheduleDecisionTaskScope: {operation: "ScheduleDecisionTask"}, HistoryRecordChildExecutionCompletedScope: {operation: "RecordChildExecutionCompleted"}, HistoryRequestCancelWorkflowExecutionScope: {operation: "RequestCancelWorkflowExecution"}, + HistoryReplicateEventsScope: {operation: "ReplicateEvents"}, HistoryShardControllerScope: {operation: "ShardController"}, TransferQueueProcessorScope: {operation: "TransferQueueProcessor"}, TransferTaskActivityScope: {operation: "TransferTaskActivity"}, diff --git a/common/mocks/HistoryClient.go b/common/mocks/HistoryClient.go index e2a16e93b94..ac123ff8adf 100644 --- a/common/mocks/HistoryClient.go +++ b/common/mocks/HistoryClient.go @@ -370,3 +370,17 @@ func (_m *HistoryClient) RecordChildExecutionCompleted(ctx context.Context, requ return r0 } + +// RecordChildExecutionCompleted provides a mock function with given fields: ctx, request +func (_m *HistoryClient) ReplicateEvents(ctx context.Context, request *history.ReplicateEventsRequest, opts ...yarpc.CallOption) error { + ret := _m.Called(ctx, request) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *history.ReplicateEventsRequest) error); ok { + r0 = rf(ctx, request) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/idl/github.com/uber/cadence/history.thrift b/idl/github.com/uber/cadence/history.thrift index 35e13f68697..9f68c385213 100644 --- a/idl/github.com/uber/cadence/history.thrift +++ b/idl/github.com/uber/cadence/history.thrift @@ -193,6 +193,15 @@ struct RecordChildExecutionCompletedRequest { 50: optional shared.HistoryEvent completionEvent } +struct ReplicateEventsRequest { + 10: optional string domainUUID + 20: optional shared.WorkflowExecution workflowExecution + 30: optional i64 (js.type = "Long") firstEventId + 40: optional i64 (js.type = "Long") nextEventId + 50: optional i64 (js.type = "Long") version + 60: optional shared.History history +} + /** * HistoryService provides API to start a new long running workflow instance, as well as query and update the history * of workflow instances already created. @@ -458,4 +467,12 @@ service HistoryService { 3: shared.EntityNotExistsError entityNotExistError, 4: ShardOwnershipLostError shardOwnershipLostError, ) + + void ReplicateEvents(1: ReplicateEventsRequest replicateRequest) + throws ( + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 3: shared.EntityNotExistsError entityNotExistError, + 4: ShardOwnershipLostError shardOwnershipLostError, + ) } diff --git a/service/history/MockHistoryEngine.go b/service/history/MockHistoryEngine.go index 8f70c53ae0c..fa18c6acc9b 100644 --- a/service/history/MockHistoryEngine.go +++ b/service/history/MockHistoryEngine.go @@ -382,4 +382,18 @@ func (_m *MockHistoryEngine) RecordChildExecutionCompleted(request *gohistory.Re return r0 } +// ReplicateEvents is mock implementation for ReplicateEvents of HistoryEngine +func (_m *MockHistoryEngine) ReplicateEvents(request *gohistory.ReplicateEventsRequest) error { + ret := _m.Called(request) + + var r0 error + if rf, ok := ret.Get(0).(func(*gohistory.ReplicateEventsRequest) error); ok { + r0 = rf(request) + } else { + r0 = ret.Error(0) + } + + return r0 +} + var _ Engine = (*MockHistoryEngine)(nil) diff --git a/service/history/handler.go b/service/history/handler.go index b61adb6f56f..46a8d146286 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -802,6 +802,34 @@ func (h *Handler) ResetStickyTaskList(ctx context.Context, resetRequest *hist.Re return resp, nil } +// ReplicateEvents is called by processor to replicate history events for passive domains +func (h *Handler) ReplicateEvents(ctx context.Context, replicateRequest *hist.ReplicateEventsRequest) error { + h.startWG.Wait() + + h.metricsClient.IncCounter(metrics.HistoryReplicateEventsScope, metrics.CadenceRequests) + sw := h.metricsClient.StartTimer(metrics.HistoryReplicateEventsScope, metrics.CadenceLatency) + defer sw.Stop() + + if replicateRequest.DomainUUID == nil { + return errDomainNotSet + } + + workflowExecution := replicateRequest.WorkflowExecution + engine, err1 := h.controller.GetEngine(workflowExecution.GetWorkflowId()) + if err1 != nil { + h.updateErrorMetric(metrics.HistoryReplicateEventsScope, err1) + return err1 + } + + err2 := engine.ReplicateEvents(replicateRequest) + if err2 != nil { + h.updateErrorMetric(metrics.HistoryReplicateEventsScope, h.convertError(err2)) + return h.convertError(err2) + } + + return nil +} + // convertError is a helper method to convert ShardOwnershipLostError from persistence layer returned by various // HistoryEngine API calls to ShardOwnershipLost error return by HistoryService for client to be redirected to the // correct shard. diff --git a/service/history/historyBuilder.go b/service/history/historyBuilder.go index c976ff8a3eb..6fd002e4ec3 100644 --- a/service/history/historyBuilder.go +++ b/service/history/historyBuilder.go @@ -53,6 +53,14 @@ func newHistoryBuilder(msBuilder *mutableStateBuilder, logger bark.Logger) *hist } } +func newHistoryBuilderFromEvents(history []*workflow.HistoryEvent, logger bark.Logger) *historyBuilder { + return &historyBuilder{ + serializer: persistence.NewJSONHistorySerializer(), + history: history, + logger: logger.WithField(logging.TagWorkflowComponent, logging.TagValueHistoryBuilderComponent), + } +} + func (b *historyBuilder) Serialize() (*persistence.SerializedHistoryEventBatch, error) { eventBatch := persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), b.history) history, err := b.serializer.Serialize(eventBatch) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index a052c81e04a..cd50ec2db54 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -55,6 +55,7 @@ type ( executionManager persistence.ExecutionManager txProcessor transferQueueProcessor timerProcessor timerQueueProcessor + replicator *historyReplicator replicatorProcessor queueProcessor historyEventNotifier historyEventNotifier tokenSerializer common.TaskTokenSerializer @@ -142,6 +143,8 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis historySerializerFactory) historyEngImpl.replicatorProcessor = replicatorProcessor shardWrapper.replcatorProcessor = replicatorProcessor + historyEngImpl.replicator = newHistoryReplicator(shard, historyCache, shard.GetDomainCache(), historyManager, + logger) } return historyEngImpl @@ -1942,6 +1945,10 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R }) } +func (e *historyEngineImpl) ReplicateEvents(replicateRequest *h.ReplicateEventsRequest) error { + return e.replicator.ApplyEvents(replicateRequest) +} + func (e *historyEngineImpl) updateWorkflowExecution(domainID string, execution workflow.WorkflowExecution, createDeletionTask, createDecisionTask bool, action func(builder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error)) error { diff --git a/service/history/historyEngineInterfaces.go b/service/history/historyEngineInterfaces.go index e96775becba..0b6d73e0989 100644 --- a/service/history/historyEngineInterfaces.go +++ b/service/history/historyEngineInterfaces.go @@ -70,6 +70,7 @@ type ( TerminateWorkflowExecution(request *h.TerminateWorkflowExecutionRequest) error ScheduleDecisionTask(request *h.ScheduleDecisionTaskRequest) error RecordChildExecutionCompleted(request *h.RecordChildExecutionCompletedRequest) error + ReplicateEvents(request *h.ReplicateEventsRequest) error } // EngineFactory is used to create an instance of sharded history engine diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go new file mode 100644 index 00000000000..bc2793d0911 --- /dev/null +++ b/service/history/historyReplicator.go @@ -0,0 +1,291 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "fmt" + "github.com/pborman/uuid" + "github.com/uber-common/bark" + h "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/persistence" +) + +type ( + historyReplicator struct { + shard ShardContext + historyCache *historyCache + domainCache cache.DomainCache + historyMgr persistence.HistoryManager + historySerializer persistence.HistorySerializer + logger bark.Logger + } +) + +func newHistoryReplicator(shard ShardContext, historyCache *historyCache, domainCache cache.DomainCache, + historyMgr persistence.HistoryManager, logger bark.Logger) *historyReplicator { + replicator := &historyReplicator{ + shard: shard, + historyCache: historyCache, + domainCache: domainCache, + historyMgr: historyMgr, + historySerializer: persistence.NewJSONHistorySerializer(), + logger: logger, + } + + return replicator +} + +func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error { + if request == nil || request.History == nil || len(request.History.Events) == 0 { + return nil + } + + domainID, err := getDomainUUID(request.DomainUUID) + if err != nil { + return err + } + execution := *request.WorkflowExecution + + var context *workflowExecutionContext + var msBuilder *mutableStateBuilder + firstEvent := request.History.Events[0] + switch firstEvent.GetEventType() { + case shared.EventTypeWorkflowExecutionStarted: + msBuilder = newMutableStateBuilder(r.shard.GetConfig(), r.logger) + + default: + var release releaseWorkflowExecutionFunc + context, release, err = r.historyCache.getOrCreateWorkflowExecution(domainID, execution) + if err != nil { + return err + } + defer release() + + msBuilder, err = context.loadWorkflowExecution() + if err != nil { + return err + } + } + + var lastEvent *shared.HistoryEvent + decisionScheduleID := emptyEventID + decisionStartID := emptyEventID + decisionTimeout := int32(0) + var requestID string + // TODO: Add handling for following events: + // WorkflowExecutionFailed, + // WorkflowExecutionTimedOut, + // ActivityTaskFailed, + // ActivityTaskTimedOut, + // ActivityTaskCancelRequested, + // RequestCancelActivityTaskFailed, + // ActivityTaskCanceled, + // TimerStarted, + // TimerFired, + // CancelTimerFailed, + // TimerCanceled, + // WorkflowExecutionCancelRequested, + // WorkflowExecutionCanceled, + // RequestCancelExternalWorkflowExecutionInitiated, + // RequestCancelExternalWorkflowExecutionFailed, + // ExternalWorkflowExecutionCancelRequested, + // MarkerRecorded, + // WorkflowExecutionSignaled, + // WorkflowExecutionTerminated, + // WorkflowExecutionContinuedAsNew, + // StartChildWorkflowExecutionInitiated, + // StartChildWorkflowExecutionFailed, + // ChildWorkflowExecutionStarted, + // ChildWorkflowExecutionCompleted, + // ChildWorkflowExecutionFailed, + // ChildWorkflowExecutionCanceled, + // ChildWorkflowExecutionTimedOut, + // ChildWorkflowExecutionTerminated, + // SignalExternalWorkflowExecutionInitiated, + // SignalExternalWorkflowExecutionFailed, + // ExternalWorkflowExecutionSignaled, + for _, event := range request.History.Events { + lastEvent = event + switch event.GetEventType() { + case shared.EventTypeWorkflowExecutionStarted: + attributes := event.WorkflowExecutionStartedEventAttributes + requestID = uuid.New() + msBuilder.ReplicateWorkflowExecutionStartedEvent(domainID, execution, requestID, attributes) + + case shared.EventTypeDecisionTaskScheduled: + attributes := event.DecisionTaskScheduledEventAttributes + di := msBuilder.ReplicateDecisionTaskScheduledEvent(event.GetEventId(), attributes.TaskList.GetName(), + attributes.GetStartToCloseTimeoutSeconds()) + + decisionScheduleID = di.ScheduleID + decisionStartID = di.StartedID + decisionTimeout = di.DecisionTimeout + + case shared.EventTypeDecisionTaskStarted: + attributes := event.DecisionTaskStartedEventAttributes + di := msBuilder.ReplicateDecisionTaskStartedEvent(nil, attributes.GetScheduledEventId(), event.GetEventId(), + attributes.GetRequestId(), event.GetTimestamp()) + + decisionScheduleID = di.ScheduleID + decisionStartID = di.StartedID + decisionTimeout = di.DecisionTimeout + + case shared.EventTypeDecisionTaskCompleted: + attributes := event.DecisionTaskCompletedEventAttributes + msBuilder.ReplicateDecisionTaskCompletedEvent(attributes.GetScheduledEventId(), + attributes.GetStartedEventId()) + + case shared.EventTypeDecisionTaskTimedOut: + attributes := event.DecisionTaskTimedOutEventAttributes + msBuilder.ReplicateDecisionTaskTimedOutEvent(attributes.GetScheduledEventId(), + attributes.GetStartedEventId()) + + case shared.EventTypeDecisionTaskFailed: + attributes := event.DecisionTaskFailedEventAttributes + msBuilder.ReplicateDecisionTaskFailedEvent(attributes.GetScheduledEventId(), + attributes.GetStartedEventId()) + + case shared.EventTypeActivityTaskScheduled: + msBuilder.ReplicateActivityTaskScheduledEvent(event) + + case shared.EventTypeActivityTaskStarted: + msBuilder.ReplicateActivityTaskStartedEvent(event) + + case shared.EventTypeActivityTaskCompleted: + if err := msBuilder.ReplicateActivityTaskCompletedEvent(event); err != nil { + return err + } + + case shared.EventTypeWorkflowExecutionCompleted: + msBuilder.ReplicateWorkflowExecutionCompletedEvent(event) + } + } + + switch firstEvent.GetEventType() { + case shared.EventTypeWorkflowExecutionStarted: + // TODO: Support for child execution + var parentExecution *shared.WorkflowExecution + initiatedID := emptyEventID + parentDomainID := "" + + // Serialize the history + serializedHistory, serializedError := r.Serialize(request.History) + if serializedError != nil { + logging.LogHistorySerializationErrorEvent(r.logger, serializedError, fmt.Sprintf( + "HistoryEventBatch serialization error on start workflow. WorkflowID: %v, RunID: %v", + execution.GetWorkflowId(), execution.GetRunId())) + return serializedError + } + + // Generate a transaction ID for appending events to history + transactionID, err2 := r.shard.GetNextTransferTaskID() + if err2 != nil { + return err2 + } + + err = r.shard.AppendHistoryEvents(&persistence.AppendHistoryEventsRequest{ + DomainID: domainID, + Execution: execution, + TransactionID: transactionID, + FirstEventID: firstEvent.GetEventId(), + Events: serializedHistory, + }) + if err != nil { + return err + } + + nextEventID := lastEvent.GetEventId() + 1 + msBuilder.executionInfo.NextEventID = nextEventID + msBuilder.executionInfo.LastFirstEventID = firstEvent.GetEventId() + + failoverVersion := request.GetVersion() + replicationState := &persistence.ReplicationState{ + CurrentVersion: failoverVersion, + StartVersion: failoverVersion, + LastWriteVersion: failoverVersion, + LastWriteEventID: lastEvent.GetEventId(), + } + + createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { + _, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ + RequestID: requestID, + DomainID: domainID, + Execution: execution, + ParentDomainID: parentDomainID, + ParentExecution: parentExecution, + InitiatedID: initiatedID, + TaskList: msBuilder.executionInfo.TaskList, + WorkflowTypeName: msBuilder.executionInfo.WorkflowTypeName, + WorkflowTimeout: msBuilder.executionInfo.WorkflowTimeout, + DecisionTimeoutValue: msBuilder.executionInfo.DecisionTimeoutValue, + ExecutionContext: nil, + NextEventID: msBuilder.GetNextEventID(), + LastProcessedEvent: emptyEventID, + TransferTasks: nil, // TODO: Generate transfer task + DecisionScheduleID: decisionScheduleID, + DecisionStartedID: decisionStartID, + DecisionStartToCloseTimeout: decisionTimeout, + TimerTasks: nil, // TODO: Generate workflow timeout task + ContinueAsNew: !isBrandNew, + PreviousRunID: prevRunID, + ReplicationState: replicationState, + }) + + if err != nil { + return "", err + } + return execution.GetRunId(), nil + } + + // try to create the workflow execution + isBrandNew := true + _, err = createWorkflow(isBrandNew, "") + // if err still non nil, see if retry + /*if errExist, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { + if err = workflowExistsErrHandler(errExist); err == nil { + isBrandNew = false + _, err = createWorkflow(isBrandNew, errExist.RunID) + } + }*/ + + default: + // Generate a transaction ID for appending events to history + transactionID, err2 := r.shard.GetNextTransferTaskID() + if err2 != nil { + return err2 + } + err = context.replicateWorkflowExecution(request, lastEvent.GetEventId(), transactionID) + } + + return err +} + +func (r *historyReplicator) Serialize(history *shared.History) (*persistence.SerializedHistoryEventBatch, error) { + eventBatch := persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), history.Events) + h, err := r.historySerializer.Serialize(eventBatch) + if err != nil { + return nil, err + } + return h, nil +} diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 8903f5ec064..706f8679fd9 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -96,7 +96,6 @@ type ( continueAsNew *persistence.CreateWorkflowExecutionRequest newBufferedEvents *persistence.SerializedHistoryEventBatch clearBufferedEvents bool - replicationTask persistence.Task } // TODO: This should be part of persistence layer @@ -230,14 +229,14 @@ func (e *mutableStateBuilder) FlushBufferedEvents() error { return nil } -func (e *mutableStateBuilder) ApplyReplicationStateUpdates(failoverVersion int64) { +func (e *mutableStateBuilder) ApplyReplicationStateUpdates(failoverVersion, lastEventID int64) { e.replicationState.CurrentVersion = failoverVersion e.replicationState.LastWriteVersion = failoverVersion // TODO: Rename this to NextEventID to stay consistent naming convention with rest of code base - e.replicationState.LastWriteEventID = e.GetNextEventID() - 1 + e.replicationState.LastWriteEventID = lastEventID } -func (e *mutableStateBuilder) CloseUpdateSession(createReplicationTask bool) (*mutableStateSessionUpdates, error) { +func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates, error) { if err := e.FlushBufferedEvents(); err != nil { return nil, err } @@ -261,10 +260,6 @@ func (e *mutableStateBuilder) CloseUpdateSession(createReplicationTask bool) (*m clearBufferedEvents: e.clearBufferedEvents, } - if createReplicationTask { - updates.replicationTask = e.createReplicationTask() - } - // Clear all updates to prepare for the next session e.hBuilder = newHistoryBuilder(e, e.logger) e.updateActivityInfos = []*persistence.ActivityInfo{} @@ -896,38 +891,46 @@ func (e *mutableStateBuilder) AddWorkflowExecutionStartedEvent(domainID string, return nil } + event := e.hBuilder.AddWorkflowExecutionStartedEvent(request) + e.ReplicateWorkflowExecutionStartedEvent(domainID, execution, request.GetRequestId(), + event.WorkflowExecutionStartedEventAttributes) + + return event +} + +func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent(domainID string, + execution workflow.WorkflowExecution, requestID string, event *workflow.WorkflowExecutionStartedEventAttributes) { e.executionInfo.DomainID = domainID - e.executionInfo.WorkflowID = *execution.WorkflowId - e.executionInfo.RunID = *execution.RunId - e.executionInfo.TaskList = *request.TaskList.Name - e.executionInfo.WorkflowTypeName = *request.WorkflowType.Name - e.executionInfo.WorkflowTimeout = *request.ExecutionStartToCloseTimeoutSeconds - e.executionInfo.DecisionTimeoutValue = *request.TaskStartToCloseTimeoutSeconds + e.executionInfo.WorkflowID = execution.GetWorkflowId() + e.executionInfo.RunID = execution.GetRunId() + e.executionInfo.TaskList = event.TaskList.GetName() + e.executionInfo.WorkflowTypeName = event.WorkflowType.GetName() + e.executionInfo.WorkflowTimeout = event.GetExecutionStartToCloseTimeoutSeconds() + e.executionInfo.DecisionTimeoutValue = event.GetTaskStartToCloseTimeoutSeconds() e.executionInfo.State = persistence.WorkflowStateCreated e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusNone e.executionInfo.LastProcessedEvent = emptyEventID - e.executionInfo.CreateRequestID = common.StringDefault(request.RequestId) + e.executionInfo.CreateRequestID = requestID e.executionInfo.DecisionScheduleID = emptyEventID e.executionInfo.DecisionStartedID = emptyEventID e.executionInfo.DecisionRequestID = emptyUUID e.executionInfo.DecisionTimeout = 0 - - return e.hBuilder.AddWorkflowExecutionStartedEvent(request) } func (e *mutableStateBuilder) AddDecisionTaskScheduledEvent() *decisionInfo { + if e.HasPendingDecisionTask() { + logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionDecisionTaskScheduled, e.GetNextEventID(), + fmt.Sprintf("{Pending Decision ScheduleID: %v}", e.executionInfo.DecisionScheduleID)) + return nil + } + // Tasklist and decision timeout should already be set from workflow execution started event taskList := e.executionInfo.TaskList if e.isStickyTaskListEnabled() { taskList = e.executionInfo.StickyTaskList } startToCloseTimeoutSeconds := e.executionInfo.DecisionTimeoutValue - if e.HasPendingDecisionTask() { - logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionDecisionTaskScheduled, e.GetNextEventID(), - fmt.Sprintf("{Pending Decision ScheduleID: %v}", e.executionInfo.DecisionScheduleID)) - return nil - } // Flush any buffered events before creating the decision, otherwise it will result in invalid IDs for transient // decision and will cause in timeout processing to not work for transient decisions @@ -944,6 +947,11 @@ func (e *mutableStateBuilder) AddDecisionTaskScheduledEvent() *decisionInfo { scheduleID = newDecisionEvent.GetEventId() } + return e.ReplicateDecisionTaskScheduledEvent(scheduleID, taskList, startToCloseTimeoutSeconds) +} + +func (e *mutableStateBuilder) ReplicateDecisionTaskScheduledEvent(scheduleID int64, taskList string, + startToCloseTimeoutSeconds int32) *decisionInfo { di := &decisionInfo{ ScheduleID: scheduleID, StartedID: emptyEventID, @@ -952,8 +960,8 @@ func (e *mutableStateBuilder) AddDecisionTaskScheduledEvent() *decisionInfo { Tasklist: taskList, Attempt: e.executionInfo.DecisionAttempt, } - e.UpdateDecision(di) + e.UpdateDecision(di) return di } @@ -988,6 +996,18 @@ func (e *mutableStateBuilder) AddDecisionTaskStartedEvent(scheduleEventID int64, timestamp = int64(0) } + di = e.ReplicateDecisionTaskStartedEvent(di, scheduleID, startedID, requestID, timestamp) + return event, di +} + +func (e *mutableStateBuilder) ReplicateDecisionTaskStartedEvent(di *decisionInfo, scheduleID, startedID int64, + requestID string, timestamp int64) *decisionInfo { + // Replicator calls it with a nil decision info, and it is safe to always lookup the decision in this case as it + // does not have to deal with transient decision case. + if di == nil { + di, _ = e.GetPendingDecision(scheduleID) + } + e.executionInfo.State = persistence.WorkflowStateRunning // Update mutable decision state di = &decisionInfo{ @@ -998,9 +1018,9 @@ func (e *mutableStateBuilder) AddDecisionTaskStartedEvent(scheduleEventID int64, Attempt: di.Attempt, Timestamp: timestamp, } - e.UpdateDecision(di) - return event, di + e.UpdateDecision(di) + return di } func (e *mutableStateBuilder) createTransientDecisionEvents(di *decisionInfo, identity string) (*workflow.HistoryEvent, @@ -1014,6 +1034,15 @@ func (e *mutableStateBuilder) createTransientDecisionEvents(di *decisionInfo, id return scheduledEvent, startedEvent } +func (e *mutableStateBuilder) BeforeAddDecisionTaskCompletedEvent() { + // Make sure to delete decision before adding events. Otherwise they are buffered rather than getting appended + e.DeleteDecision() +} + +func (e *mutableStateBuilder) AfterAddDecisionTaskCompletedEvent(startedID int64) { + e.executionInfo.LastProcessedEvent = startedID +} + func (e *mutableStateBuilder) AddDecisionTaskCompletedEvent(scheduleEventID, startedEventID int64, request *workflow.RespondDecisionTaskCompletedRequest) *workflow.HistoryEvent { hasPendingDecision := e.HasPendingDecisionTask() @@ -1025,9 +1054,7 @@ func (e *mutableStateBuilder) AddDecisionTaskCompletedEvent(scheduleEventID, sta return nil } - // Make sure to delete decision before adding events. Otherwise they are buffered rather than getting appended - e.DeleteDecision() - + e.BeforeAddDecisionTaskCompletedEvent() if di.Attempt > 0 { // Create corresponding DecisionTaskSchedule and DecisionTaskStarted events for decisions we have been retrying scheduledEvent := e.hBuilder.AddDecisionTaskScheduledEvent(e.executionInfo.TaskList, di.DecisionTimeout, di.Attempt) @@ -1038,10 +1065,15 @@ func (e *mutableStateBuilder) AddDecisionTaskCompletedEvent(scheduleEventID, sta // Now write the completed event event := e.hBuilder.AddDecisionTaskCompletedEvent(scheduleEventID, startedEventID, request) - e.executionInfo.LastProcessedEvent = startedEventID + e.AfterAddDecisionTaskCompletedEvent(startedEventID) return event } +func (e *mutableStateBuilder) ReplicateDecisionTaskCompletedEvent(scheduleEventID, startedEventID int64) { + e.BeforeAddDecisionTaskCompletedEvent() + e.AfterAddDecisionTaskCompletedEvent(startedEventID) +} + func (e *mutableStateBuilder) AddDecisionTaskTimedOutEvent(scheduleEventID int64, startedEventID int64) *workflow.HistoryEvent { hasPendingDecision := e.HasPendingDecisionTask() @@ -1059,10 +1091,14 @@ func (e *mutableStateBuilder) AddDecisionTaskTimedOutEvent(scheduleEventID int64 event = e.hBuilder.AddDecisionTaskTimedOutEvent(scheduleEventID, startedEventID, workflow.TimeoutTypeStartToClose) } - e.FailDecision() + e.ReplicateDecisionTaskTimedOutEvent(scheduleEventID, startedEventID) return event } +func (e *mutableStateBuilder) ReplicateDecisionTaskTimedOutEvent(scheduleID, startedID int64) { + e.FailDecision() +} + func (e *mutableStateBuilder) AddDecisionTaskScheduleToStartTimeoutEvent(scheduleEventID int64) *workflow.HistoryEvent { if e.executionInfo.DecisionScheduleID != scheduleEventID || e.executionInfo.DecisionStartedID > 0 { logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionDecisionTaskTimedOut, e.GetNextEventID(), @@ -1076,7 +1112,7 @@ func (e *mutableStateBuilder) AddDecisionTaskScheduleToStartTimeoutEvent(schedul event := e.hBuilder.AddDecisionTaskTimedOutEvent(scheduleEventID, 0, workflow.TimeoutTypeScheduleToStart) - e.DeleteDecision() + e.ReplicateDecisionTaskTimedOutEvent(scheduleEventID, emptyEventID) return event } @@ -1098,10 +1134,14 @@ func (e *mutableStateBuilder) AddDecisionTaskFailedEvent(scheduleEventID int64, event = e.hBuilder.AddDecisionTaskFailedEvent(scheduleEventID, startedEventID, cause, details, identity) } - e.FailDecision() + e.ReplicateDecisionTaskFailedEvent(scheduleEventID, startedEventID) return event } +func (e *mutableStateBuilder) ReplicateDecisionTaskFailedEvent(scheduleID, startedID int64) { + e.FailDecision() +} + func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(decisionCompletedEventID int64, attributes *workflow.ScheduleActivityTaskDecisionAttributes) (*workflow.HistoryEvent, *persistence.ActivityInfo) { if ai, ok := e.GetActivityInfo(e.GetNextEventID()); ok { @@ -1116,36 +1156,43 @@ func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(decisionCompletedEve event := e.hBuilder.AddActivityTaskScheduledEvent(decisionCompletedEventID, attributes) + ai := e.ReplicateActivityTaskScheduledEvent(event) + return event, ai +} + +func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent( + event *workflow.HistoryEvent) *persistence.ActivityInfo { + attributes := event.ActivityTaskScheduledEventAttributes scheduleEvent, err := e.eventSerializer.Serialize(event) if err != nil { - return nil, nil + return nil } scheduleEventID := *event.EventId var scheduleToStartTimeout int32 - if attributes.ScheduleToStartTimeoutSeconds == nil || *attributes.ScheduleToStartTimeoutSeconds <= 0 { + if attributes.ScheduleToStartTimeoutSeconds == nil || attributes.GetScheduleToStartTimeoutSeconds() <= 0 { scheduleToStartTimeout = e.config.DefaultScheduleToStartActivityTimeoutInSecs } else { - scheduleToStartTimeout = *attributes.ScheduleToStartTimeoutSeconds + scheduleToStartTimeout = attributes.GetScheduleToStartTimeoutSeconds() } var scheduleToCloseTimeout int32 - if attributes.ScheduleToCloseTimeoutSeconds == nil || *attributes.ScheduleToCloseTimeoutSeconds <= 0 { + if attributes.ScheduleToCloseTimeoutSeconds == nil || attributes.GetScheduleToCloseTimeoutSeconds() <= 0 { scheduleToCloseTimeout = e.config.DefaultScheduleToCloseActivityTimeoutInSecs } else { - scheduleToCloseTimeout = *attributes.ScheduleToCloseTimeoutSeconds + scheduleToCloseTimeout = attributes.GetScheduleToCloseTimeoutSeconds() } var startToCloseTimeout int32 - if attributes.StartToCloseTimeoutSeconds == nil || *attributes.StartToCloseTimeoutSeconds <= 0 { + if attributes.StartToCloseTimeoutSeconds == nil || attributes.GetStartToCloseTimeoutSeconds() <= 0 { startToCloseTimeout = e.config.DefaultStartToCloseActivityTimeoutInSecs } else { - startToCloseTimeout = *attributes.StartToCloseTimeoutSeconds + startToCloseTimeout = attributes.GetStartToCloseTimeoutSeconds() } var heartbeatTimeout int32 if attributes.HeartbeatTimeoutSeconds != nil { - heartbeatTimeout = *attributes.HeartbeatTimeoutSeconds + heartbeatTimeout = attributes.GetHeartbeatTimeoutSeconds() } ai := &persistence.ActivityInfo{ @@ -1169,7 +1216,7 @@ func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(decisionCompletedEve e.pendingActivityInfoByActivityID[ai.ActivityID] = scheduleEventID e.updateActivityInfos = append(e.updateActivityInfos, ai) - return event, ai + return ai } func (e *mutableStateBuilder) AddActivityTaskStartedEvent(ai *persistence.ActivityInfo, scheduleEventID int64, @@ -1182,14 +1229,21 @@ func (e *mutableStateBuilder) AddActivityTaskStartedEvent(ai *persistence.Activi event := e.hBuilder.AddActivityTaskStartedEvent(scheduleEventID, requestID, request) - ai.StartedID = *event.EventId - ai.RequestID = requestID - ai.StartedTime = time.Unix(0, *event.Timestamp) - e.updateActivityInfos = append(e.updateActivityInfos, ai) - + e.ReplicateActivityTaskStartedEvent(event) return event } +func (e *mutableStateBuilder) ReplicateActivityTaskStartedEvent(event *workflow.HistoryEvent) { + attributes := event.ActivityTaskStartedEventAttributes + scheduleID := attributes.GetScheduledEventId() + ai, _ := e.GetActivityInfo(scheduleID) + + ai.StartedID = event.GetEventId() + ai.RequestID = attributes.GetRequestId() + ai.StartedTime = time.Unix(0, event.GetTimestamp()) + e.updateActivityInfos = append(e.updateActivityInfos, ai) +} + func (e *mutableStateBuilder) AddActivityTaskCompletedEvent(scheduleEventID, startedEventID int64, request *workflow.RespondActivityTaskCompletedRequest) *workflow.HistoryEvent { if ai, ok := e.GetActivityInfo(scheduleEventID); !ok || ai.StartedID != startedEventID { @@ -1198,11 +1252,19 @@ func (e *mutableStateBuilder) AddActivityTaskCompletedEvent(scheduleEventID, sta return nil } - if err := e.DeleteActivity(scheduleEventID); err != nil { + event := e.hBuilder.AddActivityTaskCompletedEvent(scheduleEventID, startedEventID, request) + if err := e.ReplicateActivityTaskCompletedEvent(event); err != nil { return nil } - return e.hBuilder.AddActivityTaskCompletedEvent(scheduleEventID, startedEventID, request) + return event +} + +func (e *mutableStateBuilder) ReplicateActivityTaskCompletedEvent(event *workflow.HistoryEvent) error { + attributes := event.ActivityTaskCompletedEventAttributes + scheduleID := attributes.GetScheduledEventId() + + return e.DeleteActivity(scheduleID) } func (e *mutableStateBuilder) AddActivityTaskFailedEvent(scheduleEventID, startedEventID int64, @@ -1297,12 +1359,15 @@ func (e *mutableStateBuilder) AddCompletedWorkflowEvent(decisionCompletedEventID return nil } + event := e.hBuilder.AddCompletedWorkflowEvent(decisionCompletedEventID, attributes) + e.ReplicateWorkflowExecutionCompletedEvent(event) + return event +} + +func (e *mutableStateBuilder) ReplicateWorkflowExecutionCompletedEvent(event *workflow.HistoryEvent) { e.executionInfo.State = persistence.WorkflowStateCompleted e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted - event := e.hBuilder.AddCompletedWorkflowEvent(decisionCompletedEventID, attributes) e.writeCompletionEventToMutableState(event) - - return event } func (e *mutableStateBuilder) AddFailWorkflowEvent(decisionCompletedEventID int64, diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index d5cf4889940..a960cbafbc9 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -25,6 +25,7 @@ import ( "sync" "time" + h "github.com/uber/cadence/.gen/go/history" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" @@ -115,32 +116,57 @@ func (c *workflowExecutionContext) updateWorkflowExecutionWithDeleteTask(transfe return c.updateWorkflowExecution(transferTasks, timerTasks, transactionID) } -func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persistence.Task, - timerTasks []persistence.Task, transactionID int64) (errRet error) { +func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.ReplicateEventsRequest, + lastEventID, transactionID int64) error { - defer func() { - if errRet != nil { - // Clear all cached state in case of error - c.clear() - } - }() + nextEventID := lastEventID + 1 + c.msBuilder.ApplyReplicationStateUpdates(request.GetVersion(), lastEventID) + c.msBuilder.executionInfo.NextEventID = nextEventID + + builder := newHistoryBuilderFromEvents(request.History.Events, c.logger) + return c.updateHelper(builder, nil, nil, false, transactionID) +} + +func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persistence.Task, + timerTasks []persistence.Task, transactionID int64) error { createReplicationTask := c.shard.GetService().GetClusterMetadata().IsGlobalDomainEnabled() if createReplicationTask { + // Support for global domains is enabled and we are performing an update for global domain domainEntry, err := c.shard.GetDomainCache().GetDomainByID(c.msBuilder.executionInfo.DomainID) if err != nil { return err } - c.msBuilder.ApplyReplicationStateUpdates(domainEntry.GetFailoverVersion()) + lastEventID := c.msBuilder.GetNextEventID() - 1 + c.msBuilder.ApplyReplicationStateUpdates(domainEntry.GetFailoverVersion(), lastEventID) } + + return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, transactionID) +} + +func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task, + timerTasks []persistence.Task, createReplicationTask bool, transactionID int64) (errRet error) { + + defer func() { + if errRet != nil { + // Clear all cached state in case of error + c.clear() + } + }() + // Take a snapshot of all updates we have accumulated for this execution - updates, err := c.msBuilder.CloseUpdateSession(createReplicationTask) + updates, err := c.msBuilder.CloseUpdateSession() if err != nil { return err } - builder := updates.newEventsBuilder + // Replicator passes in a custom builder as it already has the events + if builder == nil { + // If no builder is passed in then use the one as part of the updates + builder = updates.newEventsBuilder + } + if builder.history != nil && len(builder.history) > 0 { // Some operations only update the mutable state. For example RecordActivityTaskHeartbeat. firstEvent := builder.history[0] @@ -186,11 +212,11 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi } var replicationTasks []persistence.Task - if updates.replicationTask != nil { - // Support for global domains is enabled and we are performing an update for global domain + if createReplicationTask { // Let's create a replication task as part of this update - replicationTasks = append(replicationTasks, updates.replicationTask) + replicationTasks = append(replicationTasks, c.msBuilder.createReplicationTask()) } + if err1 := c.updateWorkflowExecutionWithRetry(&persistence.UpdateWorkflowExecutionRequest{ ExecutionInfo: c.msBuilder.executionInfo, ReplicationState: c.msBuilder.replicationState, diff --git a/service/worker/processor.go b/service/worker/processor.go index ae4e2767072..d77399bba93 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -29,9 +29,13 @@ import ( "encoding/json" + "context" "github.com/uber-common/bark" "github.com/uber-go/kafka-client/kafka" + h "github.com/uber/cadence/.gen/go/history" "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/client/history" "github.com/uber/cadence/common" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/messaging" @@ -39,7 +43,6 @@ import ( ) type ( - // DomainReplicator is the interface which can replicate the domain DomainReplicator interface { HandleReceivingTask(task *replicator.DomainTaskAttributes) error @@ -58,6 +61,7 @@ type ( logger bark.Logger metricsClient metrics.Client domainReplicator DomainReplicator + historyClient history.Client } ) @@ -69,7 +73,8 @@ var ( ) func newReplicationTaskProcessor(topic, consumer string, client messaging.Client, config *Config, - logger bark.Logger, metricsClient metrics.Client, domainReplicator DomainReplicator) *replicationTaskProcessor { + logger bark.Logger, metricsClient metrics.Client, domainReplicator DomainReplicator, + historyClient history.Client) *replicationTaskProcessor { return &replicationTaskProcessor{ topicName: topic, consumerName: consumer, @@ -83,6 +88,7 @@ func newReplicationTaskProcessor(topic, consumer string, client messaging.Client }), metricsClient: metricsClient, domainReplicator: domainReplicator, + historyClient: historyClient, } } @@ -178,7 +184,18 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) { p.logger.Debugf("Recieved domain replication task %v.", task.DomainTaskAttributes) err = p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes) case replicator.ReplicationTaskTypeHistory: - p.logger.Debugf("Recieved history replication task %v.", task.HistoryTaskAttributes) + err = p.historyClient.ReplicateEvents(context.Background(), &h.ReplicateEventsRequest{ + DomainUUID: task.HistoryTaskAttributes.DomainId, + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: task.HistoryTaskAttributes.WorkflowId, + RunId: task.HistoryTaskAttributes.RunId, + }, + FirstEventId: task.HistoryTaskAttributes.FirstEventId, + NextEventId: task.HistoryTaskAttributes.NextEventId, + Version: task.HistoryTaskAttributes.Version, + History: task.HistoryTaskAttributes.History, + }) + default: err = ErrUnknownReplicationTask } @@ -188,9 +205,11 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) { if err != nil { p.logger.WithField(logging.TagErr, err).Error("Error processing replication task.") p.metricsClient.IncCounter(metrics.ReplicatorScope, metrics.ReplicatorFailures) + msg.Nack() + } else { + msg.Ack() } sw.Stop() - msg.Ack() case <-p.consumer.Closed(): p.logger.Info("Consumer closed. Processor shutting down.") return diff --git a/service/worker/replicator.go b/service/worker/replicator.go index a21190a29e1..992d8f5b30a 100644 --- a/service/worker/replicator.go +++ b/service/worker/replicator.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/uber-common/bark" + "github.com/uber/cadence/client/history" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/messaging" @@ -36,6 +37,7 @@ type ( Replicator struct { clusterMetadata cluster.Metadata domainReplicator DomainReplicator + historyClient history.Client config *Config client messaging.Client processors []*replicationTaskProcessor @@ -46,13 +48,15 @@ type ( // NewReplicator creates a new replicator for processing replication tasks func NewReplicator(clusterMetadata cluster.Metadata, metadataManager persistence.MetadataManager, - config *Config, client messaging.Client, logger bark.Logger, metricsClient metrics.Client) *Replicator { + historyClient history.Client, config *Config, client messaging.Client, logger bark.Logger, + metricsClient metrics.Client) *Replicator { logger = logger.WithFields(bark.Fields{ logging.TagWorkflowComponent: logging.TagValueReplicatorComponent, }) return &Replicator{ clusterMetadata: clusterMetadata, domainReplicator: NewDomainReplicator(metadataManager, logger), + historyClient: historyClient, config: config, client: client, logger: logger, @@ -68,7 +72,7 @@ func (r *Replicator) Start() error { topicName := getTopicName(cluster) consumerName := getConsumerName(currentClusterName, cluster) r.processors = append(r.processors, newReplicationTaskProcessor(topicName, consumerName, r.client, r.config, - r.logger, r.metricsClient, r.domainReplicator)) + r.logger, r.metricsClient, r.domainReplicator, r.historyClient)) } } diff --git a/service/worker/service.go b/service/worker/service.go index 4d875e32894..23d800581c6 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -68,6 +68,7 @@ func (s *Service) Start() { log.Infof("%v starting", common.WorkerServiceName) base := service.New(p) + base.Start() s.metricsClient = base.GetMetricsClient() @@ -85,7 +86,13 @@ func (s *Service) Start() { } metadataManager = persistence.NewMetadataPersistenceClient(metadataManager, base.GetMetricsClient()) - replicator := NewReplicator(p.ClusterMetadata, metadataManager, s.config, p.MessagingClient, log, s.metricsClient) + history, err := base.GetClientFactory().NewHistoryClient() + if err != nil { + log.Fatalf("failed to create history service client: %v", err) + } + + replicator := NewReplicator(p.ClusterMetadata, metadataManager, history, s.config, p.MessagingClient, log, + s.metricsClient) if err := replicator.Start(); err != nil { replicator.Stop() log.Fatalf("Fail to start replicator: %v", err)