From f41a2082bc4e6a955b6998f78c160aa76ff79139 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 12 Sep 2023 20:30:42 +0530 Subject: [PATCH] chore: support generic rules to have routers drain events --- .../batchrouter/batchrouterBenchmark_test.go | 7 ++-- router/batchrouter/handle.go | 4 +-- router/batchrouter/handle_async.go | 2 +- router/batchrouter/types.go | 16 --------- router/batchrouter/worker.go | 7 ++-- router/handle.go | 6 ++-- router/types.go | 18 ---------- router/utils/utils.go | 36 ++++++++++++++++++- router/worker.go | 4 +-- 9 files changed, 51 insertions(+), 49 deletions(-) diff --git a/router/batchrouter/batchrouterBenchmark_test.go b/router/batchrouter/batchrouterBenchmark_test.go index bbdf525abfa..3ee662d35fa 100644 --- a/router/batchrouter/batchrouterBenchmark_test.go +++ b/router/batchrouter/batchrouterBenchmark_test.go @@ -16,6 +16,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/logger" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" + routerutils "github.com/rudderlabs/rudder-server/router/utils" ) func Benchmark_GetStorageDateFormat(b *testing.B) { @@ -47,7 +48,7 @@ func Benchmark_JSONUnmarshal(b *testing.B) { for i := 0; i < b.N; i++ { var jobs []*jobsdb.JobT for i := 0; i < 100; i++ { - params := JobParameters{ + params := routerutils.JobParameters{ EventName: "test", EventType: "track", MessageID: uuid.New().String(), @@ -64,7 +65,7 @@ func Benchmark_JSONUnmarshal(b *testing.B) { g := errgroup.Group{} g.Go(func() error { - params := JobParameters{ + params := routerutils.JobParameters{ EventName: "test", EventType: "track", MessageID: uuid.New().String(), @@ -84,7 +85,7 @@ func Benchmark_JSONUnmarshal(b *testing.B) { }) g.Go(func() error { for i := range jobs { - var params JobParameters + var params routerutils.JobParameters _ = json.Unmarshal(jobs[i].Parameters, ¶ms) } diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go index fb78c69bf4e..a8c83319633 100644 --- a/router/batchrouter/handle.go +++ b/router/batchrouter/handle.go @@ -505,7 +505,7 @@ func (brt *Handle) pingWarehouse(batchJobs *BatchedJobs, output UploadResult) (e } } } - var sampleParameters JobParameters + var sampleParameters router_utils.JobParameters err = json.Unmarshal(batchJobs.Jobs[0].Parameters, &sampleParameters) if err != nil { brt.logger.Error("Unmarshal of job parameters failed in postToWarehouse function. ", string(batchJobs.Jobs[0].Parameters)) @@ -619,7 +619,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err errorResp = []byte(errorRespString) } - var parameters JobParameters + var parameters router_utils.JobParameters err = json.Unmarshal(job.Parameters, ¶meters) if err != nil { brt.logger.Error("Unmarshal of job parameters failed. ", string(job.Parameters)) diff --git a/router/batchrouter/handle_async.go b/router/batchrouter/handle_async.go index 79dca5841c5..48406b84f12 100644 --- a/router/batchrouter/handle_async.go +++ b/router/batchrouter/handle_async.go @@ -512,7 +512,7 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM statusDetailsMap := make(map[string]*utilTypes.StatusDetail) routerWorkspaceJobStatusCount := make(map[string]int) for _, status := range statusList { - var parameters JobParameters + var parameters routerutils.JobParameters err := json.Unmarshal(parametersMap[status.JobID], ¶meters) if err != nil { brt.logger.Error("Unmarshal of job parameters failed. ", string(parametersMap[status.JobID])) diff --git a/router/batchrouter/types.go b/router/batchrouter/types.go index 22da0d1224a..0592344ee13 100644 --- a/router/batchrouter/types.go +++ b/router/batchrouter/types.go @@ -13,22 +13,6 @@ type Connection struct { Destination backendconfig.DestinationT } -type JobParameters struct { - SourceID string `json:"source_id"` - DestinationID string `json:"destination_id"` - ReceivedAt string `json:"received_at"` - TransformAt string `json:"transform_at"` - SourceTaskRunID string `json:"source_task_run_id"` - SourceJobID string `json:"source_job_id"` - SourceJobRunID string `json:"source_job_run_id"` - SourceDefinitionID string `json:"source_definition_id"` - DestinationDefinitionID string `json:"destination_definition_id"` - SourceCategory string `json:"source_category"` - EventName string `json:"event_name"` - EventType string `json:"event_type"` - MessageID string `json:"message_id"` -} - type DestinationJobs struct { destWithSources router_utils.DestinationWithSources jobs []*jobsdb.JobT diff --git a/router/batchrouter/worker.go b/router/batchrouter/worker.go index c991590b594..18c3648f9d7 100644 --- a/router/batchrouter/worker.go +++ b/router/batchrouter/worker.go @@ -8,7 +8,6 @@ import ( "time" "github.com/samber/lo" - "github.com/tidwall/gjson" "golang.org/x/exp/slices" "github.com/rudderlabs/rudder-go-kit/logger" @@ -75,7 +74,9 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin jobsBySource := make(map[string][]*jobsdb.JobT) for _, job := range destinationJobs.jobs { - if drain, reason := router_utils.ToBeDrained(job, destWithSources.Destination.ID, brt.toAbortDestinationIDs.Load(), destinationsMap); drain { + var params router_utils.JobParameters + _ = json.Unmarshal(job.Parameters, ¶ms) + if drain, reason := router_utils.ToBeDrained(job, params, router_utils.AbortConfigs{ToAbortDestinationIDs: brt.toAbortDestinationIDs.Load()}, destinationsMap); drain { status := jobsdb.JobStatusT{ JobID: job.JobID, AttemptNum: job.LastJobStatus.AttemptNum + 1, @@ -105,7 +106,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin drainStatsbyDest[destWithSources.Destination.ID].Reasons = append(drainStatsbyDest[destWithSources.Destination.ID].Reasons, reason) } } else { - sourceID := gjson.GetBytes(job.Parameters, "source_id").String() + sourceID := params.SourceID if _, ok := jobsBySource[sourceID]; !ok { jobsBySource[sourceID] = []*jobsdb.JobT{} } diff --git a/router/handle.go b/router/handle.go index 0cac437e6d4..de356f9eef6 100644 --- a/router/handle.go +++ b/router/handle.go @@ -288,7 +288,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) { var statusList []*jobsdb.JobStatusT var routerAbortedJobs []*jobsdb.JobT for _, workerJobStatus := range *workerJobStatuses { - var parameters JobParameters + var parameters routerutils.JobParameters err := json.Unmarshal(workerJobStatus.job.Parameters, ¶meters) if err != nil { rt.logger.Error("Unmarshal of job parameters failed. ", string(workerJobStatus.job.Parameters)) @@ -476,7 +476,7 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd return nil, types.ErrContextCancelled } - var parameters JobParameters + var parameters routerutils.JobParameters if err := json.Unmarshal(job.Parameters, ¶meters); err != nil { rt.logger.Errorf(`[%v Router] :: Unmarshalling parameters failed with the error %v . Returning nil worker`, err) return nil, types.ErrParamsUnmarshal @@ -547,7 +547,7 @@ func (*Handle) shouldBackoff(job *jobsdb.JobT) bool { return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0 } -func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters JobParameters) (limited bool) { +func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters routerutils.JobParameters) (limited bool) { if rt.throttlerFactory == nil { // throttlerFactory could be nil when throttling is disabled or misconfigured. // in case of misconfiguration, logging errors are emitted. diff --git a/router/types.go b/router/types.go index cb1ae018e61..ff39c71cf62 100644 --- a/router/types.go +++ b/router/types.go @@ -11,24 +11,6 @@ import ( "github.com/rudderlabs/rudder-server/utils/misc" ) -// JobParameters struct holds source id and destination id of a job -type JobParameters struct { - SourceID string `json:"source_id"` - DestinationID string `json:"destination_id"` - ReceivedAt string `json:"received_at"` - TransformAt string `json:"transform_at"` - SourceTaskRunID string `json:"source_task_run_id"` - SourceJobID string `json:"source_job_id"` - SourceJobRunID string `json:"source_job_run_id"` - SourceDefinitionID string `json:"source_definition_id"` - DestinationDefinitionID string `json:"destination_definition_id"` - SourceCategory string `json:"source_category"` - RecordID interface{} `json:"record_id"` - MessageID string `json:"message_id"` - WorkspaceID string `json:"workspaceId"` - RudderAccountID string `json:"rudderAccountId"` -} - type workerJobStatus struct { userID string worker *worker diff --git a/router/utils/utils.go b/router/utils/utils.go index 845e711c0d3..f5b6b5f4b0a 100644 --- a/router/utils/utils.go +++ b/router/utils/utils.go @@ -47,9 +47,37 @@ func getRetentionTimeForDestination(destID string) time.Duration { return config.GetDurationVar(720, time.Hour, "Router."+destID+".jobRetention", "Router.jobRetention") } -func ToBeDrained(job *jobsdb.JobT, destID, toAbortDestinationIDs string, destinationsMap map[string]*DestinationWithSources) (bool, string) { +type JobParameters struct { + SourceID string `json:"source_id"` + DestinationID string `json:"destination_id"` + ReceivedAt string `json:"received_at"` + TransformAt string `json:"transform_at"` + SourceTaskRunID string `json:"source_task_run_id"` + SourceJobID string `json:"source_job_id"` + SourceJobRunID string `json:"source_job_run_id"` + SourceDefinitionID string `json:"source_definition_id"` + DestinationDefinitionID string `json:"destination_definition_id"` + SourceCategory string `json:"source_category"` + RecordID interface{} `json:"record_id"` + MessageID string `json:"message_id"` + EventName string `json:"event_name"` + EventType string `json:"event_type"` + WorkspaceID string `json:"workspaceId"` + RudderAccountID string `json:"rudderAccountId"` +} + +type AbortConfigs struct { + ToAbortDestinationIDs string + // source + // connection + // jobRunID + // ... +} + +func ToBeDrained(job *jobsdb.JobT, jobParams JobParameters, abortConfig AbortConfigs, destinationsMap map[string]*DestinationWithSources) (bool, string) { // drain if job is older than the destination's retention time createdAt := job.CreatedAt + destID := jobParams.DestinationID if time.Since(createdAt) > getRetentionTimeForDestination(destID) { return true, "job expired" } @@ -58,6 +86,7 @@ func ToBeDrained(job *jobsdb.JobT, destID, toAbortDestinationIDs string, destina return true, "destination is disabled" } + toAbortDestinationIDs := abortConfig.ToAbortDestinationIDs if toAbortDestinationIDs != "" { abortIDs := strings.Split(toAbortDestinationIDs, ",") if slices.Contains(abortIDs, destID) { @@ -65,6 +94,11 @@ func ToBeDrained(job *jobsdb.JobT, destID, toAbortDestinationIDs string, destina } } + // toAbortSourceIDS + // toAbortConnection(sourceID, destID) + // toAbortJobRunID/TaskRunID + // ... + return false, "" } diff --git a/router/worker.go b/router/worker.go index 704ca64f06b..f6e3ee51f50 100644 --- a/router/worker.go +++ b/router/worker.go @@ -84,12 +84,12 @@ func (w *worker) workLoop() { job := message.job userID := job.UserID - var parameters JobParameters + var parameters routerutils.JobParameters if err := json.Unmarshal(job.Parameters, ¶meters); err != nil { panic(fmt.Errorf("unmarshalling of job parameters failed for job %d (%s): %w", job.JobID, string(job.Parameters), err)) } w.rt.destinationsMapMu.RLock() - abort, abortReason := routerutils.ToBeDrained(job, parameters.DestinationID, w.rt.reloadableConfig.toAbortDestinationIDs.Load(), w.rt.destinationsMap) + abort, abortReason := routerutils.ToBeDrained(job, parameters, routerutils.AbortConfigs{ToAbortDestinationIDs: w.rt.reloadableConfig.toAbortDestinationIDs.Load()}, w.rt.destinationsMap) abortTag := abortReason w.rt.destinationsMapMu.RUnlock() if !abort {