From 1aadff5e21fe2dd272c0288c389308741624fa0a Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Tue, 5 Nov 2024 11:24:03 +0100 Subject: [PATCH 1/4] Tests for admin_kafka_commands --- tools/cli/admin_kafka_commands.go | 3 +- tools/cli/admin_kafka_commands_test.go | 975 +++++++++++++++++++++++++ 2 files changed, 976 insertions(+), 2 deletions(-) create mode 100644 tools/cli/admin_kafka_commands_test.go diff --git a/tools/cli/admin_kafka_commands.go b/tools/cli/admin_kafka_commands.go index 176468fb46a..ef7f42826ae 100644 --- a/tools/cli/admin_kafka_commands.go +++ b/tools/cli/admin_kafka_commands.go @@ -180,7 +180,7 @@ func getOutputFile(outputFile string) (*os.File, error) { return f, nil } -func startReader(file *os.File, readerCh chan<- []byte) error { +func startReader(file io.Reader, readerCh chan<- []byte) error { defer close(readerCh) reader := bufio.NewReader(file) @@ -533,7 +533,6 @@ func decodeReplicationTask( task *types.ReplicationTask, serializer persistence.PayloadSerializer, ) ([]byte, error) { - switch task.GetTaskType() { case types.ReplicationTaskTypeHistoryV2: historyV2 := task.GetHistoryTaskV2Attributes() diff --git a/tools/cli/admin_kafka_commands_test.go b/tools/cli/admin_kafka_commands_test.go new file mode 100644 index 00000000000..6982ab484eb --- /dev/null +++ b/tools/cli/admin_kafka_commands_test.go @@ -0,0 +1,975 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cli + +import ( + "bytes" + "context" + "errors" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/uber/cadence/.gen/go/indexer" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/client/admin" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/tools/cli/clitest" + "github.com/uber/cadence/tools/common/commoncli" + "go.uber.org/thriftrw/protocol/binary" + "go.uber.org/thriftrw/ptr" + "os" + "testing" +) + +func TestWriterChannel(t *testing.T) { + t.Run("replication task type", func(t *testing.T) { + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + + assert.Equal(t, kafkaMessageType(0), ch.Type) + assert.Nil(t, ch.VisibilityMsgChannel) + assert.NotNil(t, ch.ReplicationTaskChannel) + assert.Equal(t, 10000, cap(ch.ReplicationTaskChannel)) + }) + + t.Run("msg visibility type", func(t *testing.T) { + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + + assert.Equal(t, kafkaMessageType(1), ch.Type) + assert.Nil(t, ch.ReplicationTaskChannel) + assert.NotNil(t, ch.VisibilityMsgChannel) + assert.Equal(t, 10000, cap(ch.VisibilityMsgChannel)) + }) + + t.Run("replication task type close channel", func(t *testing.T) { + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + ch.ReplicationTaskChannel <- nil + _, ok := <-ch.ReplicationTaskChannel + assert.True(t, ok) + + ch.Close() + _, ok = <-ch.ReplicationTaskChannel + assert.False(t, ok) + }) + t.Run("msg visibility type close channel", func(t *testing.T) { + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + ch.VisibilityMsgChannel <- nil + _, ok := <-ch.VisibilityMsgChannel + assert.True(t, ok) + + ch.Close() + _, ok = <-ch.VisibilityMsgChannel + assert.False(t, ok) + }) +} + +func TestBuildFilterFn(t *testing.T) { + allTasks := []*types.ReplicationTask{ + nil, + {HistoryTaskV2Attributes: nil}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{RunID: "run-id-1"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{RunID: "run-id-2"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id", RunID: "run-id-1"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id", RunID: "run-id-2"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "alien-workflow-id"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "alien-workflow-id", RunID: "run-id-1"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "alien-workflow-id", RunID: "run-id-2"}}, + } + + tests := []struct { + name string + filterFn filterFn + expectedFilteredTaskList []*types.ReplicationTask + }{ + {"empty filter always return true", buildFilterFn("", ""), allTasks}, + { + "filter with workflowId only", + buildFilterFn("my-workflow-id", ""), + []*types.ReplicationTask{ + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id", RunID: "run-id-1"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id", RunID: "run-id-2"}}, + }, + }, + { + "filter with runId only", + buildFilterFn("", "run-id-1"), + []*types.ReplicationTask{ + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{RunID: "run-id-1"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id", RunID: "run-id-1"}}, + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "alien-workflow-id", RunID: "run-id-1"}}, + }, + }, + { + "filter with workflow and runId", + buildFilterFn("my-workflow-id", "run-id-1"), + []*types.ReplicationTask{ + {HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{WorkflowID: "my-workflow-id", RunID: "run-id-1"}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var result []*types.ReplicationTask + + for _, task := range allTasks { + if tt.filterFn(task) { + result = append(result, task) + } + } + + assert.Equal(t, tt.expectedFilteredTaskList, result) + }) + } +} + +func TestBuildFilterFnForVisibility(t *testing.T) { + allMessages := []*indexer.Message{ + nil, + {}, + {RunID: ptr.String("run-id-1")}, + {RunID: ptr.String("run-id-2")}, + {WorkflowID: ptr.String("my-workflow-id")}, + {WorkflowID: ptr.String("my-workflow-id"), RunID: ptr.String("run-id-1")}, + {WorkflowID: ptr.String("my-workflow-id"), RunID: ptr.String("run-id-2")}, + {WorkflowID: ptr.String("alien-workflow-id")}, + {WorkflowID: ptr.String("alien-workflow-id"), RunID: ptr.String("run-id-1")}, + {WorkflowID: ptr.String("alien-workflow-id"), RunID: ptr.String("run-id-2")}, + } + + tests := []struct { + name string + filterFn filterFnForVisibility + expectedFilteredTaskList []*indexer.Message + }{ + {"empty filter always return true", buildFilterFnForVisibility("", ""), allMessages}, + { + "filter with workflowId only", + buildFilterFnForVisibility("my-workflow-id", ""), + []*indexer.Message{ + {WorkflowID: ptr.String("my-workflow-id")}, + {WorkflowID: ptr.String("my-workflow-id"), RunID: ptr.String("run-id-1")}, + {WorkflowID: ptr.String("my-workflow-id"), RunID: ptr.String("run-id-2")}, + }, + }, + { + "filter with runId only", + buildFilterFnForVisibility("", "run-id-1"), + []*indexer.Message{ + {RunID: ptr.String("run-id-1")}, + {WorkflowID: ptr.String("my-workflow-id"), RunID: ptr.String("run-id-1")}, + {WorkflowID: ptr.String("alien-workflow-id"), RunID: ptr.String("run-id-1")}, + }, + }, + { + "filter with workflow and runId", + buildFilterFnForVisibility("my-workflow-id", "run-id-1"), + []*indexer.Message{ + {WorkflowID: ptr.String("my-workflow-id"), RunID: ptr.String("run-id-1")}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var result []*indexer.Message + + for _, task := range allMessages { + if tt.filterFn(task) { + result = append(result, task) + } + } + + assert.Equal(t, tt.expectedFilteredTaskList, result) + }) + } +} + +func TestGetOutputFile(t *testing.T) { + t.Run("returns stdout if filename is empty", func(t *testing.T) { + file, err := getOutputFile("") + assert.NoError(t, err) + assert.Equal(t, os.Stdout, file) + }) + + t.Run("creates a file", func(t *testing.T) { + file, err := getOutputFile("test.txt") + defer func() { + if file != nil { + _ = file.Close() + _ = os.Remove("test.txt") + } + }() + + assert.NoError(t, err) + assert.NotNil(t, file) + + info, err := file.Stat() + + assert.Equal(t, "test.txt", info.Name()) + }) + + t.Run("fails to create a file", func(t *testing.T) { + file, err := getOutputFile("/non/existent/directory/test.txt") + + assert.EqualError(t, err, "failed to create output file open /non/existent/directory/test.txt: no such file or directory") + assert.Nil(t, file) + }) +} + +type FailingIOReader struct{} + +func (*FailingIOReader) Read(_ []byte) (n int, err error) { + return 0, errors.New("failed to read") +} + +func TestStartReader(t *testing.T) { + t.Run("starts reader successfully", func(t *testing.T) { + ch := make(chan []byte) + file := bytes.NewBuffer([]byte{1, 2, 3, 4}) + + go func() { + err := startReader(file, ch) + assert.NoError(t, err) + }() + + readBytes := <-ch + + assert.Equal(t, []byte{1, 2, 3, 4}, readBytes) + }) + + t.Run("reader stops successfully at the EOF", func(t *testing.T) { + ch := make(chan []byte) + file := bytes.NewBuffer([]byte{1, 2, 3, 4}) + + go func() { + <-ch + }() + + err := startReader(file, ch) + assert.NoError(t, err) + }) + + t.Run("returns error if couldn't read", func(t *testing.T) { + err := startReader(&FailingIOReader{}, make(chan<- []byte)) + assert.EqualError(t, err, "failed to read from reader: failed to read") + }) +} + +func TestStartParser(t *testing.T) { + t.Run("skipErrors ignores invalid data", func(t *testing.T) { + readerChannel := make(chan []byte) + writerChannel := newWriterChannel(kafkaMessageTypeReplicationTask) + skippedMessagesCount := int32(0) + + go func() { + readerChannel <- []byte{1, 2, 3, 4} + close(readerChannel) + }() + + err := startParser(readerChannel, writerChannel, true, &skippedMessagesCount) + assert.NoError(t, err) + assert.Equal(t, int32(1), skippedMessagesCount) + }) + + t.Run("without skipErrors the invalid data returns error", func(t *testing.T) { + readerChannel := make(chan []byte) + writerChannel := newWriterChannel(kafkaMessageTypeReplicationTask) + count := int32(0) + + go func() { + readerChannel <- []byte{1, 2, 3, 4} + close(readerChannel) + }() + + err := startParser(readerChannel, writerChannel, false, &count) + assert.EqualError(t, err, "error in splitBuffer: header not found, did you generate dump with -v") + }) + + t.Run("successfully start parser", func(t *testing.T) { + readerChannel := make(chan []byte) + writerChannel := newWriterChannel(kafkaMessageTypeReplicationTask) + count := int32(0) + + go func() { + readerChannel <- []byte("Partition: abc, Offset: 0, Key: 123") + close(readerChannel) + }() + + go func() { + err := startParser(readerChannel, writerChannel, false, &count) + assert.NoError(t, err) + }() + + result := <-writerChannel.ReplicationTaskChannel + assert.Equal(t, (*types.ReplicationTask)(nil), result) + }) +} + +func TestDeserializeMessage(t *testing.T) { + t.Run("skipErrors ignores malformed messages", func(t *testing.T) { + tasks, skipped, err := deserializeMessages([][]byte{{1, 2, 3, 4}}, true) + + assert.Equal(t, []*types.ReplicationTask(nil), tasks) + assert.Equal(t, int32(1), skipped) + assert.NoError(t, err) + }) + + t.Run("without skipErrors malformed messages return error", func(t *testing.T) { + tasks, skipped, err := deserializeMessages([][]byte{{1, 2, 3, 4}}, false) + + assert.Nil(t, tasks) + assert.Equal(t, int32(0), skipped) + assert.EqualError(t, err, "Input was malformedError: unexpected EOF") + }) + + t.Run("successful deserialization", func(t *testing.T) { + wireVal, _ := (&replicator.ReplicationTask{CreationTime: ptr.Int64(123)}).ToWire() + + encoded := bytes.NewBuffer([]byte{}) + _ = binary.Default.Encode(wireVal, encoded) + + tasks, skipped, err := deserializeMessages([][]byte{encoded.Bytes()}, false) + + assert.Equal(t, []*types.ReplicationTask{{}}, tasks) + assert.Equal(t, int32(0), skipped) + assert.NoError(t, err) + }) +} + +func TestDeserializeVisibilityMessage(t *testing.T) { + t.Run("skipErrors ignores malformed messages", func(t *testing.T) { + tasks, skipped, err := deserializeVisibilityMessages([][]byte{{1, 2, 3, 4}}, true) + + assert.Equal(t, []*indexer.Message(nil), tasks) + assert.Equal(t, int32(1), skipped) + assert.NoError(t, err) + }) + + t.Run("without skipErrors malformed messages return error", func(t *testing.T) { + tasks, skipped, err := deserializeVisibilityMessages([][]byte{{1, 2, 3, 4}}, false) + + assert.Nil(t, tasks) + assert.Equal(t, int32(0), skipped) + assert.EqualError(t, err, "Input was malformedError: unexpected EOF") + }) + + t.Run("successful deserialization", func(t *testing.T) { + wireVal, _ := (&indexer.Message{Version: ptr.Int64(123)}).ToWire() + + encoded := bytes.NewBuffer([]byte{}) + _ = binary.Default.Encode(wireVal, encoded) + + tasks, skipped, err := deserializeVisibilityMessages([][]byte{encoded.Bytes()}, false) + + assert.Equal(t, []*indexer.Message{{}}, tasks) + assert.Equal(t, int32(0), skipped) + assert.NoError(t, err) + }) +} + +func TestDecodeReplicationTask(t *testing.T) { + t.Run("decode replication task", func(t *testing.T) { + result, err := decodeReplicationTask(&types.ReplicationTask{TaskType: types.ReplicationTaskTypeHistory.Ptr()}, persistence.NewPayloadSerializer()) + expectedTaskJSONBytes := []byte("{\"taskType\":\"History\"}") + assert.NoError(t, err) + assert.Equal(t, expectedTaskJSONBytes, result) + }) + + t.Run("decode replication task type HistoryV2", func(t *testing.T) { + task := &types.ReplicationTask{ + TaskType: types.ReplicationTaskTypeHistoryV2.Ptr(), + HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{ + Events: &types.DataBlob{ + EncodingType: nil, + Data: nil, + }, + NewRunEvents: &types.DataBlob{ + EncodingType: nil, + Data: nil, + }, + }, + } + result, err := decodeReplicationTask(task, persistence.NewPayloadSerializer()) + expectedTaskJSONBytes := []byte("{\"Task\":{\"taskType\":\"HistoryV2\",\"historyTaskV2Attributes\":{}},\"Events\":null,\"NewRunEvents\":null}") + assert.NoError(t, err) + assert.Equal(t, expectedTaskJSONBytes, result) + }) + + t.Run("deserialize returns an error", func(t *testing.T) { + task := &types.ReplicationTask{ + TaskType: types.ReplicationTaskTypeHistoryV2.Ptr(), + HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{ + Events: &types.DataBlob{EncodingType: nil, Data: []byte{}}, + }, + } + + eventsBlob := persistence.NewDataBlobFromInternal(&types.DataBlob{EncodingType: nil, Data: []byte{}}) + serializerMock := persistence.NewMockPayloadSerializer(gomock.NewController(t)) + serializerMock.EXPECT().DeserializeBatchEvents(eventsBlob).Return(nil, assert.AnError).Times(1) + + result, err := decodeReplicationTask(task, serializerMock) + + assert.Equal(t, err, assert.AnError) + assert.Nil(t, result) + }) + + t.Run("deserialize returns an error for NewRunEvents", func(t *testing.T) { + task := &types.ReplicationTask{ + TaskType: types.ReplicationTaskTypeHistoryV2.Ptr(), + HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{ + Events: &types.DataBlob{EncodingType: nil, Data: []byte{}}, + NewRunEvents: &types.DataBlob{EncodingType: nil, Data: []byte{}}, + }, + } + + eventsBlob := persistence.NewDataBlobFromInternal(&types.DataBlob{EncodingType: nil, Data: []byte{}}) + newRunEventsBlob := persistence.NewDataBlobFromInternal(&types.DataBlob{EncodingType: nil, Data: []byte{}}) + serializerMock := persistence.NewMockPayloadSerializer(gomock.NewController(t)) + serializerMock.EXPECT().DeserializeBatchEvents(eventsBlob).Return(nil, nil).Times(1) + serializerMock.EXPECT().DeserializeBatchEvents(newRunEventsBlob).Return(nil, assert.AnError).Times(1) + + result, err := decodeReplicationTask(task, serializerMock) + + assert.Equal(t, err, assert.AnError) + assert.Nil(t, result) + }) +} + +func TestDoRereplicate(t *testing.T) { + t.Run("successful rereplication", func(t *testing.T) { + clientMock := admin.NewMockClient(gomock.NewController(t)) + clientMock.EXPECT().ResendReplicationTasks(context.Background(), &types.ResendReplicationTasksRequest{ + DomainID: "domainID", + WorkflowID: "wid", + RunID: "rid", + RemoteCluster: "sourceCluster", + EndEventID: ptr.Int64(1), + EndVersion: ptr.Int64(1), + }).Return(nil).Times(1) + + err := doRereplicate(context.Background(), "domainID", "wid", "rid", ptr.Int64(1), ptr.Int64(1), "sourceCluster", clientMock) + + assert.NoError(t, err) + }) + + t.Run("returns err", func(t *testing.T) { + clientMock := admin.NewMockClient(gomock.NewController(t)) + clientMock.EXPECT().ResendReplicationTasks(context.Background(), &types.ResendReplicationTasksRequest{ + DomainID: "domainID", + WorkflowID: "wid", + RunID: "rid", + RemoteCluster: "sourceCluster", + EndEventID: ptr.Int64(1), + EndVersion: ptr.Int64(1), + }).Return(assert.AnError).Times(1) + + err := doRereplicate(context.Background(), "domainID", "wid", "rid", ptr.Int64(1), ptr.Int64(1), "sourceCluster", clientMock) + + assert.EqualError(t, err, "Failed to resend ndc workflow: assert.AnError general error for testing") + }) +} + +func TestAdminRereplicate(t *testing.T) { + tests := []struct { + name string + args []clitest.CliArgument + err error + }{ + { + name: "success", + args: []clitest.CliArgument{ + clitest.StringArgument(FlagSourceCluster, "sourceCluster"), + clitest.IntArgument(FlagMaxEventID, 123), + clitest.IntArgument(FlagEndEventVersion, 321), + clitest.StringArgument(FlagDomainID, "domainID"), + clitest.StringArgument(FlagWorkflowID, "workflowID"), + clitest.StringArgument(FlagRunID, "runID"), + clitest.StringArgument(FlagContextTimeout, "10s"), + }, + err: nil, + }, + { + name: "source cluster is missing", + args: []clitest.CliArgument{ + clitest.IntArgument(FlagMaxEventID, 123), + clitest.IntArgument(FlagEndEventVersion, 321), + clitest.StringArgument(FlagDomainID, "domainID"), + clitest.StringArgument(FlagWorkflowID, "workflowID"), + clitest.StringArgument(FlagRunID, "runID"), + clitest.StringArgument(FlagContextTimeout, "10s"), + }, + err: commoncli.Problem("Required flag not found: ", errors.New("option source_cluster is required")), + }, + { + name: "domain ID is missing", + args: []clitest.CliArgument{ + clitest.StringArgument(FlagSourceCluster, "sourceCluster"), + clitest.IntArgument(FlagMaxEventID, 123), + clitest.IntArgument(FlagEndEventVersion, 321), + clitest.StringArgument(FlagWorkflowID, "workflowID"), + clitest.StringArgument(FlagRunID, "runID"), + clitest.StringArgument(FlagContextTimeout, "10s"), + }, + err: commoncli.Problem("Required flag not found: ", errors.New("option domain_id is required")), + }, + { + name: "workflow ID is missing", + args: []clitest.CliArgument{ + clitest.StringArgument(FlagSourceCluster, "sourceCluster"), + clitest.IntArgument(FlagMaxEventID, 123), + clitest.IntArgument(FlagEndEventVersion, 321), + clitest.StringArgument(FlagDomainID, "domainID"), + clitest.StringArgument(FlagRunID, "runID"), + clitest.StringArgument(FlagContextTimeout, "10s"), + }, + err: commoncli.Problem("Required flag not found: ", errors.New("option workflow_id is required")), + }, + { + name: "run ID is missing", + args: []clitest.CliArgument{ + clitest.StringArgument(FlagSourceCluster, "sourceCluster"), + clitest.IntArgument(FlagMaxEventID, 123), + clitest.IntArgument(FlagEndEventVersion, 321), + clitest.StringArgument(FlagDomainID, "domainID"), + clitest.StringArgument(FlagWorkflowID, "workflowID"), + clitest.StringArgument(FlagContextTimeout, "10s"), + }, + err: commoncli.Problem("Required flag not found: ", errors.New("option run_id is required")), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testData := newCLITestData(t) + testData.mockAdminClient.EXPECT().ResendReplicationTasks(gomock.Any(), &types.ResendReplicationTasksRequest{ + DomainID: "domainID", + WorkflowID: "workflowID", + RunID: "runID", + RemoteCluster: "sourceCluster", + EndEventID: ptr.Int64(124), + EndVersion: ptr.Int64(321), + }).Return(nil).MaxTimes(1) + + cliContext := clitest.NewCLIContext(t, testData.app, tt.args...) + + err := AdminRereplicate(cliContext) + assert.Equal(t, tt.err, err) + }) + } +} + +func TestParse(t *testing.T) { + t.Run("successful parse replication task", func(t *testing.T) { + wireVal, _ := (&replicator.ReplicationTask{CreationTime: ptr.Int64(123)}).ToWire() + encoded := bytes.NewBuffer([]byte{}) + _ = binary.Default.Encode(wireVal, encoded) + data := []byte("Partition: abc, Offset: 0, Key: 123") + data = append(data, preambleVersion0) + data = append(data, encoded.Bytes()...) + + skipped := ptr.Int32(0) + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + + go func() { + err := parse(data, false, skipped, ch) + assert.NoError(t, err) + close(ch.ReplicationTaskChannel) + }() + + result := <-ch.ReplicationTaskChannel + + assert.NotNil(t, result) + }) + + t.Run("successful parse visibility task", func(t *testing.T) { + wireVal, _ := (&indexer.Message{Version: ptr.Int64(123)}).ToWire() + encoded := bytes.NewBuffer([]byte{}) + _ = binary.Default.Encode(wireVal, encoded) + data := []byte("Partition: abc, Offset: 0, Key: 123") + data = append(data, preambleVersion0) + data = append(data, encoded.Bytes()...) + + skipped := ptr.Int32(0) + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + + go func() { + err := parse(data, false, skipped, ch) + assert.NoError(t, err) + close(ch.VisibilityMsgChannel) + }() + + result := <-ch.VisibilityMsgChannel + + assert.NotNil(t, result) + }) + + t.Run("invalid replication task", func(t *testing.T) { + data := []byte("Partition: abc, Offset: 0, Key: 123") + data = append(data, preambleVersion0) + data = append(data, []byte{1, 2, 3, 4}...) + + skipped := ptr.Int32(0) + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + + go func() { + err := parse(data, false, skipped, ch) + assert.EqualError(t, err, "parsing failed: Input was malformedError: unknown ttype Type(1)") + close(ch.ReplicationTaskChannel) + }() + + result := <-ch.ReplicationTaskChannel + + assert.Nil(t, result) + }) + + t.Run("invalid visibility task", func(t *testing.T) { + data := []byte("Partition: abc, Offset: 0, Key: 123") + data = append(data, preambleVersion0) + data = append(data, []byte{1, 2, 3, 4}...) + + skipped := ptr.Int32(0) + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + + go func() { + err := parse(data, false, skipped, ch) + assert.EqualError(t, err, "parsing failed: Input was malformedError: unknown ttype Type(1)") + close(ch.VisibilityMsgChannel) + }() + + result := <-ch.VisibilityMsgChannel + + assert.Nil(t, result) + }) +} + +func TestAdminKafkaParse(t *testing.T) { + t.Run("success", func(t *testing.T) { + defer func() { + _ = os.Remove("input.txt") + _ = os.Remove("output.txt") + }() + + wireVal, _ := (&replicator.ReplicationTask{CreationTime: ptr.Int64(123)}).ToWire() + encoded := bytes.NewBuffer([]byte{}) + _ = binary.Default.Encode(wireVal, encoded) + data := []byte("Partition: abc, Offset: 0, Key: 123") + data = append(data, preambleVersion0) + data = append(data, encoded.Bytes()...) + + err := os.WriteFile("input.txt", data, 0666) + if err != nil { + assert.FailNow(t, err.Error()) + } + + testData := newCLITestData(t) + + cliContext := clitest.NewCLIContext(t, testData.app, + clitest.StringArgument(FlagInputFile, "input.txt"), + clitest.StringArgument(FlagOutputFilename, "output.txt"), + ) + + err = AdminKafkaParse(cliContext) + + assert.NoError(t, err) + + result, err := os.ReadFile("output.txt") + + assert.Equal(t, "{\"creationTime\":123}\n", string(result)) + }) + + t.Run("can't open input file", func(t *testing.T) { + testData := newCLITestData(t) + + cliContext := clitest.NewCLIContext(t, testData.app, + clitest.StringArgument(FlagInputFile, "/path/do/not/exist/input.txt"), + clitest.StringArgument(FlagOutputFilename, "output.txt"), + ) + + err := AdminKafkaParse(cliContext) + + assert.EqualError(t, err, "Error in Admin kafka parse: : failed to open input file for reading: /path/do/not/exist/input.txt: open /path/do/not/exist/input.txt: no such file or directory") + }) + + t.Run("can't open output file", func(t *testing.T) { + defer func() { + _ = os.Remove("input.txt") + }() + + testData := newCLITestData(t) + + err := os.WriteFile("input.txt", []byte{}, 0666) + if err != nil { + assert.FailNow(t, err.Error()) + } + + cliContext := clitest.NewCLIContext(t, testData.app, + clitest.StringArgument(FlagInputFile, "input.txt"), + clitest.StringArgument(FlagOutputFilename, "/path/do/not/exist/output.txt"), + ) + + err = AdminKafkaParse(cliContext) + + assert.EqualError(t, err, "Error in Admin kafka parse: : failed to create output file open /path/do/not/exist/output.txt: no such file or directory") + }) +} + +func TestWriteVisibilityMessage(t *testing.T) { + t.Run("successful write", func(t *testing.T) { + file, _ := os.Create("output.txt") + defer func() { + _ = file.Close() + _ = os.Remove("output.txt") + }() + + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.VisibilityMsgChannel <- &indexer.Message{ + DomainID: ptr.String("domain_id"), + WorkflowID: ptr.String("workflow_id"), + RunID: ptr.String("run_id"), + } + ch.Close() + }() + + err := writeVisibilityMessage(file, ch, skippedCount, false, false, cliContext) + + assert.NoError(t, err) + + result, _ := os.ReadFile("output.txt") + + assert.Equal(t, "{\"domainID\":\"domain_id\",\"workflowID\":\"workflow_id\",\"runID\":\"run_id\"}\n", string(result)) + }) + + t.Run("successful write with header mode", func(t *testing.T) { + file, _ := os.Create("output.txt") + defer func() { + _ = file.Close() + _ = os.Remove("output.txt") + }() + + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.VisibilityMsgChannel <- &indexer.Message{ + DomainID: ptr.String("domain_id"), + WorkflowID: ptr.String("workflow_id"), + RunID: ptr.String("run_id"), + } + ch.Close() + }() + + err := writeVisibilityMessage(file, ch, skippedCount, false, true, cliContext) + + assert.NoError(t, err) + + result, _ := os.ReadFile("output.txt") + + assert.Equal(t, "domain_id, workflow_id, run_id, Index, 0\n", string(result)) + }) + + t.Run("returns error if can't write to the file", func(t *testing.T) { + defer func() { + _ = os.Remove("output.txt") + }() + + file, _ := os.Create("output.txt") + _ = file.Close() + + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.VisibilityMsgChannel <- &indexer.Message{} + ch.Close() + }() + + err := writeVisibilityMessage(file, ch, skippedCount, false, true, cliContext) + + assert.EqualError(t, err, "failed to write to file: write output.txt: file already closed") + }) +} + +func TestWriteReplicationTask(t *testing.T) { + t.Run("successful write", func(t *testing.T) { + file, _ := os.Create("output.txt") + defer func() { + _ = file.Close() + _ = os.Remove("output.txt") + }() + + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.ReplicationTaskChannel <- &types.ReplicationTask{ + SourceTaskID: 123, + CreationTime: ptr.Int64(123), + } + ch.Close() + }() + + err := writeReplicationTask(file, ch, skippedCount, false, false, persistence.NewPayloadSerializer(), cliContext) + + assert.NoError(t, err) + + result, _ := os.ReadFile("output.txt") + + assert.Equal(t, "{\"sourceTaskId\":123,\"creationTime\":123}\n", string(result)) + }) + + t.Run("successful write with header mode", func(t *testing.T) { + file, _ := os.Create("output.txt") + defer func() { + _ = file.Close() + _ = os.Remove("output.txt") + }() + + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.ReplicationTaskChannel <- &types.ReplicationTask{ + SourceTaskID: 123, + CreationTime: ptr.Int64(123), + HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{ + DomainID: "domain_id", + WorkflowID: "workflow_id", + RunID: "run_id", + }, + } + ch.Close() + }() + + err := writeReplicationTask(file, ch, skippedCount, false, true, persistence.NewPayloadSerializer(), cliContext) + + assert.NoError(t, err) + + result, _ := os.ReadFile("output.txt") + + assert.Equal(t, "domain_id, workflow_id, run_id\n", string(result)) + }) + + t.Run("returns error if can't write to the file", func(t *testing.T) { + defer func() { + _ = os.Remove("output.txt") + }() + + file, _ := os.Create("output.txt") + _ = file.Close() + + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.ReplicationTaskChannel <- &types.ReplicationTask{} + ch.Close() + }() + + err := writeReplicationTask(file, ch, skippedCount, false, false, persistence.NewPayloadSerializer(), cliContext) + + assert.EqualError(t, err, "failed to write to file: write output.txt: file already closed") + }) +} + +func TestStartWriter(t *testing.T) { + t.Run("successful write to replication task", func(t *testing.T) { + file, _ := os.Create("output.txt") + defer func() { + _ = file.Close() + _ = os.Remove("output.txt") + }() + + ch := newWriterChannel(kafkaMessageTypeReplicationTask) + doneCh := make(chan struct{}) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.ReplicationTaskChannel <- &types.ReplicationTask{ + SourceTaskID: 123, + CreationTime: ptr.Int64(123), + } + ch.Close() + }() + + err := startWriter(file, ch, doneCh, skippedCount, persistence.NewPayloadSerializer(), cliContext) + + assert.NoError(t, err) + + result, _ := os.ReadFile("output.txt") + + assert.Equal(t, "{\"sourceTaskId\":123,\"creationTime\":123}\n", string(result)) + }) + + t.Run("successful write with visibility task", func(t *testing.T) { + file, _ := os.Create("output.txt") + defer func() { + _ = file.Close() + _ = os.Remove("output.txt") + }() + + ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) + doneCh := make(chan struct{}) + skippedCount := ptr.Int32(0) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + go func() { + ch.VisibilityMsgChannel <- &indexer.Message{ + DomainID: ptr.String("domain_id"), + WorkflowID: ptr.String("workflow_id"), + RunID: ptr.String("run_id"), + } + ch.Close() + }() + + err := startWriter(file, ch, doneCh, skippedCount, persistence.NewPayloadSerializer(), cliContext) + + assert.NoError(t, err) + + result, _ := os.ReadFile("output.txt") + + assert.Equal(t, "{\"domainID\":\"domain_id\",\"workflowID\":\"workflow_id\",\"runID\":\"run_id\"}\n", string(result)) + }) + + t.Run("unsupported type", func(t *testing.T) { + ch := newWriterChannel(kafkaMessageType(321)) + doneCh := make(chan struct{}) + cliContext := clitest.NewCLIContext(t, newCLITestData(t).app) + + err := startWriter(nil, ch, doneCh, nil, persistence.NewPayloadSerializer(), cliContext) + + assert.EqualError(t, err, "unsupported message type: 321") + }) +} From ef310914a65107ffbaad70ae2f11c1071991e838 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Tue, 5 Nov 2024 14:35:01 +0100 Subject: [PATCH 2/4] After review --- tools/cli/admin_kafka_commands_test.go | 116 +++++++++---------------- tools/cli/workflow_commands_test.go | 18 ++-- 2 files changed, 51 insertions(+), 83 deletions(-) diff --git a/tools/cli/admin_kafka_commands_test.go b/tools/cli/admin_kafka_commands_test.go index 6982ab484eb..c6085796aac 100644 --- a/tools/cli/admin_kafka_commands_test.go +++ b/tools/cli/admin_kafka_commands_test.go @@ -667,11 +667,6 @@ func TestParse(t *testing.T) { func TestAdminKafkaParse(t *testing.T) { t.Run("success", func(t *testing.T) { - defer func() { - _ = os.Remove("input.txt") - _ = os.Remove("output.txt") - }() - wireVal, _ := (&replicator.ReplicationTask{CreationTime: ptr.Int64(123)}).ToWire() encoded := bytes.NewBuffer([]byte{}) _ = binary.Default.Encode(wireVal, encoded) @@ -679,23 +674,21 @@ func TestAdminKafkaParse(t *testing.T) { data = append(data, preambleVersion0) data = append(data, encoded.Bytes()...) - err := os.WriteFile("input.txt", data, 0666) - if err != nil { - assert.FailNow(t, err.Error()) - } + inputFileName := createTempFileWithContent(t, string(data)) + outputFileName := createTempFileWithContent(t, "") testData := newCLITestData(t) cliContext := clitest.NewCLIContext(t, testData.app, - clitest.StringArgument(FlagInputFile, "input.txt"), - clitest.StringArgument(FlagOutputFilename, "output.txt"), + clitest.StringArgument(FlagInputFile, inputFileName), + clitest.StringArgument(FlagOutputFilename, outputFileName), ) - err = AdminKafkaParse(cliContext) + err := AdminKafkaParse(cliContext) assert.NoError(t, err) - result, err := os.ReadFile("output.txt") + result, err := os.ReadFile(outputFileName) assert.Equal(t, "{\"creationTime\":123}\n", string(result)) }) @@ -714,23 +707,16 @@ func TestAdminKafkaParse(t *testing.T) { }) t.Run("can't open output file", func(t *testing.T) { - defer func() { - _ = os.Remove("input.txt") - }() - testData := newCLITestData(t) - err := os.WriteFile("input.txt", []byte{}, 0666) - if err != nil { - assert.FailNow(t, err.Error()) - } + inputFileName := createTempFileWithContent(t, "") cliContext := clitest.NewCLIContext(t, testData.app, - clitest.StringArgument(FlagInputFile, "input.txt"), + clitest.StringArgument(FlagInputFile, inputFileName), clitest.StringArgument(FlagOutputFilename, "/path/do/not/exist/output.txt"), ) - err = AdminKafkaParse(cliContext) + err := AdminKafkaParse(cliContext) assert.EqualError(t, err, "Error in Admin kafka parse: : failed to create output file open /path/do/not/exist/output.txt: no such file or directory") }) @@ -738,11 +724,9 @@ func TestAdminKafkaParse(t *testing.T) { func TestWriteVisibilityMessage(t *testing.T) { t.Run("successful write", func(t *testing.T) { - file, _ := os.Create("output.txt") - defer func() { - _ = file.Close() - _ = os.Remove("output.txt") - }() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) + defer file.Close() ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) skippedCount := ptr.Int32(0) @@ -761,17 +745,15 @@ func TestWriteVisibilityMessage(t *testing.T) { assert.NoError(t, err) - result, _ := os.ReadFile("output.txt") + result, _ := os.ReadFile(filename) assert.Equal(t, "{\"domainID\":\"domain_id\",\"workflowID\":\"workflow_id\",\"runID\":\"run_id\"}\n", string(result)) }) t.Run("successful write with header mode", func(t *testing.T) { - file, _ := os.Create("output.txt") - defer func() { - _ = file.Close() - _ = os.Remove("output.txt") - }() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) + defer file.Close() ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) skippedCount := ptr.Int32(0) @@ -790,18 +772,14 @@ func TestWriteVisibilityMessage(t *testing.T) { assert.NoError(t, err) - result, _ := os.ReadFile("output.txt") + result, _ := os.ReadFile(filename) assert.Equal(t, "domain_id, workflow_id, run_id, Index, 0\n", string(result)) }) t.Run("returns error if can't write to the file", func(t *testing.T) { - defer func() { - _ = os.Remove("output.txt") - }() - - file, _ := os.Create("output.txt") - _ = file.Close() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) skippedCount := ptr.Int32(0) @@ -812,19 +790,19 @@ func TestWriteVisibilityMessage(t *testing.T) { ch.Close() }() + _ = file.Close() + err := writeVisibilityMessage(file, ch, skippedCount, false, true, cliContext) - assert.EqualError(t, err, "failed to write to file: write output.txt: file already closed") + assert.Errorf(t, err, "failed to write to file: write %v: file already closed", filename) }) } func TestWriteReplicationTask(t *testing.T) { t.Run("successful write", func(t *testing.T) { - file, _ := os.Create("output.txt") - defer func() { - _ = file.Close() - _ = os.Remove("output.txt") - }() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) + defer file.Close() ch := newWriterChannel(kafkaMessageTypeReplicationTask) skippedCount := ptr.Int32(0) @@ -842,17 +820,15 @@ func TestWriteReplicationTask(t *testing.T) { assert.NoError(t, err) - result, _ := os.ReadFile("output.txt") + result, _ := os.ReadFile(filename) assert.Equal(t, "{\"sourceTaskId\":123,\"creationTime\":123}\n", string(result)) }) t.Run("successful write with header mode", func(t *testing.T) { - file, _ := os.Create("output.txt") - defer func() { - _ = file.Close() - _ = os.Remove("output.txt") - }() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) + defer file.Close() ch := newWriterChannel(kafkaMessageTypeReplicationTask) skippedCount := ptr.Int32(0) @@ -875,18 +851,14 @@ func TestWriteReplicationTask(t *testing.T) { assert.NoError(t, err) - result, _ := os.ReadFile("output.txt") + result, _ := os.ReadFile(filename) assert.Equal(t, "domain_id, workflow_id, run_id\n", string(result)) }) t.Run("returns error if can't write to the file", func(t *testing.T) { - defer func() { - _ = os.Remove("output.txt") - }() - - file, _ := os.Create("output.txt") - _ = file.Close() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) ch := newWriterChannel(kafkaMessageTypeReplicationTask) skippedCount := ptr.Int32(0) @@ -897,19 +869,19 @@ func TestWriteReplicationTask(t *testing.T) { ch.Close() }() + file.Close() + err := writeReplicationTask(file, ch, skippedCount, false, false, persistence.NewPayloadSerializer(), cliContext) - assert.EqualError(t, err, "failed to write to file: write output.txt: file already closed") + assert.Errorf(t, err, "failed to write to file: write %v: file already closed", filename) }) } func TestStartWriter(t *testing.T) { t.Run("successful write to replication task", func(t *testing.T) { - file, _ := os.Create("output.txt") - defer func() { - _ = file.Close() - _ = os.Remove("output.txt") - }() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) + defer file.Close() ch := newWriterChannel(kafkaMessageTypeReplicationTask) doneCh := make(chan struct{}) @@ -928,17 +900,15 @@ func TestStartWriter(t *testing.T) { assert.NoError(t, err) - result, _ := os.ReadFile("output.txt") + result, _ := os.ReadFile(filename) assert.Equal(t, "{\"sourceTaskId\":123,\"creationTime\":123}\n", string(result)) }) t.Run("successful write with visibility task", func(t *testing.T) { - file, _ := os.Create("output.txt") - defer func() { - _ = file.Close() - _ = os.Remove("output.txt") - }() + filename := createTempFileWithContent(t, "") + file, _ := os.OpenFile(filename, os.O_WRONLY, 0600) + defer file.Close() ch := newWriterChannel(kafkaMessageTypeVisibilityMsg) doneCh := make(chan struct{}) @@ -958,7 +928,7 @@ func TestStartWriter(t *testing.T) { assert.NoError(t, err) - result, _ := os.ReadFile("output.txt") + result, _ := os.ReadFile(filename) assert.Equal(t, "{\"domainID\":\"domain_id\",\"workflowID\":\"workflow_id\",\"runID\":\"run_id\"}\n", string(result)) }) diff --git a/tools/cli/workflow_commands_test.go b/tools/cli/workflow_commands_test.go index 344bb705e8d..2cd949ae48c 100644 --- a/tools/cli/workflow_commands_test.go +++ b/tools/cli/workflow_commands_test.go @@ -1071,7 +1071,7 @@ func Test_DoReset_SkipCurrentCompleted(t *testing.T) { assert.NoError(t, err) } -func createTempFileWithContent(t *testing.T, content string) (string, func()) { +func createTempFileWithContent(t *testing.T, content string) string { tmpFile, err := os.CreateTemp("", "testfile") if err != nil { t.Fatalf("Failed to create temporary file: %v", err) @@ -1085,18 +1085,16 @@ func createTempFileWithContent(t *testing.T, content string) (string, func()) { tmpFileName := tmpFile.Name() tmpFile.Close() - // Return a cleanup function to delete the file after the test - cleanup := func() { - os.Remove(tmpFileName) - } + t.Cleanup(func() { + _ = os.Remove(tmpFileName) + }) - return tmpFileName, cleanup + return tmpFileName } func TestLoadWorkflowIDsFromFile_Success(t *testing.T) { content := "wid1,wid2,wid3\n\nwid4,wid5\nwid6\n" - fileName, cleanup := createTempFileWithContent(t, content) - defer cleanup() + fileName := createTempFileWithContent(t, content) workflowIDs, err := loadWorkflowIDsFromFile(fileName, ",") assert.NoError(t, err) @@ -1171,8 +1169,8 @@ func Test_ResetInBatch_WithFile(t *testing.T) { set.String("reset_type", "BadBinary", "reset_type") set.String("reset_bad_binary_checksum", "test-bad-binary-checksum", "reset_bad_binary_checksum") content := "wid1,wid2,wid3\n\nwid4,wid5\nwid6\n" - fileName, cleanup := createTempFileWithContent(t, content) - defer cleanup() + fileName := createTempFileWithContent(t, content) + set.String(FlagInputFile, fileName, "input file") set.String(FlagParallismDeprecated, "1", "input parallism") set.String(FlagParallelism, "2", "parallelism") From f9fcccf12fe9aa4b01993fc3c91a742ebb69126d Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Tue, 5 Nov 2024 19:52:03 +0100 Subject: [PATCH 3/4] after rebase --- tools/cli/admin_db_scan_command_test.go | 3 +-- tools/cli/workflow_commands_test.go | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tools/cli/admin_db_scan_command_test.go b/tools/cli/admin_db_scan_command_test.go index c2b24bc8f34..58a52e139e9 100644 --- a/tools/cli/admin_db_scan_command_test.go +++ b/tools/cli/admin_db_scan_command_test.go @@ -310,8 +310,7 @@ func expectWorkFlow(td *cliTestData, workflowID string) { func TestAdminDBScanUnsupportedWorkflow(t *testing.T) { td := newCLITestData(t) - outPutFile, cleanup := createTempFileWithContent(t, "") - defer cleanup() + outPutFile := createTempFileWithContent(t, "") expectShard(td, 123) expectShard(td, 124) diff --git a/tools/cli/workflow_commands_test.go b/tools/cli/workflow_commands_test.go index 6b30b1de792..08d88daf397 100644 --- a/tools/cli/workflow_commands_test.go +++ b/tools/cli/workflow_commands_test.go @@ -2046,8 +2046,7 @@ func Test_QueryWorkflowHelper_MissingFlags(t *testing.T) { assert.ErrorContains(t, err, fmt.Sprintf("%s is required", FlagWorkflowID)) content := "wid1,wid2,wid3\n\nwid4,wid5\nwid6\n" - fileName, cleanup := createTempFileWithContent(t, content) - defer cleanup() + fileName := createTempFileWithContent(t, content) ctx := clitest.NewCLIContext(t, app, clitest.StringArgument(FlagDomain, "test-domain"), clitest.StringArgument(FlagWorkflowID, "test-workflow-id"), clitest.StringArgument(FlagInputFile, fileName)) err = QueryWorkflowUsingQueryTypes(ctx) @@ -2057,8 +2056,7 @@ func Test_QueryWorkflowHelper_MissingFlags(t *testing.T) { func Test_ProcessJsonInputHelper(t *testing.T) { app := NewCliApp(&clientFactoryMock{}) content := "wid1,wid2,wid3\n\nwid4,wid5\nwid6\n" - fileName, cleanup := createTempFileWithContent(t, content) - defer cleanup() + fileName := createTempFileWithContent(t, content) ctx := clitest.NewCLIContext(t, app, clitest.StringArgument(FlagInputFile, fileName)) _, err := processJSONInputHelper(ctx, jsonTypeInput) From 6ef8eb0d81620a7dd551151e54e1a8b3ec9e78b3 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Wed, 6 Nov 2024 15:48:06 +0100 Subject: [PATCH 4/4] Fixed formatting --- tools/cli/admin_kafka_commands_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tools/cli/admin_kafka_commands_test.go b/tools/cli/admin_kafka_commands_test.go index c6085796aac..0d516509667 100644 --- a/tools/cli/admin_kafka_commands_test.go +++ b/tools/cli/admin_kafka_commands_test.go @@ -26,8 +26,14 @@ import ( "bytes" "context" "errors" + "os" + "testing" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "go.uber.org/thriftrw/protocol/binary" + "go.uber.org/thriftrw/ptr" + "github.com/uber/cadence/.gen/go/indexer" "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/client/admin" @@ -35,10 +41,6 @@ import ( "github.com/uber/cadence/common/types" "github.com/uber/cadence/tools/cli/clitest" "github.com/uber/cadence/tools/common/commoncli" - "go.uber.org/thriftrw/protocol/binary" - "go.uber.org/thriftrw/ptr" - "os" - "testing" ) func TestWriterChannel(t *testing.T) {