From 418b467363950ebff743ecf61514a5986b11129e Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Thu, 26 Sep 2024 13:03:01 +0200 Subject: [PATCH 1/2] [Wf-Diagnostics] introduce emitter interface in w/f diagnostics --- .../worker/diagnostics/analytics/emitter.go | 74 ++++++++++++++ .../diagnostics/analytics/emitter_test.go | 97 +++++++++++++++++++ .../worker/diagnostics/analytics/interface.go | 47 +++++++++ service/worker/diagnostics/analytics/types.go | 37 +++++++ 4 files changed, 255 insertions(+) create mode 100644 service/worker/diagnostics/analytics/emitter.go create mode 100644 service/worker/diagnostics/analytics/emitter_test.go create mode 100644 service/worker/diagnostics/analytics/interface.go create mode 100644 service/worker/diagnostics/analytics/types.go diff --git a/service/worker/diagnostics/analytics/emitter.go b/service/worker/diagnostics/analytics/emitter.go new file mode 100644 index 00000000000..9fb12b0d049 --- /dev/null +++ b/service/worker/diagnostics/analytics/emitter.go @@ -0,0 +1,74 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package analytics + +import ( + "context" + "encoding/json" + + "github.com/uber/cadence/.gen/go/indexer" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/messaging" +) + +type Emitter DataEmitter + +type emitter struct { + producer messaging.Producer +} + +type NewEmitterParams struct { + Producer messaging.Producer +} + +func NewEmitter(p NewEmitterParams) DataEmitter { + return &emitter{ + producer: p.Producer, + } +} + +func (et *emitter) EmitUsageData(ctx context.Context, data WfDiagnosticsUsageData) error { + msg := make(map[string]interface{}) + msg[Domain] = data.Domain + msg[WorkflowID] = data.WorkflowID + msg[RunID] = data.RunID + msg[Identity] = data.Identity + msg[SatisfactionFeedback] = data.SatisfactionFeedback + msg[IssueType] = data.IssueType + msg[DiagnosticsWfID] = data.DiagnosticsWorkflowID + msg[DiagnosticsWfRunID] = data.DiagnosticsRunID + msg[Environment] = data.Environment + msg[DiagnosticsStartTime] = data.DiagnosticsStartTime + msg[DiagnosticsEndTime] = data.DiagnosticsEndTime + + serializedMsg, err := json.Marshal(msg) + if err != nil { + return err + } + + pinotMsg := &indexer.PinotMessage{ + WorkflowID: common.StringPtr(data.DiagnosticsWorkflowID), + Payload: serializedMsg, + } + return et.producer.Publish(ctx, pinotMsg) +} diff --git a/service/worker/diagnostics/analytics/emitter_test.go b/service/worker/diagnostics/analytics/emitter_test.go new file mode 100644 index 00000000000..e392a85e634 --- /dev/null +++ b/service/worker/diagnostics/analytics/emitter_test.go @@ -0,0 +1,97 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package analytics + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/.gen/go/indexer" + "github.com/uber/cadence/common/mocks" +) + +func Test__EmitUsageData(t *testing.T) { + + testdata := WfDiagnosticsUsageData{ + Domain: "test-domain", + WorkflowID: "wid", + RunID: "rid", + Identity: "test@uber.com", + IssueType: "timeouts", + DiagnosticsWorkflowID: "diagnostics-wid", + DiagnosticsRunID: "diagnostics-rid", + Environment: "test-env", + DiagnosticsStartTime: time.Now(), + DiagnosticsEndTime: time.Now().Add(1 * time.Minute), + SatisfactionFeedback: false, + } + mockErr := errors.New("mockErr") + tests := map[string]struct { + data WfDiagnosticsUsageData + producerMockAffordance func(mockProducer *mocks.KafkaProducer) + expectedError error + }{ + "Case1: normal case": { + data: testdata, + producerMockAffordance: func(mockProducer *mocks.KafkaProducer) { + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + require.Equal(t, testdata.DiagnosticsWorkflowID, input.GetWorkflowID()) + return true + })).Return(nil).Once() + }, + expectedError: nil, + }, + "Case1: error case": { + data: testdata, + producerMockAffordance: func(mockProducer *mocks.KafkaProducer) { + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + require.Equal(t, testdata.DiagnosticsWorkflowID, input.GetWorkflowID()) + return true + })).Return(mockErr).Once() + }, + expectedError: mockErr, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mockProducer := &mocks.KafkaProducer{} + emitter := NewEmitter(NewEmitterParams{ + Producer: mockProducer, + }) + test.producerMockAffordance(mockProducer) + + err := emitter.EmitUsageData(context.Background(), test.data) + if test.expectedError != nil { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/service/worker/diagnostics/analytics/interface.go b/service/worker/diagnostics/analytics/interface.go new file mode 100644 index 00000000000..7564e97650a --- /dev/null +++ b/service/worker/diagnostics/analytics/interface.go @@ -0,0 +1,47 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package analytics + +import ( + "context" + "time" +) + +type WfDiagnosticsUsageData struct { + Domain string + WorkflowID string + RunID string + Identity string + IssueType string + DiagnosticsWorkflowID string + DiagnosticsRunID string + Environment string + DiagnosticsStartTime time.Time + DiagnosticsEndTime time.Time + SatisfactionFeedback bool +} + +// DataEmitter is the interface to emit workflow diagnostics data +type DataEmitter interface { + EmitUsageData(context.Context, WfDiagnosticsUsageData) error +} diff --git a/service/worker/diagnostics/analytics/types.go b/service/worker/diagnostics/analytics/types.go new file mode 100644 index 00000000000..433594a57b3 --- /dev/null +++ b/service/worker/diagnostics/analytics/types.go @@ -0,0 +1,37 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package analytics + +const ( + Domain = "domain" + WorkflowID = "workflowID" + RunID = "runID" + Identity = "identity" + IssueType = "issue_type" + DiagnosticsWfID = "diagnostics_workflowID" + DiagnosticsWfRunID = "diagnostics_workflow_runID" + SatisfactionFeedback = "satisfaction_feedback" + Environment = "environment" + DiagnosticsStartTime = "diagnostics_start_time" + DiagnosticsEndTime = "diagnostics_end_time" +) From d61f3c8a2c11388237a6b867605454943d2e9cfc Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Thu, 26 Sep 2024 15:04:45 +0200 Subject: [PATCH 2/2] address comments --- service/worker/diagnostics/analytics/emitter.go | 4 ++-- service/worker/diagnostics/analytics/emitter_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/service/worker/diagnostics/analytics/emitter.go b/service/worker/diagnostics/analytics/emitter.go index 9fb12b0d049..9f26a826b1b 100644 --- a/service/worker/diagnostics/analytics/emitter.go +++ b/service/worker/diagnostics/analytics/emitter.go @@ -37,11 +37,11 @@ type emitter struct { producer messaging.Producer } -type NewEmitterParams struct { +type EmitterParams struct { Producer messaging.Producer } -func NewEmitter(p NewEmitterParams) DataEmitter { +func NewEmitter(p EmitterParams) DataEmitter { return &emitter{ producer: p.Producer, } diff --git a/service/worker/diagnostics/analytics/emitter_test.go b/service/worker/diagnostics/analytics/emitter_test.go index e392a85e634..04f1a24bfd3 100644 --- a/service/worker/diagnostics/analytics/emitter_test.go +++ b/service/worker/diagnostics/analytics/emitter_test.go @@ -81,7 +81,7 @@ func Test__EmitUsageData(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { mockProducer := &mocks.KafkaProducer{} - emitter := NewEmitter(NewEmitterParams{ + emitter := NewEmitter(EmitterParams{ Producer: mockProducer, }) test.producerMockAffordance(mockProducer)