diff --git a/processor/processor_test.go b/processor/processor_test.go index 156cd3db63..09806b3c64 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2,6 +2,7 @@ package processor import ( "context" + "database/sql" "encoding/json" "fmt" "net/http" @@ -1555,7 +1556,157 @@ var _ = Describe("Processor", Ordered, func() { } }) - processorSetupAndAssertJobHandling(processor, c) + c.MockReportingI.EXPECT(). + Report(gomock.Any(), gomock.Any()). + Do(func(metrics []*types.PUReportedMetric, _ *sql.Tx) { + Expect(len(metrics)).To(Equal(9)) + assertReport(metrics[0], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source-only-ut", + }, + PUDetails: types.PUDetails{ + InPU: "", + PU: types.GATEWAY, + TerminalPU: false, + InitialPU: true, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 0, + Count: 5, + }, + }) + assertReport(metrics[1], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source-only-ut", + }, + PUDetails: types.PUDetails{ + InPU: types.GATEWAY, + PU: types.DESTINATION_FILTER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 0, + Count: 3, + }, + }) + assertReport(metrics[2], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source-only-ut", + }, + PUDetails: types.PUDetails{ + InPU: types.GATEWAY, + PU: types.DESTINATION_FILTER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: types.DiffStatus, + StatusCode: 0, + Count: -2, + }, + }) + assertReport(metrics[3], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{}, + PUDetails: types.PUDetails{ + InPU: types.DESTINATION_FILTER, + PU: types.USER_TRANSFORMER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 0, + Count: 3, + }, + }) + assertReport(metrics[4], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source-only-ut", + DestinationID: "enabled-destination-b", + TransformationVersionID: "transformation-version-id", + }, + PUDetails: types.PUDetails{ + InPU: types.DESTINATION_FILTER, + PU: types.USER_TRANSFORMER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: types.DiffStatus, + StatusCode: 0, + Count: -3, + }, + }) + assertReport(metrics[5], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source-only-ut", + DestinationID: "enabled-destination-b", + }, + PUDetails: types.PUDetails{ + InPU: types.USER_TRANSFORMER, + PU: types.EVENT_FILTER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 200, + Count: 3, + }, + }) + assertReport(metrics[6], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{}, + PUDetails: types.PUDetails{ + InPU: types.USER_TRANSFORMER, + PU: types.EVENT_FILTER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: types.DiffStatus, + StatusCode: 0, + Count: -3, + }, + }) + assertReport(metrics[7], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "source-from-transformer", + DestinationID: "destination-from-transformer", + }, + PUDetails: types.PUDetails{ + InPU: types.EVENT_FILTER, + PU: types.DEST_TRANSFORMER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 0, + Count: 2, + }, + }) + assertReport(metrics[8], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source-only-ut", + DestinationID: "enabled-destination-b", + }, + PUDetails: types.PUDetails{ + InPU: types.EVENT_FILTER, + PU: types.DEST_TRANSFORMER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: types.DiffStatus, + StatusCode: 0, + Count: -3, + }, + }) + }).Times(1) + processorSetupAndAssertJobHandlingWithReporting(processor, c) }) It("should process unprocessed jobs to destination without user transformation with enabled Dedup", func() { @@ -1657,9 +1808,68 @@ var _ = Describe("Processor", Ordered, func() { _ = f(jobsdb.EmptyUpdateSafeTx()) }).Return(nil).Times(1) c.mockGatewayJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Len(len(unprocessedJobsList)), gatewayCustomVal, nil).Times(1).After(callStoreRouter) + c.MockReportingI.EXPECT(). + Report(gomock.Any(), gomock.Any()). + Do(func(metrics []*types.PUReportedMetric, _ *sql.Tx) { + // three stages to expect here: + // 1. gateway + // 2. destination_filter + // 3, event_fitler + // no destination_transform here + Expect(metrics).To(HaveLen(3)) + assertReport(metrics[0], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source", + }, + PUDetails: types.PUDetails{ + InPU: "", + PU: types.GATEWAY, + TerminalPU: false, + InitialPU: true, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 0, + Count: 3, + }, + }) + assertReport(metrics[1], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source", + }, + PUDetails: types.PUDetails{ + InPU: types.GATEWAY, + PU: types.DESTINATION_FILTER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 0, + Count: 3, + }, + }) + assertReport(metrics[2], &types.PUReportedMetric{ + ConnectionDetails: types.ConnectionDetails{ + SourceID: "enabled-source", + DestinationID: "enabled-destination-c", + }, + PUDetails: types.PUDetails{ + InPU: types.DESTINATION_FILTER, + PU: types.EVENT_FILTER, + TerminalPU: false, + InitialPU: false, + }, + StatusDetail: &types.StatusDetail{ + Status: jobsdb.Succeeded.State, + StatusCode: 200, + Count: 3, + }, + }) + }).Times(1) processor := prepareHandle(NewHandle(mockTransformer)) - Setup(processor, c, true, false) + Setup(processor, c, true, true) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -3552,6 +3762,15 @@ func processorSetupAndAssertJobHandling(processor *Handle, c *testContext) { handlePendingGatewayJobs(processor) } +func processorSetupAndAssertJobHandlingWithReporting(processor *Handle, c *testContext) { + Setup(processor, c, false, true) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) + GinkgoT().Log("Processor setup and init done") + handlePendingGatewayJobs(processor) +} + func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) { setDisableDedupFeature(processor, enableDedup) processor.Setup( @@ -3922,3 +4141,21 @@ func TestStoreMessageMerge(t *testing.T) { require.Len(t, merged.dedupKeys, 2, "dedup keys should have 2 elements") require.Equal(t, merged.totalEvents, 2, "total events should be 2") } + +func assertReport(metric, expectedMetric *types.PUReportedMetric) { + Expect(metric.ConnectionDetails.SourceID).To(Equal(expectedMetric.ConnectionDetails.SourceID)) + Expect(metric.ConnectionDetails.DestinationID).To(Equal(expectedMetric.ConnectionDetails.DestinationID)) + Expect(metric.ConnectionDetails.SourceJobID).To(Equal(expectedMetric.ConnectionDetails.SourceJobID)) + Expect(metric.ConnectionDetails.SourceJobRunID).To(Equal(expectedMetric.ConnectionDetails.SourceJobRunID)) + Expect(metric.ConnectionDetails.SourceTaskRunID).To(Equal(expectedMetric.ConnectionDetails.SourceTaskRunID)) + Expect(metric.ConnectionDetails.TransformationVersionID).To(Equal(expectedMetric.ConnectionDetails.TransformationVersionID)) + Expect(metric.PUDetails.InPU).To(Equal(expectedMetric.PUDetails.InPU)) + Expect(metric.PUDetails.PU).To(Equal(expectedMetric.PUDetails.PU)) + Expect(metric.PUDetails.TerminalPU).To(Equal(expectedMetric.PUDetails.TerminalPU)) + Expect(metric.PUDetails.InitialPU).To(Equal(expectedMetric.PUDetails.InitialPU)) + Expect(metric.StatusDetail.Status).To(Equal(expectedMetric.StatusDetail.Status)) + Expect(metric.StatusDetail.StatusCode).To(Equal(expectedMetric.StatusDetail.StatusCode)) + Expect(metric.StatusDetail.Count).To(Equal(expectedMetric.StatusDetail.Count)) + // Expect(metric.StatusDetail.SampleResponse).To(Equal(expectedMetric.StatusDetail.SampleResponse)) + // Expect(metric.StatusDetail.SampleEvent).To(Equal(expectedMetric.StatusDetail.SampleEvent)) +}