From a496241aae71e53196b4b16c354c90814815930a Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 12 Jul 2018 10:38:49 -0700 Subject: [PATCH] Add time sync functionality, to move the view of time of a remote cluster (#952) * Add time sync functionality, to move the view of time of a remote cluster --- .../history/historyservice_syncshardstatus.go | 547 ++++++++++++++++++ .../go/history/historyserviceclient/client.go | 29 + .../go/history/historyserviceserver/server.go | 37 +- .gen/go/history/historyservicetest/client.go | 31 + .gen/go/history/idl.go | 4 +- .gen/go/history/types.go | 188 ++++++ .gen/go/replicator/idl.go | 4 +- .gen/go/replicator/types.go | 253 +++++++- client/history/client.go | 22 + client/history/metricClient.go | 17 + client/history/retryableClient.go | 12 + common/metrics/defs.go | 9 + common/mocks/HistoryClient.go | 14 + common/persistence/dataInterfaces.go | 4 +- idl/github.com/uber/cadence/history.thrift | 17 + idl/github.com/uber/cadence/replicator.thrift | 8 + service/history/MockHistoryEngine.go | 14 + service/history/handler.go | 41 +- service/history/historyEngine.go | 14 + service/history/historyEngineInterfaces.go | 1 + service/history/queueAckMgr.go | 15 +- service/history/queueAckMgr_test.go | 1 + service/history/shardContext.go | 22 +- service/history/shardController.go | 39 +- service/history/shardController_test.go | 8 +- service/history/timerQueueAckMgr.go | 5 +- service/history/timerQueueAckMgr_test.go | 2 +- service/worker/processor.go | 21 + 28 files changed, 1328 insertions(+), 51 deletions(-) create mode 100644 .gen/go/history/historyservice_syncshardstatus.go diff --git a/.gen/go/history/historyservice_syncshardstatus.go b/.gen/go/history/historyservice_syncshardstatus.go new file mode 100644 index 00000000000..d1502cd8086 --- /dev/null +++ b/.gen/go/history/historyservice_syncshardstatus.go @@ -0,0 +1,547 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by thriftrw v1.12.0. DO NOT EDIT. +// @generated + +package history + +import ( + "errors" + "fmt" + "github.com/uber/cadence/.gen/go/shared" + "go.uber.org/thriftrw/wire" + "strings" +) + +// HistoryService_SyncShardStatus_Args represents the arguments for the HistoryService.SyncShardStatus function. +// +// The arguments for SyncShardStatus are sent and received over the wire as this struct. +type HistoryService_SyncShardStatus_Args struct { + SyncShardStatusRequest *SyncShardStatusRequest `json:"syncShardStatusRequest,omitempty"` +} + +// ToWire translates a HistoryService_SyncShardStatus_Args struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *HistoryService_SyncShardStatus_Args) ToWire() (wire.Value, error) { + var ( + fields [1]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.SyncShardStatusRequest != nil { + w, err = v.SyncShardStatusRequest.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 1, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _SyncShardStatusRequest_Read(w wire.Value) (*SyncShardStatusRequest, error) { + var v SyncShardStatusRequest + err := v.FromWire(w) + return &v, err +} + +// FromWire deserializes a HistoryService_SyncShardStatus_Args struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a HistoryService_SyncShardStatus_Args struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v HistoryService_SyncShardStatus_Args +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *HistoryService_SyncShardStatus_Args) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 1: + if field.Value.Type() == wire.TStruct { + v.SyncShardStatusRequest, err = _SyncShardStatusRequest_Read(field.Value) + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a HistoryService_SyncShardStatus_Args +// struct. +func (v *HistoryService_SyncShardStatus_Args) String() string { + if v == nil { + return "" + } + + var fields [1]string + i := 0 + if v.SyncShardStatusRequest != nil { + fields[i] = fmt.Sprintf("SyncShardStatusRequest: %v", v.SyncShardStatusRequest) + i++ + } + + return fmt.Sprintf("HistoryService_SyncShardStatus_Args{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this HistoryService_SyncShardStatus_Args match the +// provided HistoryService_SyncShardStatus_Args. +// +// This function performs a deep comparison. +func (v *HistoryService_SyncShardStatus_Args) Equals(rhs *HistoryService_SyncShardStatus_Args) bool { + if !((v.SyncShardStatusRequest == nil && rhs.SyncShardStatusRequest == nil) || (v.SyncShardStatusRequest != nil && rhs.SyncShardStatusRequest != nil && v.SyncShardStatusRequest.Equals(rhs.SyncShardStatusRequest))) { + return false + } + + return true +} + +// GetSyncShardStatusRequest returns the value of SyncShardStatusRequest if it is set or its +// zero value if it is unset. +func (v *HistoryService_SyncShardStatus_Args) GetSyncShardStatusRequest() (o *SyncShardStatusRequest) { + if v.SyncShardStatusRequest != nil { + return v.SyncShardStatusRequest + } + + return +} + +// MethodName returns the name of the Thrift function as specified in +// the IDL, for which this struct represent the arguments. +// +// This will always be "SyncShardStatus" for this struct. +func (v *HistoryService_SyncShardStatus_Args) MethodName() string { + return "SyncShardStatus" +} + +// EnvelopeType returns the kind of value inside this struct. +// +// This will always be Call for this struct. +func (v *HistoryService_SyncShardStatus_Args) EnvelopeType() wire.EnvelopeType { + return wire.Call +} + +// HistoryService_SyncShardStatus_Helper provides functions that aid in handling the +// parameters and return values of the HistoryService.SyncShardStatus +// function. +var HistoryService_SyncShardStatus_Helper = struct { + // Args accepts the parameters of SyncShardStatus in-order and returns + // the arguments struct for the function. + Args func( + syncShardStatusRequest *SyncShardStatusRequest, + ) *HistoryService_SyncShardStatus_Args + + // IsException returns true if the given error can be thrown + // by SyncShardStatus. + // + // An error can be thrown by SyncShardStatus only if the + // corresponding exception type was mentioned in the 'throws' + // section for it in the Thrift file. + IsException func(error) bool + + // WrapResponse returns the result struct for SyncShardStatus + // given the error returned by it. The provided error may + // be nil if SyncShardStatus did not fail. + // + // This allows mapping errors returned by SyncShardStatus into a + // serializable result struct. WrapResponse returns a + // non-nil error if the provided error cannot be thrown by + // SyncShardStatus + // + // err := SyncShardStatus(args) + // result, err := HistoryService_SyncShardStatus_Helper.WrapResponse(err) + // if err != nil { + // return fmt.Errorf("unexpected error from SyncShardStatus: %v", err) + // } + // serialize(result) + WrapResponse func(error) (*HistoryService_SyncShardStatus_Result, error) + + // UnwrapResponse takes the result struct for SyncShardStatus + // and returns the erorr returned by it (if any). + // + // The error is non-nil only if SyncShardStatus threw an + // exception. + // + // result := deserialize(bytes) + // err := HistoryService_SyncShardStatus_Helper.UnwrapResponse(result) + UnwrapResponse func(*HistoryService_SyncShardStatus_Result) error +}{} + +func init() { + HistoryService_SyncShardStatus_Helper.Args = func( + syncShardStatusRequest *SyncShardStatusRequest, + ) *HistoryService_SyncShardStatus_Args { + return &HistoryService_SyncShardStatus_Args{ + SyncShardStatusRequest: syncShardStatusRequest, + } + } + + HistoryService_SyncShardStatus_Helper.IsException = func(err error) bool { + switch err.(type) { + case *shared.BadRequestError: + return true + case *shared.InternalServiceError: + return true + case *ShardOwnershipLostError: + return true + case *shared.LimitExceededError: + return true + default: + return false + } + } + + HistoryService_SyncShardStatus_Helper.WrapResponse = func(err error) (*HistoryService_SyncShardStatus_Result, error) { + if err == nil { + return &HistoryService_SyncShardStatus_Result{}, nil + } + + switch e := err.(type) { + case *shared.BadRequestError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_SyncShardStatus_Result.BadRequestError") + } + return &HistoryService_SyncShardStatus_Result{BadRequestError: e}, nil + case *shared.InternalServiceError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_SyncShardStatus_Result.InternalServiceError") + } + return &HistoryService_SyncShardStatus_Result{InternalServiceError: e}, nil + case *ShardOwnershipLostError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_SyncShardStatus_Result.ShardOwnershipLostError") + } + return &HistoryService_SyncShardStatus_Result{ShardOwnershipLostError: e}, nil + case *shared.LimitExceededError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for HistoryService_SyncShardStatus_Result.LimitExceededError") + } + return &HistoryService_SyncShardStatus_Result{LimitExceededError: e}, nil + } + + return nil, err + } + HistoryService_SyncShardStatus_Helper.UnwrapResponse = func(result *HistoryService_SyncShardStatus_Result) (err error) { + if result.BadRequestError != nil { + err = result.BadRequestError + return + } + if result.InternalServiceError != nil { + err = result.InternalServiceError + return + } + if result.ShardOwnershipLostError != nil { + err = result.ShardOwnershipLostError + return + } + if result.LimitExceededError != nil { + err = result.LimitExceededError + return + } + return + } + +} + +// HistoryService_SyncShardStatus_Result represents the result of a HistoryService.SyncShardStatus function call. +// +// The result of a SyncShardStatus execution is sent and received over the wire as this struct. +type HistoryService_SyncShardStatus_Result struct { + BadRequestError *shared.BadRequestError `json:"badRequestError,omitempty"` + InternalServiceError *shared.InternalServiceError `json:"internalServiceError,omitempty"` + ShardOwnershipLostError *ShardOwnershipLostError `json:"shardOwnershipLostError,omitempty"` + LimitExceededError *shared.LimitExceededError `json:"limitExceededError,omitempty"` +} + +// ToWire translates a HistoryService_SyncShardStatus_Result struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *HistoryService_SyncShardStatus_Result) ToWire() (wire.Value, error) { + var ( + fields [4]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.BadRequestError != nil { + w, err = v.BadRequestError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 1, Value: w} + i++ + } + if v.InternalServiceError != nil { + w, err = v.InternalServiceError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 2, Value: w} + i++ + } + if v.ShardOwnershipLostError != nil { + w, err = v.ShardOwnershipLostError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 4, Value: w} + i++ + } + if v.LimitExceededError != nil { + w, err = v.LimitExceededError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 5, Value: w} + i++ + } + + if i > 1 { + return wire.Value{}, fmt.Errorf("HistoryService_SyncShardStatus_Result should have at most one field: got %v fields", i) + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +// FromWire deserializes a HistoryService_SyncShardStatus_Result struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a HistoryService_SyncShardStatus_Result struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v HistoryService_SyncShardStatus_Result +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *HistoryService_SyncShardStatus_Result) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 1: + if field.Value.Type() == wire.TStruct { + v.BadRequestError, err = _BadRequestError_Read(field.Value) + if err != nil { + return err + } + + } + case 2: + if field.Value.Type() == wire.TStruct { + v.InternalServiceError, err = _InternalServiceError_Read(field.Value) + if err != nil { + return err + } + + } + case 4: + if field.Value.Type() == wire.TStruct { + v.ShardOwnershipLostError, err = _ShardOwnershipLostError_Read(field.Value) + if err != nil { + return err + } + + } + case 5: + if field.Value.Type() == wire.TStruct { + v.LimitExceededError, err = _LimitExceededError_Read(field.Value) + if err != nil { + return err + } + + } + } + } + + count := 0 + if v.BadRequestError != nil { + count++ + } + if v.InternalServiceError != nil { + count++ + } + if v.ShardOwnershipLostError != nil { + count++ + } + if v.LimitExceededError != nil { + count++ + } + if count > 1 { + return fmt.Errorf("HistoryService_SyncShardStatus_Result should have at most one field: got %v fields", count) + } + + return nil +} + +// String returns a readable string representation of a HistoryService_SyncShardStatus_Result +// struct. +func (v *HistoryService_SyncShardStatus_Result) String() string { + if v == nil { + return "" + } + + var fields [4]string + i := 0 + if v.BadRequestError != nil { + fields[i] = fmt.Sprintf("BadRequestError: %v", v.BadRequestError) + i++ + } + if v.InternalServiceError != nil { + fields[i] = fmt.Sprintf("InternalServiceError: %v", v.InternalServiceError) + i++ + } + if v.ShardOwnershipLostError != nil { + fields[i] = fmt.Sprintf("ShardOwnershipLostError: %v", v.ShardOwnershipLostError) + i++ + } + if v.LimitExceededError != nil { + fields[i] = fmt.Sprintf("LimitExceededError: %v", v.LimitExceededError) + i++ + } + + return fmt.Sprintf("HistoryService_SyncShardStatus_Result{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this HistoryService_SyncShardStatus_Result match the +// provided HistoryService_SyncShardStatus_Result. +// +// This function performs a deep comparison. +func (v *HistoryService_SyncShardStatus_Result) Equals(rhs *HistoryService_SyncShardStatus_Result) bool { + if !((v.BadRequestError == nil && rhs.BadRequestError == nil) || (v.BadRequestError != nil && rhs.BadRequestError != nil && v.BadRequestError.Equals(rhs.BadRequestError))) { + return false + } + if !((v.InternalServiceError == nil && rhs.InternalServiceError == nil) || (v.InternalServiceError != nil && rhs.InternalServiceError != nil && v.InternalServiceError.Equals(rhs.InternalServiceError))) { + return false + } + if !((v.ShardOwnershipLostError == nil && rhs.ShardOwnershipLostError == nil) || (v.ShardOwnershipLostError != nil && rhs.ShardOwnershipLostError != nil && v.ShardOwnershipLostError.Equals(rhs.ShardOwnershipLostError))) { + return false + } + if !((v.LimitExceededError == nil && rhs.LimitExceededError == nil) || (v.LimitExceededError != nil && rhs.LimitExceededError != nil && v.LimitExceededError.Equals(rhs.LimitExceededError))) { + return false + } + + return true +} + +// GetBadRequestError returns the value of BadRequestError if it is set or its +// zero value if it is unset. +func (v *HistoryService_SyncShardStatus_Result) GetBadRequestError() (o *shared.BadRequestError) { + if v.BadRequestError != nil { + return v.BadRequestError + } + + return +} + +// GetInternalServiceError returns the value of InternalServiceError if it is set or its +// zero value if it is unset. +func (v *HistoryService_SyncShardStatus_Result) GetInternalServiceError() (o *shared.InternalServiceError) { + if v.InternalServiceError != nil { + return v.InternalServiceError + } + + return +} + +// GetShardOwnershipLostError returns the value of ShardOwnershipLostError if it is set or its +// zero value if it is unset. +func (v *HistoryService_SyncShardStatus_Result) GetShardOwnershipLostError() (o *ShardOwnershipLostError) { + if v.ShardOwnershipLostError != nil { + return v.ShardOwnershipLostError + } + + return +} + +// GetLimitExceededError returns the value of LimitExceededError if it is set or its +// zero value if it is unset. +func (v *HistoryService_SyncShardStatus_Result) GetLimitExceededError() (o *shared.LimitExceededError) { + if v.LimitExceededError != nil { + return v.LimitExceededError + } + + return +} + +// MethodName returns the name of the Thrift function as specified in +// the IDL, for which this struct represent the result. +// +// This will always be "SyncShardStatus" for this struct. +func (v *HistoryService_SyncShardStatus_Result) MethodName() string { + return "SyncShardStatus" +} + +// EnvelopeType returns the kind of value inside this struct. +// +// This will always be Reply for this struct. +func (v *HistoryService_SyncShardStatus_Result) EnvelopeType() wire.EnvelopeType { + return wire.Reply +} diff --git a/.gen/go/history/historyserviceclient/client.go b/.gen/go/history/historyserviceclient/client.go index 660c925cf06..48fd0fe656f 100644 --- a/.gen/go/history/historyserviceclient/client.go +++ b/.gen/go/history/historyserviceclient/client.go @@ -162,6 +162,12 @@ type Interface interface { opts ...yarpc.CallOption, ) (*shared.StartWorkflowExecutionResponse, error) + SyncShardStatus( + ctx context.Context, + SyncShardStatusRequest *history.SyncShardStatusRequest, + opts ...yarpc.CallOption, + ) error + TerminateWorkflowExecution( ctx context.Context, TerminateRequest *history.TerminateWorkflowExecutionRequest, @@ -676,6 +682,29 @@ func (c client) StartWorkflowExecution( return } +func (c client) SyncShardStatus( + ctx context.Context, + _SyncShardStatusRequest *history.SyncShardStatusRequest, + opts ...yarpc.CallOption, +) (err error) { + + args := history.HistoryService_SyncShardStatus_Helper.Args(_SyncShardStatusRequest) + + var body wire.Value + body, err = c.c.Call(ctx, args, opts...) + if err != nil { + return + } + + var result history.HistoryService_SyncShardStatus_Result + if err = result.FromWire(body); err != nil { + return + } + + err = history.HistoryService_SyncShardStatus_Helper.UnwrapResponse(&result) + return +} + func (c client) TerminateWorkflowExecution( ctx context.Context, _TerminateRequest *history.TerminateWorkflowExecutionRequest, diff --git a/.gen/go/history/historyserviceserver/server.go b/.gen/go/history/historyserviceserver/server.go index 117ad2281ca..e05157461d8 100644 --- a/.gen/go/history/historyserviceserver/server.go +++ b/.gen/go/history/historyserviceserver/server.go @@ -139,6 +139,11 @@ type Interface interface { StartRequest *history.StartWorkflowExecutionRequest, ) (*shared.StartWorkflowExecutionResponse, error) + SyncShardStatus( + ctx context.Context, + SyncShardStatusRequest *history.SyncShardStatusRequest, + ) error + TerminateWorkflowExecution( ctx context.Context, TerminateRequest *history.TerminateWorkflowExecutionRequest, @@ -387,6 +392,17 @@ func New(impl Interface, opts ...thrift.RegisterOption) []transport.Procedure { ThriftModule: history.ThriftModule, }, + thrift.Method{ + Name: "SyncShardStatus", + HandlerSpec: thrift.HandlerSpec{ + + Type: transport.Unary, + Unary: thrift.UnaryHandler(h.SyncShardStatus), + }, + Signature: "SyncShardStatus(SyncShardStatusRequest *history.SyncShardStatusRequest)", + ThriftModule: history.ThriftModule, + }, + thrift.Method{ Name: "TerminateWorkflowExecution", HandlerSpec: thrift.HandlerSpec{ @@ -400,7 +416,7 @@ func New(impl Interface, opts ...thrift.RegisterOption) []transport.Procedure { }, } - procedures := make([]transport.Procedure, 0, 22) + procedures := make([]transport.Procedure, 0, 23) procedures = append(procedures, thrift.BuildProcedures(service, opts...)...) return procedures } @@ -806,6 +822,25 @@ func (h handler) StartWorkflowExecution(ctx context.Context, body wire.Value) (t return response, err } +func (h handler) SyncShardStatus(ctx context.Context, body wire.Value) (thrift.Response, error) { + var args history.HistoryService_SyncShardStatus_Args + if err := args.FromWire(body); err != nil { + return thrift.Response{}, err + } + + err := h.impl.SyncShardStatus(ctx, args.SyncShardStatusRequest) + + hadError := err != nil + result, err := history.HistoryService_SyncShardStatus_Helper.WrapResponse(err) + + var response thrift.Response + if err == nil { + response.IsApplicationError = hadError + response.Body = result + } + return response, err +} + func (h handler) TerminateWorkflowExecution(ctx context.Context, body wire.Value) (thrift.Response, error) { var args history.HistoryService_TerminateWorkflowExecution_Args if err := args.FromWire(body); err != nil { diff --git a/.gen/go/history/historyservicetest/client.go b/.gen/go/history/historyservicetest/client.go index 4653bc9fcc2..96fd2605624 100644 --- a/.gen/go/history/historyservicetest/client.go +++ b/.gen/go/history/historyservicetest/client.go @@ -736,6 +736,37 @@ func (mr *_MockClientRecorder) StartWorkflowExecution( return mr.mock.ctrl.RecordCall(mr.mock, "StartWorkflowExecution", args...) } +// SyncShardStatus responds to a SyncShardStatus call based on the mock expectations. This +// call will fail if the mock does not expect this call. Use EXPECT to expect +// a call to this function. +// +// client.EXPECT().SyncShardStatus(gomock.Any(), ...).Return(...) +// ... := client.SyncShardStatus(...) +func (m *MockClient) SyncShardStatus( + ctx context.Context, + _SyncShardStatusRequest *history.SyncShardStatusRequest, + opts ...yarpc.CallOption, +) (err error) { + + args := []interface{}{ctx, _SyncShardStatusRequest} + for _, o := range opts { + args = append(args, o) + } + i := 0 + ret := m.ctrl.Call(m, "SyncShardStatus", args...) + err, _ = ret[i].(error) + return +} + +func (mr *_MockClientRecorder) SyncShardStatus( + ctx interface{}, + _SyncShardStatusRequest interface{}, + opts ...interface{}, +) *gomock.Call { + args := append([]interface{}{ctx, _SyncShardStatusRequest}, opts...) + return mr.mock.ctrl.RecordCall(mr.mock, "SyncShardStatus", args...) +} + // TerminateWorkflowExecution responds to a TerminateWorkflowExecution call based on the mock expectations. This // call will fail if the mock does not expect this call. Use EXPECT to expect // a call to this function. diff --git a/.gen/go/history/idl.go b/.gen/go/history/idl.go index 0ec9a8923bd..4e8457d0cb3 100644 --- a/.gen/go/history/idl.go +++ b/.gen/go/history/idl.go @@ -33,11 +33,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "history", Package: "github.com/uber/cadence/.gen/go/history", FilePath: "history.thrift", - SHA1: "d8d51c0495729b0bc672aa122e96a9659aa226ab", + SHA1: "646f163e5cdc88ac9e0ac152bbb1e96622fdbbee", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.history\n\nexception EventAlreadyStartedError {\n 1: required string message\n}\n\nexception ShardOwnershipLostError {\n 10: optional string message\n 20: optional string owner\n}\n\nstruct ParentExecutionInfo {\n 10: optional string domainUUID\n 15: optional string domain\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") initiatedId\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.StartWorkflowExecutionRequest startRequest\n 30: optional ParentExecutionInfo parentExecutionInfo\n}\n\nstruct DescribeMutableStateRequest{\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct DescribeMutableStateResponse{\n 30: optional string mutableStateInCache\n 40: optional string mutableStateInDatabase\n}\n\nstruct GetMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") expectedNextEventId\n}\n\nstruct GetMutableStateResponse {\n 10: optional shared.WorkflowExecution execution\n 20: optional shared.WorkflowType workflowType\n 30: optional i64 (js.type = \"Long\") NextEventId\n 40: optional i64 (js.type = \"Long\") LastFirstEventId\n 50: optional shared.TaskList taskList\n 60: optional shared.TaskList stickyTaskList\n 70: optional string clientLibraryVersion\n 80: optional string clientFeatureVersion\n 90: optional string clientImpl\n 100: optional bool isWorkflowRunning\n 110: optional i32 stickyTaskListScheduleToStartTimeout\n}\n\nstruct ResetStickyTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct ResetStickyTaskListResponse {\n // The reason to keep this response is to allow returning\n // information in the future.\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskCompletedRequest completeRequest\n}\n\nstruct RespondDecisionTaskCompletedResponse {\n 10: optional RecordDecisionTaskStartedResponse startedResponse\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskFailedRequest failedRequest\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional string domainUUID\n 20: optional shared.RecordActivityTaskHeartbeatRequest heartbeatRequest\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCompletedRequest completeRequest\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskFailedRequest failedRequest\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCanceledRequest cancelRequest\n}\n\nstruct RecordActivityTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct RecordActivityTaskStartedResponse {\n 20: optional shared.HistoryEvent scheduledEvent\n 30: optional i64 (js.type = \"Long\") startedTimestamp\n 40: optional i64 (js.type = \"Long\") attempt\n 50: optional i64 (js.type = \"Long\") scheduledTimestampOfThisAttempt\n}\n\nstruct RecordDecisionTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct RecordDecisionTaskStartedResponse {\n 10: optional shared.WorkflowType workflowType\n 20: optional i64 (js.type = \"Long\") previousStartedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") attempt\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.TransientDecisionInfo decisionInfo\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWorkflowExecutionRequest signalRequest\n 30: optional shared.WorkflowExecution externalWorkflowExecution\n 40: optional bool childWorkflowOnly\n}\n\nstruct SignalWithStartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWithStartWorkflowExecutionRequest signalWithStartRequest\n}\n\nstruct RemoveSignalMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional string requestId\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.TerminateWorkflowExecutionRequest terminateRequest\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.RequestCancelWorkflowExecutionRequest cancelRequest\n 30: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 40: optional shared.WorkflowExecution externalWorkflowExecution\n 50: optional bool childWorkflowOnly\n}\n\nstruct ScheduleDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeWorkflowExecutionRequest request\n}\n\n/**\n* RecordChildExecutionCompletedRequest is used for reporting the completion of child execution to parent workflow\n* execution which started it. When a child execution is completed it creates this request and calls the\n* RecordChildExecutionCompleted API with the workflowExecution of parent. It also sets the completedExecution of the\n* child as it could potentially be different than the ChildExecutionStartedEvent of parent in the situation when\n* child creates multiple runs through ContinueAsNew before finally completing.\n**/\nstruct RecordChildExecutionCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") initiatedId\n 40: optional shared.WorkflowExecution completedExecution\n 50: optional shared.HistoryEvent completionEvent\n}\n\nstruct ReplicationInfo {\n 10: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") lastEventId\n}\n\nstruct ReplicateEventsRequest {\n 10: optional string sourceCluster\n 20: optional string domainUUID\n 30: optional shared.WorkflowExecution workflowExecution\n 40: optional i64 (js.type = \"Long\") firstEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") version\n 70: optional map replicationInfo\n 80: optional shared.History history\n 90: optional shared.History newRunHistory\n 100: optional bool forceBufferEvents\n}\n\n/**\n* HistoryService provides API to start a new long running workflow instance, as well as query and update the history\n* of workflow instances already created.\n**/\nservice HistoryService {\n /**\n * StartWorkflowExecution starts a new long running workflow instance. It will create the instance with\n * 'WorkflowExecutionStarted' event in history and also schedule the first DecisionTask for the worker to make the\n * first decision for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already\n * exists with same workflowId.\n **/\n shared.StartWorkflowExecutionResponse StartWorkflowExecution(1: StartWorkflowExecutionRequest startRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.WorkflowExecutionAlreadyStartedError sessionAlreadyExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * Returns the information from mutable state of workflow execution.\n * It fails with 'EntityNotExistError' if specified workflow execution in unknown to the service.\n **/\n GetMutableStateResponse GetMutableState(1: GetMutableStateRequest getRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * Reset the sticky tasklist related information in mutable state of a given workflow.\n * Things cleared are:\n * 1. StickyTaskList\n * 2. StickyScheduleToStartTimeout\n * 3. ClientLibraryVersion\n * 4. ClientFeatureVersion\n * 5. ClientImpl\n **/\n ResetStickyTaskListResponse ResetStickyTaskList(1: ResetStickyTaskListRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordDecisionTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForDecisionTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordDecisionTaskStartedResponse RecordDecisionTaskStarted(1: RecordDecisionTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordActivityTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForActivityTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordActivityTaskStartedResponse RecordActivityTaskStarted(1: RecordActivityTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondDecisionTaskCompleted is called by application worker to complete a DecisionTask handed as a result of\n * 'PollForDecisionTask' API call. Completing a DecisionTask will result in new events for the workflow execution and\n * potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted\n * event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call\n * for completing the DecisionTask.\n **/\n RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted(1: RespondDecisionTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondDecisionTaskFailed is called by application worker to indicate failure. This results in\n * DecisionTaskFailedEvent written to the history and a new DecisionTask created. This API can be used by client to\n * either clear sticky tasklist or report ny panics during DecisionTask processing.\n **/\n void RespondDecisionTaskFailed(1: RespondDecisionTaskFailedRequest failedRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordActivityTaskHeartbeat is called by application worker while it is processing an ActivityTask. If worker fails\n * to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and\n * 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeat' will\n * fail with 'EntityNotExistsError' in such situations. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for heartbeating.\n **/\n shared.RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat(1: RecordActivityTaskHeartbeatRequest heartbeatRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask\n * created for the workflow so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCompleted(1: RespondActivityTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondActivityTaskFailed is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskFailed(1: RespondActivityTaskFailedRequest failRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondActivityTaskCanceled is called by application worker when it is successfully canceled an ActivityTask. It will\n * result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCanceled(1: RespondActivityTaskCanceledRequest canceledRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in\n * WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.\n **/\n void SignalWorkflowExecution(1: SignalWorkflowExecutionRequest signalRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.ServiceBusyError serviceBusyError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.\n * If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history\n * and a decision task being created for the execution.\n * If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled\n * event recorded in history, and a decision task being created for the execution\n **/\n shared.StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(1: SignalWithStartWorkflowExecutionRequest signalWithStartRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: ShardOwnershipLostError shardOwnershipLostError,\n 4: shared.DomainNotActiveError domainNotActiveError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently\n * used to clean execution info when signal decision finished.\n **/\n void RemoveSignalMutableState(1: RemoveSignalMutableStateRequest removeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event\n * in the history and immediately terminating the execution instance.\n **/\n void TerminateWorkflowExecution(1: TerminateWorkflowExecutionRequest terminateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RequestCancelWorkflowExecution is called by application worker when it wants to request cancellation of a workflow instance.\n * It will result in a new 'WorkflowExecutionCancelRequested' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. It fails with 'EntityNotExistsError' if the workflow is not valid\n * anymore due to completion or doesn't exist.\n **/\n void RequestCancelWorkflowExecution(1: RequestCancelWorkflowExecutionRequest cancelRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly\n * used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts\n * child execution without creating the decision task and then calls this API after updating the mutable state of\n * parent execution.\n **/\n void ScheduleDecisionTask(1: ScheduleDecisionTaskRequest scheduleRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent.\n * This is mainly called by transfer queue processor during the processing of DeleteExecution task.\n **/\n void RecordChildExecutionCompleted(1: RecordChildExecutionCompletedRequest completionRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * DescribeWorkflowExecution returns information about the specified workflow execution.\n **/\n shared.DescribeWorkflowExecutionResponse DescribeWorkflowExecution(1: DescribeWorkflowExecutionRequest describeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n void ReplicateEvents(1: ReplicateEventsRequest replicateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.RetryTaskError retryTaskError,\n )\n\n /**\n * DescribeMutableState returns information about the internal states of workflow mutable state.\n **/\n DescribeMutableStateResponse DescribeMutableState(1: DescribeMutableStateRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.AccessDeniedError accessDeniedError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * DescribeHistoryHost returns information about the internal states of a history host\n **/\n shared.DescribeHistoryHostResponse DescribeHistoryHost(1: shared.DescribeHistoryHostRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.AccessDeniedError accessDeniedError,\n )\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.history\n\nexception EventAlreadyStartedError {\n 1: required string message\n}\n\nexception ShardOwnershipLostError {\n 10: optional string message\n 20: optional string owner\n}\n\nstruct ParentExecutionInfo {\n 10: optional string domainUUID\n 15: optional string domain\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") initiatedId\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.StartWorkflowExecutionRequest startRequest\n 30: optional ParentExecutionInfo parentExecutionInfo\n}\n\nstruct DescribeMutableStateRequest{\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct DescribeMutableStateResponse{\n 30: optional string mutableStateInCache\n 40: optional string mutableStateInDatabase\n}\n\nstruct GetMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") expectedNextEventId\n}\n\nstruct GetMutableStateResponse {\n 10: optional shared.WorkflowExecution execution\n 20: optional shared.WorkflowType workflowType\n 30: optional i64 (js.type = \"Long\") NextEventId\n 40: optional i64 (js.type = \"Long\") LastFirstEventId\n 50: optional shared.TaskList taskList\n 60: optional shared.TaskList stickyTaskList\n 70: optional string clientLibraryVersion\n 80: optional string clientFeatureVersion\n 90: optional string clientImpl\n 100: optional bool isWorkflowRunning\n 110: optional i32 stickyTaskListScheduleToStartTimeout\n}\n\nstruct ResetStickyTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct ResetStickyTaskListResponse {\n // The reason to keep this response is to allow returning\n // information in the future.\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskCompletedRequest completeRequest\n}\n\nstruct RespondDecisionTaskCompletedResponse {\n 10: optional RecordDecisionTaskStartedResponse startedResponse\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskFailedRequest failedRequest\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional string domainUUID\n 20: optional shared.RecordActivityTaskHeartbeatRequest heartbeatRequest\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCompletedRequest completeRequest\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskFailedRequest failedRequest\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCanceledRequest cancelRequest\n}\n\nstruct RecordActivityTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct RecordActivityTaskStartedResponse {\n 20: optional shared.HistoryEvent scheduledEvent\n 30: optional i64 (js.type = \"Long\") startedTimestamp\n 40: optional i64 (js.type = \"Long\") attempt\n 50: optional i64 (js.type = \"Long\") scheduledTimestampOfThisAttempt\n}\n\nstruct RecordDecisionTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct RecordDecisionTaskStartedResponse {\n 10: optional shared.WorkflowType workflowType\n 20: optional i64 (js.type = \"Long\") previousStartedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") attempt\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.TransientDecisionInfo decisionInfo\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWorkflowExecutionRequest signalRequest\n 30: optional shared.WorkflowExecution externalWorkflowExecution\n 40: optional bool childWorkflowOnly\n}\n\nstruct SignalWithStartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWithStartWorkflowExecutionRequest signalWithStartRequest\n}\n\nstruct RemoveSignalMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional string requestId\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.TerminateWorkflowExecutionRequest terminateRequest\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.RequestCancelWorkflowExecutionRequest cancelRequest\n 30: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 40: optional shared.WorkflowExecution externalWorkflowExecution\n 50: optional bool childWorkflowOnly\n}\n\nstruct ScheduleDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeWorkflowExecutionRequest request\n}\n\n/**\n* RecordChildExecutionCompletedRequest is used for reporting the completion of child execution to parent workflow\n* execution which started it. When a child execution is completed it creates this request and calls the\n* RecordChildExecutionCompleted API with the workflowExecution of parent. It also sets the completedExecution of the\n* child as it could potentially be different than the ChildExecutionStartedEvent of parent in the situation when\n* child creates multiple runs through ContinueAsNew before finally completing.\n**/\nstruct RecordChildExecutionCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") initiatedId\n 40: optional shared.WorkflowExecution completedExecution\n 50: optional shared.HistoryEvent completionEvent\n}\n\nstruct ReplicationInfo {\n 10: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") lastEventId\n}\n\nstruct ReplicateEventsRequest {\n 10: optional string sourceCluster\n 20: optional string domainUUID\n 30: optional shared.WorkflowExecution workflowExecution\n 40: optional i64 (js.type = \"Long\") firstEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") version\n 70: optional map replicationInfo\n 80: optional shared.History history\n 90: optional shared.History newRunHistory\n 100: optional bool forceBufferEvents\n}\n\nstruct SyncShardStatusRequest {\n 10: optional string sourceCluster\n 20: optional i64 (js.type = \"Long\") shardId\n 30: optional i64 (js.type = \"Long\") timestamp\n}\n\n/**\n* HistoryService provides API to start a new long running workflow instance, as well as query and update the history\n* of workflow instances already created.\n**/\nservice HistoryService {\n /**\n * StartWorkflowExecution starts a new long running workflow instance. It will create the instance with\n * 'WorkflowExecutionStarted' event in history and also schedule the first DecisionTask for the worker to make the\n * first decision for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already\n * exists with same workflowId.\n **/\n shared.StartWorkflowExecutionResponse StartWorkflowExecution(1: StartWorkflowExecutionRequest startRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.WorkflowExecutionAlreadyStartedError sessionAlreadyExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * Returns the information from mutable state of workflow execution.\n * It fails with 'EntityNotExistError' if specified workflow execution in unknown to the service.\n **/\n GetMutableStateResponse GetMutableState(1: GetMutableStateRequest getRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * Reset the sticky tasklist related information in mutable state of a given workflow.\n * Things cleared are:\n * 1. StickyTaskList\n * 2. StickyScheduleToStartTimeout\n * 3. ClientLibraryVersion\n * 4. ClientFeatureVersion\n * 5. ClientImpl\n **/\n ResetStickyTaskListResponse ResetStickyTaskList(1: ResetStickyTaskListRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordDecisionTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForDecisionTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordDecisionTaskStartedResponse RecordDecisionTaskStarted(1: RecordDecisionTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordActivityTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForActivityTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordActivityTaskStartedResponse RecordActivityTaskStarted(1: RecordActivityTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondDecisionTaskCompleted is called by application worker to complete a DecisionTask handed as a result of\n * 'PollForDecisionTask' API call. Completing a DecisionTask will result in new events for the workflow execution and\n * potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted\n * event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call\n * for completing the DecisionTask.\n **/\n RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted(1: RespondDecisionTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondDecisionTaskFailed is called by application worker to indicate failure. This results in\n * DecisionTaskFailedEvent written to the history and a new DecisionTask created. This API can be used by client to\n * either clear sticky tasklist or report ny panics during DecisionTask processing.\n **/\n void RespondDecisionTaskFailed(1: RespondDecisionTaskFailedRequest failedRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordActivityTaskHeartbeat is called by application worker while it is processing an ActivityTask. If worker fails\n * to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and\n * 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeat' will\n * fail with 'EntityNotExistsError' in such situations. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for heartbeating.\n **/\n shared.RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat(1: RecordActivityTaskHeartbeatRequest heartbeatRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask\n * created for the workflow so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCompleted(1: RespondActivityTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondActivityTaskFailed is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskFailed(1: RespondActivityTaskFailedRequest failRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondActivityTaskCanceled is called by application worker when it is successfully canceled an ActivityTask. It will\n * result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCanceled(1: RespondActivityTaskCanceledRequest canceledRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in\n * WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.\n **/\n void SignalWorkflowExecution(1: SignalWorkflowExecutionRequest signalRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.ServiceBusyError serviceBusyError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.\n * If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history\n * and a decision task being created for the execution.\n * If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled\n * event recorded in history, and a decision task being created for the execution\n **/\n shared.StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(1: SignalWithStartWorkflowExecutionRequest signalWithStartRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: ShardOwnershipLostError shardOwnershipLostError,\n 4: shared.DomainNotActiveError domainNotActiveError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently\n * used to clean execution info when signal decision finished.\n **/\n void RemoveSignalMutableState(1: RemoveSignalMutableStateRequest removeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event\n * in the history and immediately terminating the execution instance.\n **/\n void TerminateWorkflowExecution(1: TerminateWorkflowExecutionRequest terminateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RequestCancelWorkflowExecution is called by application worker when it wants to request cancellation of a workflow instance.\n * It will result in a new 'WorkflowExecutionCancelRequested' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. It fails with 'EntityNotExistsError' if the workflow is not valid\n * anymore due to completion or doesn't exist.\n **/\n void RequestCancelWorkflowExecution(1: RequestCancelWorkflowExecutionRequest cancelRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly\n * used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts\n * child execution without creating the decision task and then calls this API after updating the mutable state of\n * parent execution.\n **/\n void ScheduleDecisionTask(1: ScheduleDecisionTaskRequest scheduleRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent.\n * This is mainly called by transfer queue processor during the processing of DeleteExecution task.\n **/\n void RecordChildExecutionCompleted(1: RecordChildExecutionCompletedRequest completionRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * DescribeWorkflowExecution returns information about the specified workflow execution.\n **/\n shared.DescribeWorkflowExecutionResponse DescribeWorkflowExecution(1: DescribeWorkflowExecutionRequest describeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n void ReplicateEvents(1: ReplicateEventsRequest replicateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.RetryTaskError retryTaskError,\n )\n\n /**\n * SyncShardStatus sync the status between shards\n **/\n void SyncShardStatus(1: SyncShardStatusRequest syncShardStatusRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * DescribeMutableState returns information about the internal states of workflow mutable state.\n **/\n DescribeMutableStateResponse DescribeMutableState(1: DescribeMutableStateRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.AccessDeniedError accessDeniedError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * DescribeHistoryHost returns information about the internal states of a history host\n **/\n shared.DescribeHistoryHostResponse DescribeHistoryHost(1: shared.DescribeHistoryHostRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.AccessDeniedError accessDeniedError,\n )\n}\n" diff --git a/.gen/go/history/types.go b/.gen/go/history/types.go index 561f05f9c61..eb17f0f26e0 100644 --- a/.gen/go/history/types.go +++ b/.gen/go/history/types.go @@ -6283,6 +6283,194 @@ func (v *StartWorkflowExecutionRequest) GetParentExecutionInfo() (o *ParentExecu return } +type SyncShardStatusRequest struct { + SourceCluster *string `json:"sourceCluster,omitempty"` + ShardId *int64 `json:"shardId,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` +} + +// ToWire translates a SyncShardStatusRequest struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *SyncShardStatusRequest) ToWire() (wire.Value, error) { + var ( + fields [3]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.SourceCluster != nil { + w, err = wire.NewValueString(*(v.SourceCluster)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.ShardId != nil { + w, err = wire.NewValueI64(*(v.ShardId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.Timestamp != nil { + w, err = wire.NewValueI64(*(v.Timestamp)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +// FromWire deserializes a SyncShardStatusRequest struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a SyncShardStatusRequest struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v SyncShardStatusRequest +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *SyncShardStatusRequest) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.SourceCluster = &x + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.ShardId = &x + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.Timestamp = &x + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a SyncShardStatusRequest +// struct. +func (v *SyncShardStatusRequest) String() string { + if v == nil { + return "" + } + + var fields [3]string + i := 0 + if v.SourceCluster != nil { + fields[i] = fmt.Sprintf("SourceCluster: %v", *(v.SourceCluster)) + i++ + } + if v.ShardId != nil { + fields[i] = fmt.Sprintf("ShardId: %v", *(v.ShardId)) + i++ + } + if v.Timestamp != nil { + fields[i] = fmt.Sprintf("Timestamp: %v", *(v.Timestamp)) + i++ + } + + return fmt.Sprintf("SyncShardStatusRequest{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this SyncShardStatusRequest match the +// provided SyncShardStatusRequest. +// +// This function performs a deep comparison. +func (v *SyncShardStatusRequest) Equals(rhs *SyncShardStatusRequest) bool { + if !_String_EqualsPtr(v.SourceCluster, rhs.SourceCluster) { + return false + } + if !_I64_EqualsPtr(v.ShardId, rhs.ShardId) { + return false + } + if !_I64_EqualsPtr(v.Timestamp, rhs.Timestamp) { + return false + } + + return true +} + +// GetSourceCluster returns the value of SourceCluster if it is set or its +// zero value if it is unset. +func (v *SyncShardStatusRequest) GetSourceCluster() (o string) { + if v.SourceCluster != nil { + return *v.SourceCluster + } + + return +} + +// GetShardId returns the value of ShardId if it is set or its +// zero value if it is unset. +func (v *SyncShardStatusRequest) GetShardId() (o int64) { + if v.ShardId != nil { + return *v.ShardId + } + + return +} + +// GetTimestamp returns the value of Timestamp if it is set or its +// zero value if it is unset. +func (v *SyncShardStatusRequest) GetTimestamp() (o int64) { + if v.Timestamp != nil { + return *v.Timestamp + } + + return +} + type TerminateWorkflowExecutionRequest struct { DomainUUID *string `json:"domainUUID,omitempty"` TerminateRequest *shared.TerminateWorkflowExecutionRequest `json:"terminateRequest,omitempty"` diff --git a/.gen/go/replicator/idl.go b/.gen/go/replicator/idl.go index 4f5233459c0..e2048e75e0f 100644 --- a/.gen/go/replicator/idl.go +++ b/.gen/go/replicator/idl.go @@ -34,7 +34,7 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "replicator", Package: "github.com/uber/cadence/.gen/go/replicator", FilePath: "replicator.thrift", - SHA1: "94e4e7c5d5894556ce2cd4dc8f06ca32790db05e", + SHA1: "bb7d998490c2bab36162a762640f3a280c8348c1", Includes: []*thriftreflect.ThriftModule{ history.ThriftModule, shared.ThriftModule, @@ -42,4 +42,4 @@ var ThriftModule = &thriftreflect.ThriftModule{ Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.replicator\n\ninclude \"shared.thrift\"\ninclude \"history.thrift\"\n\nenum ReplicationTaskType {\n Domain\n History\n}\n\nenum DomainOperation {\n Create\n Update\n}\n\nstruct DomainTaskAttributes {\n 05: optional DomainOperation domainOperation\n 10: optional string id\n 20: optional shared.DomainInfo info\n 30: optional shared.DomainConfiguration config\n 40: optional shared.DomainReplicationConfiguration replicationConfig\n 50: optional i64 (js.type = \"Long\") configVersion\n 60: optional i64 (js.type = \"Long\") failoverVersion\n}\n\nstruct HistoryTaskAttributes {\n 05: optional list targetClusters\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional i64 (js.type = \"Long\") firstEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") version\n 70: optional map replicationInfo\n 80: optional shared.History history\n 90: optional shared.History newRunHistory\n}\n\nstruct ReplicationTask {\n 10: optional ReplicationTaskType taskType\n 20: optional DomainTaskAttributes domainTaskAttributes\n 30: optional HistoryTaskAttributes historyTaskAttributes\n}\n\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.replicator\n\ninclude \"shared.thrift\"\ninclude \"history.thrift\"\n\nenum ReplicationTaskType {\n Domain\n History\n SyncShardStatus\n}\n\nenum DomainOperation {\n Create\n Update\n}\n\nstruct DomainTaskAttributes {\n 05: optional DomainOperation domainOperation\n 10: optional string id\n 20: optional shared.DomainInfo info\n 30: optional shared.DomainConfiguration config\n 40: optional shared.DomainReplicationConfiguration replicationConfig\n 50: optional i64 (js.type = \"Long\") configVersion\n 60: optional i64 (js.type = \"Long\") failoverVersion\n}\n\nstruct HistoryTaskAttributes {\n 05: optional list targetClusters\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional i64 (js.type = \"Long\") firstEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") version\n 70: optional map replicationInfo\n 80: optional shared.History history\n 90: optional shared.History newRunHistory\n}\n\nstruct SyncShardStatusTaskAttributes {\n 10: optional string sourceCluster\n 20: optional i64 (js.type = \"Long\") shardId\n 30: optional i64 (js.type = \"Long\") timestamp\n}\n\nstruct ReplicationTask {\n 10: optional ReplicationTaskType taskType\n 20: optional DomainTaskAttributes domainTaskAttributes\n 30: optional HistoryTaskAttributes historyTaskAttributes\n 40: optional SyncShardStatusTaskAttributes syncShardStatusTaskAttributes\n}\n\n" diff --git a/.gen/go/replicator/types.go b/.gen/go/replicator/types.go index 8c7b140e7a0..c7cf0ad557d 100644 --- a/.gen/go/replicator/types.go +++ b/.gen/go/replicator/types.go @@ -1153,9 +1153,10 @@ func (v *HistoryTaskAttributes) GetNewRunHistory() (o *shared.History) { } type ReplicationTask struct { - TaskType *ReplicationTaskType `json:"taskType,omitempty"` - DomainTaskAttributes *DomainTaskAttributes `json:"domainTaskAttributes,omitempty"` - HistoryTaskAttributes *HistoryTaskAttributes `json:"historyTaskAttributes,omitempty"` + TaskType *ReplicationTaskType `json:"taskType,omitempty"` + DomainTaskAttributes *DomainTaskAttributes `json:"domainTaskAttributes,omitempty"` + HistoryTaskAttributes *HistoryTaskAttributes `json:"historyTaskAttributes,omitempty"` + SyncShardStatusTaskAttributes *SyncShardStatusTaskAttributes `json:"syncShardStatusTaskAttributes,omitempty"` } // ToWire translates a ReplicationTask struct into a Thrift-level intermediate @@ -1175,7 +1176,7 @@ type ReplicationTask struct { // } func (v *ReplicationTask) ToWire() (wire.Value, error) { var ( - fields [3]wire.Field + fields [4]wire.Field i int = 0 w wire.Value err error @@ -1205,6 +1206,14 @@ func (v *ReplicationTask) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 30, Value: w} i++ } + if v.SyncShardStatusTaskAttributes != nil { + w, err = v.SyncShardStatusTaskAttributes.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 40, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } @@ -1227,6 +1236,12 @@ func _HistoryTaskAttributes_Read(w wire.Value) (*HistoryTaskAttributes, error) { return &v, err } +func _SyncShardStatusTaskAttributes_Read(w wire.Value) (*SyncShardStatusTaskAttributes, error) { + var v SyncShardStatusTaskAttributes + err := v.FromWire(w) + return &v, err +} + // FromWire deserializes a ReplicationTask struct from its Thrift-level // representation. The Thrift-level representation may be obtained // from a ThriftRW protocol implementation. @@ -1274,6 +1289,14 @@ func (v *ReplicationTask) FromWire(w wire.Value) error { return err } + } + case 40: + if field.Value.Type() == wire.TStruct { + v.SyncShardStatusTaskAttributes, err = _SyncShardStatusTaskAttributes_Read(field.Value) + if err != nil { + return err + } + } } } @@ -1288,7 +1311,7 @@ func (v *ReplicationTask) String() string { return "" } - var fields [3]string + var fields [4]string i := 0 if v.TaskType != nil { fields[i] = fmt.Sprintf("TaskType: %v", *(v.TaskType)) @@ -1302,6 +1325,10 @@ func (v *ReplicationTask) String() string { fields[i] = fmt.Sprintf("HistoryTaskAttributes: %v", v.HistoryTaskAttributes) i++ } + if v.SyncShardStatusTaskAttributes != nil { + fields[i] = fmt.Sprintf("SyncShardStatusTaskAttributes: %v", v.SyncShardStatusTaskAttributes) + i++ + } return fmt.Sprintf("ReplicationTask{%v}", strings.Join(fields[:i], ", ")) } @@ -1330,6 +1357,9 @@ func (v *ReplicationTask) Equals(rhs *ReplicationTask) bool { if !((v.HistoryTaskAttributes == nil && rhs.HistoryTaskAttributes == nil) || (v.HistoryTaskAttributes != nil && rhs.HistoryTaskAttributes != nil && v.HistoryTaskAttributes.Equals(rhs.HistoryTaskAttributes))) { return false } + if !((v.SyncShardStatusTaskAttributes == nil && rhs.SyncShardStatusTaskAttributes == nil) || (v.SyncShardStatusTaskAttributes != nil && rhs.SyncShardStatusTaskAttributes != nil && v.SyncShardStatusTaskAttributes.Equals(rhs.SyncShardStatusTaskAttributes))) { + return false + } return true } @@ -1364,11 +1394,22 @@ func (v *ReplicationTask) GetHistoryTaskAttributes() (o *HistoryTaskAttributes) return } +// GetSyncShardStatusTaskAttributes returns the value of SyncShardStatusTaskAttributes if it is set or its +// zero value if it is unset. +func (v *ReplicationTask) GetSyncShardStatusTaskAttributes() (o *SyncShardStatusTaskAttributes) { + if v.SyncShardStatusTaskAttributes != nil { + return v.SyncShardStatusTaskAttributes + } + + return +} + type ReplicationTaskType int32 const ( - ReplicationTaskTypeDomain ReplicationTaskType = 0 - ReplicationTaskTypeHistory ReplicationTaskType = 1 + ReplicationTaskTypeDomain ReplicationTaskType = 0 + ReplicationTaskTypeHistory ReplicationTaskType = 1 + ReplicationTaskTypeSyncShardStatus ReplicationTaskType = 2 ) // ReplicationTaskType_Values returns all recognized values of ReplicationTaskType. @@ -1376,6 +1417,7 @@ func ReplicationTaskType_Values() []ReplicationTaskType { return []ReplicationTaskType{ ReplicationTaskTypeDomain, ReplicationTaskTypeHistory, + ReplicationTaskTypeSyncShardStatus, } } @@ -1392,6 +1434,9 @@ func (v *ReplicationTaskType) UnmarshalText(value []byte) error { case "History": *v = ReplicationTaskTypeHistory return nil + case "SyncShardStatus": + *v = ReplicationTaskTypeSyncShardStatus + return nil default: return fmt.Errorf("unknown enum value %q for %q", value, "ReplicationTaskType") } @@ -1409,6 +1454,8 @@ func (v ReplicationTaskType) MarshalText() ([]byte, error) { return []byte("Domain"), nil case 1: return []byte("History"), nil + case 2: + return []byte("SyncShardStatus"), nil } return []byte(strconv.FormatInt(int64(v), 10)), nil } @@ -1453,6 +1500,8 @@ func (v ReplicationTaskType) String() string { return "Domain" case 1: return "History" + case 2: + return "SyncShardStatus" } return fmt.Sprintf("ReplicationTaskType(%d)", w) } @@ -1475,6 +1524,8 @@ func (v ReplicationTaskType) MarshalJSON() ([]byte, error) { return ([]byte)("\"Domain\""), nil case 1: return ([]byte)("\"History\""), nil + case 2: + return ([]byte)("\"SyncShardStatus\""), nil } return ([]byte)(strconv.FormatInt(int64(v), 10)), nil } @@ -1514,3 +1565,191 @@ func (v *ReplicationTaskType) UnmarshalJSON(text []byte) error { return fmt.Errorf("invalid JSON value %q (%T) to unmarshal into %q", t, t, "ReplicationTaskType") } } + +type SyncShardStatusTaskAttributes struct { + SourceCluster *string `json:"sourceCluster,omitempty"` + ShardId *int64 `json:"shardId,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` +} + +// ToWire translates a SyncShardStatusTaskAttributes struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *SyncShardStatusTaskAttributes) ToWire() (wire.Value, error) { + var ( + fields [3]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.SourceCluster != nil { + w, err = wire.NewValueString(*(v.SourceCluster)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.ShardId != nil { + w, err = wire.NewValueI64(*(v.ShardId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.Timestamp != nil { + w, err = wire.NewValueI64(*(v.Timestamp)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +// FromWire deserializes a SyncShardStatusTaskAttributes struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a SyncShardStatusTaskAttributes struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v SyncShardStatusTaskAttributes +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *SyncShardStatusTaskAttributes) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.SourceCluster = &x + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.ShardId = &x + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.Timestamp = &x + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a SyncShardStatusTaskAttributes +// struct. +func (v *SyncShardStatusTaskAttributes) String() string { + if v == nil { + return "" + } + + var fields [3]string + i := 0 + if v.SourceCluster != nil { + fields[i] = fmt.Sprintf("SourceCluster: %v", *(v.SourceCluster)) + i++ + } + if v.ShardId != nil { + fields[i] = fmt.Sprintf("ShardId: %v", *(v.ShardId)) + i++ + } + if v.Timestamp != nil { + fields[i] = fmt.Sprintf("Timestamp: %v", *(v.Timestamp)) + i++ + } + + return fmt.Sprintf("SyncShardStatusTaskAttributes{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this SyncShardStatusTaskAttributes match the +// provided SyncShardStatusTaskAttributes. +// +// This function performs a deep comparison. +func (v *SyncShardStatusTaskAttributes) Equals(rhs *SyncShardStatusTaskAttributes) bool { + if !_String_EqualsPtr(v.SourceCluster, rhs.SourceCluster) { + return false + } + if !_I64_EqualsPtr(v.ShardId, rhs.ShardId) { + return false + } + if !_I64_EqualsPtr(v.Timestamp, rhs.Timestamp) { + return false + } + + return true +} + +// GetSourceCluster returns the value of SourceCluster if it is set or its +// zero value if it is unset. +func (v *SyncShardStatusTaskAttributes) GetSourceCluster() (o string) { + if v.SourceCluster != nil { + return *v.SourceCluster + } + + return +} + +// GetShardId returns the value of ShardId if it is set or its +// zero value if it is unset. +func (v *SyncShardStatusTaskAttributes) GetShardId() (o int64) { + if v.ShardId != nil { + return *v.ShardId + } + + return +} + +// GetTimestamp returns the value of Timestamp if it is set or its +// zero value if it is unset. +func (v *SyncShardStatusTaskAttributes) GetTimestamp() (o int64) { + if v.Timestamp != nil { + return *v.Timestamp + } + + return +} diff --git a/client/history/client.go b/client/history/client.go index 381d0f2f69f..e7ce7353fee 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -562,6 +562,28 @@ func (c *clientImpl) ReplicateEvents( return err } +func (c *clientImpl) SyncShardStatus( + ctx context.Context, + request *h.SyncShardStatusRequest, + opts ...yarpc.CallOption) error { + + // we do not have a workflow ID here, instead, we have something even better + host, err := c.resolver.Lookup(string(request.GetShardId())) + if err != nil { + return err + } + client := c.getThriftClient(host.GetAddress()) + + opts = common.AggregateYarpcOptions(ctx, opts...) + op := func(ctx context.Context, client historyserviceclient.Interface) error { + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.SyncShardStatus(ctx, request, opts...) + } + err = c.executeWithRedirect(ctx, client, op) + return err +} + func (c *clientImpl) getHostForRequest(workflowID string) (historyserviceclient.Interface, error) { key := common.WorkflowIDToHistoryShard(workflowID, c.numberOfShards) host, err := c.resolver.Lookup(string(key)) diff --git a/client/history/metricClient.go b/client/history/metricClient.go index f6b4cb5adc1..048f0034027 100644 --- a/client/history/metricClient.go +++ b/client/history/metricClient.go @@ -401,3 +401,20 @@ func (c *metricClient) ReplicateEvents( return err } + +func (c *metricClient) SyncShardStatus( + context context.Context, + request *h.SyncShardStatusRequest, + opts ...yarpc.CallOption) error { + c.metricsClient.IncCounter(metrics.HistoryClientSyncShardStatusScope, metrics.CadenceRequests) + + sw := c.metricsClient.StartTimer(metrics.HistoryClientSyncShardStatusScope, metrics.CadenceLatency) + err := c.client.SyncShardStatus(context, request, opts...) + sw.Stop() + + if err != nil { + c.metricsClient.IncCounter(metrics.HistoryClientSyncShardStatusScope, metrics.HistoryClientFailures) + } + + return err +} diff --git a/client/history/retryableClient.go b/client/history/retryableClient.go index c280687f523..0a5036fec15 100644 --- a/client/history/retryableClient.go +++ b/client/history/retryableClient.go @@ -353,3 +353,15 @@ func (c *retryableClient) ReplicateEvents( return backoff.Retry(op, c.policy, c.isRetryable) } + +func (c *retryableClient) SyncShardStatus( + ctx context.Context, + request *h.SyncShardStatusRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.SyncShardStatus(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 215d395dc89..1feac971cb8 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -232,6 +232,8 @@ const ( HistoryClientRecordChildExecutionCompletedScope // HistoryClientReplicateEventsScope tracks RPC calls to history service HistoryClientReplicateEventsScope + // HistoryClientSyncShardStatusScope tracks RPC calls to history service + HistoryClientSyncShardStatusScope // MatchingClientPollForDecisionTaskScope tracks RPC calls to matching service MatchingClientPollForDecisionTaskScope // MatchingClientPollForActivityTaskScope tracks RPC calls to matching service @@ -362,6 +364,8 @@ const ( HistoryRequestCancelWorkflowExecutionScope // HistoryReplicateEventsScope tracks ReplicateEvents API calls received by service HistoryReplicateEventsScope + // HistorySyncShardStatusScope tracks ReplicateEvents API calls received by service + HistorySyncShardStatusScope // HistoryShardControllerScope is the scope used by shard controller HistoryShardControllerScope // TransferQueueProcessorScope is the scope used by all metric emitted by transfer queue processor @@ -468,6 +472,8 @@ const ( DomainReplicationTaskScope // HistoryReplicationTaskScope is the scope used by history task replication processing HistoryReplicationTaskScope + // SyncShardTaskScope is the scope used by sync shrad information processing + SyncShardTaskScope NumWorkerScopes ) @@ -537,6 +543,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ HistoryClientScheduleDecisionTaskScope: {operation: "HistoryClientScheduleDecisionTask"}, HistoryClientRecordChildExecutionCompletedScope: {operation: "HistoryClientRecordChildExecutionCompleted"}, HistoryClientReplicateEventsScope: {operation: "HistoryClientReplicateEvents"}, + HistoryClientSyncShardStatusScope: {operation: "HistoryClientSyncShardStatusScope"}, MatchingClientPollForDecisionTaskScope: {operation: "MatchingClientPollForDecisionTask"}, MatchingClientPollForActivityTaskScope: {operation: "MatchingClientPollForActivityTask"}, MatchingClientAddActivityTaskScope: {operation: "MatchingClientAddActivityTask"}, @@ -601,6 +608,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ HistoryRecordChildExecutionCompletedScope: {operation: "RecordChildExecutionCompleted"}, HistoryRequestCancelWorkflowExecutionScope: {operation: "RequestCancelWorkflowExecution"}, HistoryReplicateEventsScope: {operation: "ReplicateEvents"}, + HistorySyncShardStatusScope: {operation: "SyncShardStatus"}, HistoryShardControllerScope: {operation: "ShardController"}, TransferQueueProcessorScope: {operation: "TransferQueueProcessor"}, TransferActiveQueueProcessorScope: {operation: "TransferActiveQueueProcessor"}, @@ -654,6 +662,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ ReplicatorScope: {operation: "Replicator"}, DomainReplicationTaskScope: {operation: "DomainReplicationTask"}, HistoryReplicationTaskScope: {operation: "HistoryReplicationTask"}, + SyncShardTaskScope: {operation: "SyncShardTask"}, }, } diff --git a/common/mocks/HistoryClient.go b/common/mocks/HistoryClient.go index 651ba3b9317..58fd95beb0a 100644 --- a/common/mocks/HistoryClient.go +++ b/common/mocks/HistoryClient.go @@ -446,3 +446,17 @@ func (_m *HistoryClient) ReplicateEvents(ctx context.Context, request *history.R return r0 } + +// SyncShardStatus provides a mock function with given fields: ctx, request +func (_m *HistoryClient) SyncShardStatus(ctx context.Context, request *history.SyncShardStatusRequest, opts ...yarpc.CallOption) error { + ret := _m.Called(ctx, request) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *history.SyncShardStatusRequest) error); ok { + r0 = rf(ctx, request) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 2118bbdd34f..02e5b7ec438 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -139,8 +139,8 @@ type ( StolenSinceRenew int UpdatedAt time.Time ReplicationAckLevel int64 - TransferAckLevel int64 // TO BE DEPRECATED IN FAVOR OF ClusterTransferAckLevel - TimerAckLevel time.Time // TO BE DEPRECATED IN FAVOR OF ClusterTimerAckLevel + TransferAckLevel int64 + TimerAckLevel time.Time ClusterTransferAckLevel map[string]int64 ClusterTimerAckLevel map[string]time.Time DomainNotificationVersion int64 diff --git a/idl/github.com/uber/cadence/history.thrift b/idl/github.com/uber/cadence/history.thrift index 5a17a83c3f1..828938c6323 100644 --- a/idl/github.com/uber/cadence/history.thrift +++ b/idl/github.com/uber/cadence/history.thrift @@ -228,6 +228,12 @@ struct ReplicateEventsRequest { 100: optional bool forceBufferEvents } +struct SyncShardStatusRequest { + 10: optional string sourceCluster + 20: optional i64 (js.type = "Long") shardId + 30: optional i64 (js.type = "Long") timestamp +} + /** * HistoryService provides API to start a new long running workflow instance, as well as query and update the history * of workflow instances already created. @@ -540,6 +546,17 @@ service HistoryService { 6: shared.RetryTaskError retryTaskError, ) + /** + * SyncShardStatus sync the status between shards + **/ + void SyncShardStatus(1: SyncShardStatusRequest syncShardStatusRequest) + throws ( + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 4: ShardOwnershipLostError shardOwnershipLostError, + 5: shared.LimitExceededError limitExceededError, + ) + /** * DescribeMutableState returns information about the internal states of workflow mutable state. **/ diff --git a/idl/github.com/uber/cadence/replicator.thrift b/idl/github.com/uber/cadence/replicator.thrift index adf860550bf..6bcdc0a2206 100644 --- a/idl/github.com/uber/cadence/replicator.thrift +++ b/idl/github.com/uber/cadence/replicator.thrift @@ -26,6 +26,7 @@ include "history.thrift" enum ReplicationTaskType { Domain History + SyncShardStatus } enum DomainOperation { @@ -56,9 +57,16 @@ struct HistoryTaskAttributes { 90: optional shared.History newRunHistory } +struct SyncShardStatusTaskAttributes { + 10: optional string sourceCluster + 20: optional i64 (js.type = "Long") shardId + 30: optional i64 (js.type = "Long") timestamp +} + struct ReplicationTask { 10: optional ReplicationTaskType taskType 20: optional DomainTaskAttributes domainTaskAttributes 30: optional HistoryTaskAttributes historyTaskAttributes + 40: optional SyncShardStatusTaskAttributes syncShardStatusTaskAttributes } diff --git a/service/history/MockHistoryEngine.go b/service/history/MockHistoryEngine.go index 3131c05e2d5..3d9de9f156c 100644 --- a/service/history/MockHistoryEngine.go +++ b/service/history/MockHistoryEngine.go @@ -428,4 +428,18 @@ func (_m *MockHistoryEngine) ReplicateEvents(request *gohistory.ReplicateEventsR return r0 } +// SyncShardStatus is mock implementation for SyncShardStatus of HistoryEngine +func (_m *MockHistoryEngine) SyncShardStatus(ctx context.Context, request *gohistory.SyncShardStatusRequest) error { + ret := _m.Called(request) + + var r0 error + if rf, ok := ret.Get(0).(func(*gohistory.SyncShardStatusRequest) error); ok { + r0 = rf(request) + } else { + r0 = ret.Error(0) + } + + return r0 +} + var _ Engine = (*MockHistoryEngine)(nil) diff --git a/service/history/handler.go b/service/history/handler.go index b09e7b8bb1b..e71f50b5d03 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -76,6 +76,9 @@ var ( errTaskListNotSet = &gen.BadRequestError{Message: "Tasklist not set."} errWorkflowIDNotSet = &gen.BadRequestError{Message: "WorkflowId is not set on request."} errRunIDNotValid = &gen.BadRequestError{Message: "RunID is not valid UUID."} + errSourceClusterNotSet = &gen.BadRequestError{Message: "Source Cluster not set on request."} + errShardIDNotSet = &gen.BadRequestError{Message: "Shard ID not set on request."} + errTimestampNotSet = &gen.BadRequestError{Message: "Timestamp not set on request."} ) // NewHandler creates a thrift handler for the history service @@ -133,7 +136,7 @@ func (h *Handler) Start() error { h.domainCache = cache.NewDomainCache(h.metadataMgr, h.GetClusterMetadata(), h.GetMetricsClient(), h.GetLogger()) h.domainCache.Start() h.controller = newShardController(h.Service, h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr, - h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient()) + h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient(), h.publisher) h.metricsClient = h.GetMetricsClient() h.historyEventNotifier = newHistoryEventNotifier(h.GetMetricsClient(), h.config.GetShardID) // events notifier must starts before controller @@ -924,6 +927,42 @@ func (h *Handler) ReplicateEvents(ctx context.Context, replicateRequest *hist.Re return nil } +// SyncShardStatus is called by processor to sync history shrad information from another cluster +func (h *Handler) SyncShardStatus(ctx context.Context, syncShardStatusRequest *hist.SyncShardStatusRequest) error { + h.startWG.Wait() + + h.metricsClient.IncCounter(metrics.HistorySyncShardStatusScope, metrics.CadenceRequests) + sw := h.metricsClient.StartTimer(metrics.HistorySyncShardStatusScope, metrics.CadenceLatency) + defer sw.Stop() + + if syncShardStatusRequest.SourceCluster == nil { + return errSourceClusterNotSet + } + + if syncShardStatusRequest.ShardId == nil { + return errShardIDNotSet + } + + if syncShardStatusRequest.Timestamp == nil { + return errTimestampNotSet + } + + // shard ID is already provided in the request + engine, err := h.controller.getEngineForShard(int(syncShardStatusRequest.GetShardId())) + if err != nil { + h.updateErrorMetric(metrics.HistoryReplicateEventsScope, err) + return err + } + + err = engine.SyncShardStatus(ctx, syncShardStatusRequest) + if err != nil { + h.updateErrorMetric(metrics.HistorySyncShardStatusScope, h.convertError(err)) + return h.convertError(err) + } + + return nil +} + // convertError is a helper method to convert ShardOwnershipLostError from persistence layer returned by various // HistoryEngine API calls to ShardOwnershipLost error return by HistoryService for client to be redirected to the // correct shard. diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 560730e95e0..eb315b1ffc9 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2194,6 +2194,20 @@ func (e *historyEngineImpl) ReplicateEvents(replicateRequest *h.ReplicateEventsR return e.replicator.ApplyEvents(replicateRequest) } +func (e *historyEngineImpl) SyncShardStatus(ctx context.Context, request *h.SyncShardStatusRequest) error { + clusterName := request.GetSourceCluster() + now := time.Unix(0, request.GetTimestamp()) + + // here there are 3 main things + // 1. update the view of remote cluster's shard time + // 2. notify the timer gate in the timer queue standby processor + // 3, notify the transfer (essentially a no op, just put it here so it looks symmetric) + e.shard.SetCurrentTime(clusterName, now) + e.txProcessor.NotifyNewTask(clusterName, now, []persistence.Task{}) + e.timerProcessor.NotifyNewTimers(clusterName, now, []persistence.Task{}) + return nil +} + type updateWorkflowAction struct { deleteWorkflow bool createDecision bool diff --git a/service/history/historyEngineInterfaces.go b/service/history/historyEngineInterfaces.go index 725e3a11606..7cf3f856f2c 100644 --- a/service/history/historyEngineInterfaces.go +++ b/service/history/historyEngineInterfaces.go @@ -73,6 +73,7 @@ type ( ScheduleDecisionTask(ctx context.Context, request *h.ScheduleDecisionTaskRequest) error RecordChildExecutionCompleted(ctx context.Context, request *h.RecordChildExecutionCompletedRequest) error ReplicateEvents(request *h.ReplicateEventsRequest) error + SyncShardStatus(ctx context.Context, request *h.SyncShardStatusRequest) error } // EngineFactory is used to create an instance of sharded history engine diff --git a/service/history/queueAckMgr.go b/service/history/queueAckMgr.go index 4dddfc5b4d1..4ef27b71c8b 100644 --- a/service/history/queueAckMgr.go +++ b/service/history/queueAckMgr.go @@ -188,7 +188,6 @@ MoveAckLevelLoop: break MoveAckLevelLoop } } - updateShard := a.ackLevel != ackLevel a.ackLevel = ackLevel if a.isFailover && a.isReadFinished && len(a.outstandingTasks) == 0 { @@ -199,14 +198,12 @@ MoveAckLevelLoop: } a.Unlock() - if updateShard { - if !a.isFailover { - if err := a.processor.updateAckLevel(ackLevel); err != nil { - a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter) - logging.LogOperationFailedEvent(a.logger, "Error updating ack level for shard", err) - } - } else { - // TODO deal with failover ack level persistence, issue #646 + if !a.isFailover { + if err := a.processor.updateAckLevel(ackLevel); err != nil { + a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter) + logging.LogOperationFailedEvent(a.logger, "Error updating ack level for shard", err) } + } else { + // TODO deal with failover ack level persistence, issue #646 } } diff --git a/service/history/queueAckMgr_test.go b/service/history/queueAckMgr_test.go index e142396c1b9..009cd23fff0 100644 --- a/service/history/queueAckMgr_test.go +++ b/service/history/queueAckMgr_test.go @@ -284,6 +284,7 @@ func (s *queueAckMgrSuite) TestReadCompleteUpdateTimerTasks() { s.queueAckMgr.updateQueueAckLevel() s.Equal(taskID1, s.queueAckMgr.getQueueAckLevel()) + s.mockProcessor.On("updateAckLevel", taskID1).Return(nil).Once() s.mockProcessor.On("completeTask", taskID3).Return(nil).Once() s.queueAckMgr.completeQueueTask(taskID3) s.queueAckMgr.updateQueueAckLevel() diff --git a/service/history/shardContext.go b/service/history/shardContext.go index 6dbd77e6f72..94f186d794d 100644 --- a/service/history/shardContext.go +++ b/service/history/shardContext.go @@ -27,7 +27,9 @@ import ( "time" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" @@ -76,6 +78,7 @@ type ( shardContextImpl struct { shardID int + currentCluster string service service.Service rangeID int64 shardManager persistence.ShardManager @@ -87,6 +90,7 @@ type ( config *Config logger bark.Logger metricsClient metrics.Client + messageProducer messaging.Producer sync.RWMutex lastUpdated time.Time @@ -590,6 +594,19 @@ func (s *shardContextImpl) updateShardInfoLocked() error { } updatedShardInfo := copyShardInfo(s.shardInfo) + if s.messageProducer != nil { + syncStatusTask := &replicator.ReplicationTask{ + TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus), + SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{ + SourceCluster: common.StringPtr(s.currentCluster), + ShardId: common.Int64Ptr(int64(s.shardID)), + Timestamp: common.Int64Ptr(now.UnixNano()), + }, + } + // ignore the error + s.messageProducer.Publish(syncStatusTask) + } + err := s.shardManager.UpdateShard(&persistence.UpdateShardRequest{ ShardInfo: updatedShardInfo, PreviousRangeID: s.shardInfo.RangeID, @@ -660,7 +677,8 @@ func (s *shardContextImpl) GetCurrentTime(cluster string) time.Time { // TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter. func acquireShard(shardID int, svc service.Service, shardManager persistence.ShardManager, historyMgr persistence.HistoryManager, executionMgr persistence.ExecutionManager, domainCache cache.DomainCache, - owner string, closeCh chan<- int, config *Config, logger bark.Logger, metricsClient metrics.Client) (ShardContext, + owner string, closeCh chan<- int, config *Config, logger bark.Logger, + metricsClient metrics.Client, messageProducer messaging.Producer) (ShardContext, error) { response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID}) if err0 != nil { @@ -685,6 +703,7 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha context := &shardContextImpl{ shardID: shardID, + currentCluster: svc.GetClusterMetadata().GetCurrentClusterName(), service: svc, shardManager: shardManager, historyMgr: historyMgr, @@ -693,6 +712,7 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha shardInfo: updatedShardInfo, closeCh: closeCh, metricsClient: metricsClient, + messageProducer: messageProducer, config: config, standbyClusterCurrentTime: standbyClusterCurrentTime, } diff --git a/service/history/shardController.go b/service/history/shardController.go index 983faa8dc6d..2abb5b90bcf 100644 --- a/service/history/shardController.go +++ b/service/history/shardController.go @@ -27,6 +27,7 @@ import ( "time" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/messaging" "github.com/uber-common/bark" @@ -61,6 +62,7 @@ type ( logger bark.Logger config *Config metricsClient metrics.Client + messageProducer messaging.Producer sync.RWMutex historyShards map[int]*historyShardsItem @@ -69,25 +71,26 @@ type ( historyShardsItem struct { sync.RWMutex - shardID int - service service.Service - shardMgr persistence.ShardManager - historyMgr persistence.HistoryManager - executionMgr persistence.ExecutionManager - domainCache cache.DomainCache - engineFactory EngineFactory - host *membership.HostInfo - engine Engine - config *Config - logger bark.Logger - metricsClient metrics.Client + shardID int + service service.Service + shardMgr persistence.ShardManager + historyMgr persistence.HistoryManager + executionMgr persistence.ExecutionManager + domainCache cache.DomainCache + engineFactory EngineFactory + host *membership.HostInfo + engine Engine + config *Config + logger bark.Logger + metricsClient metrics.Client + messageProducer messaging.Producer } ) func newShardController(svc service.Service, host *membership.HostInfo, resolver membership.ServiceResolver, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, domainCache cache.DomainCache, executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, - config *Config, logger bark.Logger, metricsClient metrics.Client) *shardController { + config *Config, logger bark.Logger, metricsClient metrics.Client, messageProducer messaging.Producer) *shardController { logger = logger.WithFields(bark.Fields{ logging.TagWorkflowComponent: logging.TagValueShardController, }) @@ -107,13 +110,14 @@ func newShardController(svc service.Service, host *membership.HostInfo, resolver logger: logger, config: config, metricsClient: metricsClient, + messageProducer: messageProducer, } } func newHistoryShardsItem(shardID int, svc service.Service, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, domainCache cache.DomainCache, executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, host *membership.HostInfo, - config *Config, logger bark.Logger, metricsClient metrics.Client) (*historyShardsItem, error) { + config *Config, logger bark.Logger, metricsClient metrics.Client, messageProducer messaging.Producer) (*historyShardsItem, error) { executionMgr, err := executionMgrFactory.CreateExecutionManager(shardID) if err != nil { @@ -133,7 +137,8 @@ func newHistoryShardsItem(shardID int, svc service.Service, shardMgr persistence logger: logger.WithFields(bark.Fields{ logging.TagHistoryShardID: shardID, }), - metricsClient: metricsClient, + metricsClient: metricsClient, + messageProducer: messageProducer, }, nil } @@ -223,7 +228,7 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar if info.Identity() == c.host.Identity() { shardItem, err := newHistoryShardsItem(shardID, c.service, c.shardMgr, c.historyMgr, c.domainCache, - c.executionMgrFactory, c.engineFactory, c.host, c.config, c.logger, c.metricsClient) + c.executionMgrFactory, c.engineFactory, c.host, c.config, c.logger, c.metricsClient, c.messageProducer) if err != nil { return nil, err } @@ -393,7 +398,7 @@ func (i *historyShardsItem) getOrCreateEngine(shardClosedCh chan<- int) (Engine, logging.LogShardEngineCreatingEvent(i.logger, i.host.Identity(), i.shardID) context, err := acquireShard(i.shardID, i.service, i.shardMgr, i.historyMgr, i.executionMgr, i.domainCache, - i.host.Identity(), shardClosedCh, i.config, i.logger, i.metricsClient) + i.host.Identity(), shardClosedCh, i.config, i.logger, i.metricsClient, i.messageProducer) if err != nil { return nil, err } diff --git a/service/history/shardController_test.go b/service/history/shardController_test.go index d26b4f2575f..5955eb89355 100644 --- a/service/history/shardController_test.go +++ b/service/history/shardController_test.go @@ -91,7 +91,7 @@ func (s *shardControllerSuite) SetupTest() { s.domainCache = cache.NewDomainCache(s.mockMetadaraMgr, s.mockClusterMetadata, s.metricsClient, s.logger) s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, - s.metricsClient) + s.metricsClient, s.mockMessaging) } func (s *shardControllerSuite) TearDownTest() { @@ -345,7 +345,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) + s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} @@ -438,7 +438,7 @@ func (s *shardControllerSuite) TestRingUpdated() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) + s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} @@ -518,7 +518,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) + s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} diff --git a/service/history/timerQueueAckMgr.go b/service/history/timerQueueAckMgr.go index 57681cdd694..b61716f1111 100644 --- a/service/history/timerQueueAckMgr.go +++ b/service/history/timerQueueAckMgr.go @@ -269,7 +269,6 @@ MoveAckLevelLoop: break MoveAckLevelLoop } } - updateShard := t.ackLevel != ackLevel t.ackLevel = ackLevel if t.isFailover && t.isReadFinished && len(outstandingTasks) == 0 { @@ -280,9 +279,7 @@ MoveAckLevelLoop: } t.Unlock() - if updateShard { - t.updateTimerAckLevel(ackLevel) - } + t.updateTimerAckLevel(ackLevel) } // this function does not take cluster name as parameter, due to we only have one timer queue on Cassandra diff --git a/service/history/timerQueueAckMgr_test.go b/service/history/timerQueueAckMgr_test.go index ac026738b56..e51228414e2 100644 --- a/service/history/timerQueueAckMgr_test.go +++ b/service/history/timerQueueAckMgr_test.go @@ -517,7 +517,7 @@ func (s *timerQueueAckMgrSuite) TestReadCompleteUpdateTimerTasks() { s.timerQueueAckMgr.updateAckLevel() s.Equal(timer1.VisibilityTimestamp, s.mockShard.GetTimerClusterAckLevel(s.clusterName)) - // there will be no call to update shard + s.mockShardMgr.On("UpdateShard", mock.Anything).Return(nil).Once() timerSequenceID3 := TimerSequenceID{VisibilityTimestamp: timer3.VisibilityTimestamp, TaskID: timer3.TaskID} s.timerQueueAckMgr.completeTimerTask(timer3) s.True(s.timerQueueAckMgr.outstandingTasks[timerSequenceID3]) diff --git a/service/worker/processor.go b/service/worker/processor.go index 05d8933f031..46589c00a54 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -287,6 +287,9 @@ func (p *replicationTaskProcessor) process(msg kafka.Message, inRetry bool) erro case replicator.ReplicationTaskTypeDomain: scope = metrics.DomainReplicationTaskScope err = p.handleDomainReplicationTask(task) + case replicator.ReplicationTaskTypeSyncShardStatus: + scope = metrics.SyncShardTaskScope + err = p.handleSyncShardTask(task) case replicator.ReplicationTaskTypeHistory: scope = metrics.HistoryReplicationTaskScope err = p.handleHistoryReplicationTask(task, inRetry) @@ -310,6 +313,24 @@ func (p *replicationTaskProcessor) handleDomainReplicationTask(task *replicator. return p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes) } +func (p *replicationTaskProcessor) handleSyncShardTask(task *replicator.ReplicationTask) error { + p.metricsClient.IncCounter(metrics.SyncShardTaskScope, metrics.ReplicatorMessages) + sw := p.metricsClient.StartTimer(metrics.SyncShardTaskScope, metrics.ReplicatorLatency) + defer sw.Stop() + + attr := task.SyncShardStatusTaskAttributes + p.logger.Debugf("Received sync shard task %v.", attr) + + req := &h.SyncShardStatusRequest{ + SourceCluster: attr.SourceCluster, + ShardId: attr.ShardId, + Timestamp: attr.Timestamp, + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + return p.historyClient.SyncShardStatus(ctx, req) +} + func (p *replicationTaskProcessor) handleHistoryReplicationTask(task *replicator.ReplicationTask, inRetry bool) error { p.metricsClient.IncCounter(metrics.HistoryReplicationTaskScope, metrics.ReplicatorMessages) sw := p.metricsClient.StartTimer(metrics.HistoryReplicationTaskScope, metrics.ReplicatorLatency)